From 18ed91c074118b2d55c689937fadee66c90703fd Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 14 Jun 2023 10:44:37 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8F=96=E6=B6=88db=E6=A8=A1=E5=9D=97=E7=9A=84?= =?UTF-8?q?sql=E5=8C=85=EF=BC=9B=E7=BB=99CreateBucket=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=B8=8A=E9=94=81=E8=BF=87=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/event/agent_check_state.go | 11 +++++------ internal/event/agent_check_storage.go | 14 +++++++------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/internal/event/agent_check_state.go b/internal/event/agent_check_state.go index 1ea3d04..0df1f1a 100644 --- a/internal/event/agent_check_state.go +++ b/internal/event/agent_check_state.go @@ -8,7 +8,6 @@ import ( "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" - mysql "gitlink.org.cn/cloudream/db/sql" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" @@ -38,7 +37,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { log := logger.WithType[AgentCheckState]("Event") log.Debugf("begin with %v", logger.FormatStruct(t)) - node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), t.NodeID) + node, err := execCtx.Args.DB.Node().GetByID(execCtx.Args.DB.SQLCtx(), t.NodeID) if err == sql.ErrNoRows { return } @@ -55,7 +54,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { // 检查上次上报时间,超时的设置为不可用 // TODO 没有上报过是否要特殊处理? if node.LastReportTime == nil && time.Since(*node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second { - err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) + err := execCtx.Args.DB.Node().ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("set node state failed, err: %s", err.Error()) return @@ -79,7 +78,6 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { } defer agentClient.Close() - // 紧急任务 getResp, err := agentClient.GetState(agtmsg.NewGetStateBody()) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) @@ -89,10 +87,11 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { return } + // 根据返回结果修改节点状态 if getResp.Body.IPFSState != consts.IPFS_STATE_OK { log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.Body.IPFSState) - err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) + err := execCtx.Args.DB.Node().ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) } @@ -100,7 +99,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { } // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal - err = mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL) + err = execCtx.Args.DB.Node().ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL) if err != nil { log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) } diff --git a/internal/event/agent_check_storage.go b/internal/event/agent_check_storage.go index 4b19d6c..9f54c1a 100644 --- a/internal/event/agent_check_storage.go +++ b/internal/event/agent_check_storage.go @@ -7,7 +7,6 @@ import ( "gitlink.org.cn/cloudream/common/consts" "gitlink.org.cn/cloudream/common/pkg/logger" "gitlink.org.cn/cloudream/db/model" - mysql "gitlink.org.cn/cloudream/db/sql" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" @@ -48,7 +47,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { log := logger.WithType[AgentCheckStorage]("Event") log.Debugf("begin with %v", logger.FormatStruct(t)) - stg, err := mysql.Storage.GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID) + stg, err := execCtx.Args.DB.Storage().GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID) if err != nil { if err != sql.ErrNoRows { log.WithField("StorageID", t.StorageID).Warnf("get storage failed, err: %s", err.Error()) @@ -56,7 +55,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { return } - node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID) + node, err := execCtx.Args.DB.Node().GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID) if err != nil { if err != sql.ErrNoRows { log.WithField("StorageID", t.StorageID).Warnf("get storage node failed, err: %s", err.Error()) @@ -74,7 +73,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { var objects []model.StorageObject if t.ObjectIDs == nil { var err error - objects, err = mysql.StorageObject.GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID) + objects, err = execCtx.Args.DB.StorageObject().GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID) if err != nil { log.WithField("StorageID", t.StorageID).Warnf("get storage objects failed, err: %s", err.Error()) return @@ -82,7 +81,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { isComplete = true } else { for _, objID := range t.ObjectIDs { - objs, err := mysql.StorageObject.GetAllByStorageAndObjectID(execCtx.Args.DB.SQLCtx(), t.StorageID, objID) + objs, err := execCtx.Args.DB.StorageObject().GetAllByStorageAndObjectID(execCtx.Args.DB.SQLCtx(), t.StorageID, objID) if err != nil { log.WithField("StorageID", t.StorageID). WithField("ObjectID", objID). @@ -112,11 +111,12 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { return } + // 根据返回结果修改数据库 var chkObjIDs []int for _, entry := range checkResp.Body.Entries { switch entry.Operation { case agtmsg.CHECK_STORAGE_RESP_OP_DELETE: - err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) + err := execCtx.Args.DB.StorageObject().Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) if err != nil { log.WithField("StorageID", t.StorageID). WithField("ObjectID", entry.ObjectID). @@ -130,7 +130,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { Debugf("delete storage object") case agtmsg.CHECK_STORAGE_RESP_OP_SET_NORMAL: - err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) + err := execCtx.Args.DB.StorageObject().SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) if err != nil { log.WithField("StorageID", t.StorageID). WithField("ObjectID", entry.ObjectID).