Browse Source

取消db模块的sql包;给CreateBucket接口增加上锁过程

gitlink
Sydonian 2 years ago
parent
commit
18ed91c074
2 changed files with 12 additions and 13 deletions
  1. +5
    -6
      internal/event/agent_check_state.go
  2. +7
    -7
      internal/event/agent_check_storage.go

+ 5
- 6
internal/event/agent_check_state.go View File

@@ -8,7 +8,6 @@ import (
"gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/pkg/logger"
"gitlink.org.cn/cloudream/db/model" "gitlink.org.cn/cloudream/db/model"
mysql "gitlink.org.cn/cloudream/db/sql"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
@@ -38,7 +37,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
log := logger.WithType[AgentCheckState]("Event") log := logger.WithType[AgentCheckState]("Event")
log.Debugf("begin with %v", logger.FormatStruct(t)) log.Debugf("begin with %v", logger.FormatStruct(t))


node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), t.NodeID)
node, err := execCtx.Args.DB.Node().GetByID(execCtx.Args.DB.SQLCtx(), t.NodeID)
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
return return
} }
@@ -55,7 +54,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
// 检查上次上报时间,超时的设置为不可用 // 检查上次上报时间,超时的设置为不可用
// TODO 没有上报过是否要特殊处理? // TODO 没有上报过是否要特殊处理?
if node.LastReportTime == nil && time.Since(*node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second { if node.LastReportTime == nil && time.Since(*node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second {
err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
err := execCtx.Args.DB.Node().ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
if err != nil { if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("set node state failed, err: %s", err.Error()) log.WithField("NodeID", t.NodeID).Warnf("set node state failed, err: %s", err.Error())
return return
@@ -79,7 +78,6 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
} }
defer agentClient.Close() defer agentClient.Close()


// 紧急任务
getResp, err := agentClient.GetState(agtmsg.NewGetStateBody()) getResp, err := agentClient.GetState(agtmsg.NewGetStateBody())
if err != nil { if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error())
@@ -89,10 +87,11 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
return return
} }


// 根据返回结果修改节点状态
if getResp.Body.IPFSState != consts.IPFS_STATE_OK { if getResp.Body.IPFSState != consts.IPFS_STATE_OK {
log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.Body.IPFSState) log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.Body.IPFSState)


err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
err := execCtx.Args.DB.Node().ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
if err != nil { if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error())
} }
@@ -100,7 +99,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
} }


// TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal
err = mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL)
err = execCtx.Args.DB.Node().ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL)
if err != nil { if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error())
} }


+ 7
- 7
internal/event/agent_check_storage.go View File

@@ -7,7 +7,6 @@ import (
"gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/common/pkg/logger"
"gitlink.org.cn/cloudream/db/model" "gitlink.org.cn/cloudream/db/model"
mysql "gitlink.org.cn/cloudream/db/sql"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
@@ -48,7 +47,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
log := logger.WithType[AgentCheckStorage]("Event") log := logger.WithType[AgentCheckStorage]("Event")
log.Debugf("begin with %v", logger.FormatStruct(t)) log.Debugf("begin with %v", logger.FormatStruct(t))


stg, err := mysql.Storage.GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID)
stg, err := execCtx.Args.DB.Storage().GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID)
if err != nil { if err != nil {
if err != sql.ErrNoRows { if err != sql.ErrNoRows {
log.WithField("StorageID", t.StorageID).Warnf("get storage failed, err: %s", err.Error()) log.WithField("StorageID", t.StorageID).Warnf("get storage failed, err: %s", err.Error())
@@ -56,7 +55,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
return return
} }


node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID)
node, err := execCtx.Args.DB.Node().GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID)
if err != nil { if err != nil {
if err != sql.ErrNoRows { if err != sql.ErrNoRows {
log.WithField("StorageID", t.StorageID).Warnf("get storage node failed, err: %s", err.Error()) log.WithField("StorageID", t.StorageID).Warnf("get storage node failed, err: %s", err.Error())
@@ -74,7 +73,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
var objects []model.StorageObject var objects []model.StorageObject
if t.ObjectIDs == nil { if t.ObjectIDs == nil {
var err error var err error
objects, err = mysql.StorageObject.GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID)
objects, err = execCtx.Args.DB.StorageObject().GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID)
if err != nil { if err != nil {
log.WithField("StorageID", t.StorageID).Warnf("get storage objects failed, err: %s", err.Error()) log.WithField("StorageID", t.StorageID).Warnf("get storage objects failed, err: %s", err.Error())
return return
@@ -82,7 +81,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
isComplete = true isComplete = true
} else { } else {
for _, objID := range t.ObjectIDs { for _, objID := range t.ObjectIDs {
objs, err := mysql.StorageObject.GetAllByStorageAndObjectID(execCtx.Args.DB.SQLCtx(), t.StorageID, objID)
objs, err := execCtx.Args.DB.StorageObject().GetAllByStorageAndObjectID(execCtx.Args.DB.SQLCtx(), t.StorageID, objID)
if err != nil { if err != nil {
log.WithField("StorageID", t.StorageID). log.WithField("StorageID", t.StorageID).
WithField("ObjectID", objID). WithField("ObjectID", objID).
@@ -112,11 +111,12 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
return return
} }


// 根据返回结果修改数据库
var chkObjIDs []int var chkObjIDs []int
for _, entry := range checkResp.Body.Entries { for _, entry := range checkResp.Body.Entries {
switch entry.Operation { switch entry.Operation {
case agtmsg.CHECK_STORAGE_RESP_OP_DELETE: case agtmsg.CHECK_STORAGE_RESP_OP_DELETE:
err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
err := execCtx.Args.DB.StorageObject().Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
if err != nil { if err != nil {
log.WithField("StorageID", t.StorageID). log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID). WithField("ObjectID", entry.ObjectID).
@@ -130,7 +130,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
Debugf("delete storage object") Debugf("delete storage object")


case agtmsg.CHECK_STORAGE_RESP_OP_SET_NORMAL: case agtmsg.CHECK_STORAGE_RESP_OP_SET_NORMAL:
err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
err := execCtx.Args.DB.StorageObject().SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
if err != nil { if err != nil {
log.WithField("StorageID", t.StorageID). log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID). WithField("ObjectID", entry.ObjectID).


Loading…
Cancel
Save