From 2071a9fbae2dd51c6f97c585aadb9e3a8e282d14 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 19 May 2023 15:11:16 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=A3=80=E6=9F=A5=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E7=8A=B6=E6=80=81=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/config/config.go | 1 + internal/task/agent_check_state.go | 86 ++++++++++++++++++++++++++++++ internal/task/check_rep_count.go | 6 +++ 3 files changed, 93 insertions(+) create mode 100644 internal/task/agent_check_state.go diff --git a/internal/config/config.go b/internal/config/config.go index 01c555b..4301a0b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,6 +9,7 @@ import ( type Config struct { MinAvailableRepProportion float32 `json:"minAvailableRepProportion"` // 可用的备份至少要占所有备份的比例,向上去整 + NodeUnavailableSeconds int `json:"nodeUnavailableSeconds"` // 如果节点上次上报时间超过这个值,则认为节点已经不可用 Logger log.Config `json:"logger"` DB db.Config `json:"db"` diff --git a/internal/task/agent_check_state.go b/internal/task/agent_check_state.go new file mode 100644 index 0000000..1addb95 --- /dev/null +++ b/internal/task/agent_check_state.go @@ -0,0 +1,86 @@ +package task + +import ( + "database/sql" + "time" + + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/consts" + "gitlink.org.cn/cloudream/db/model" + mysql "gitlink.org.cn/cloudream/db/sql" + agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" + agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" + agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task" + "gitlink.org.cn/cloudream/scanner/internal/config" + "gitlink.org.cn/cloudream/utils/logger" +) + +type AgentCheckStateTask struct { + NodeIDs []int +} + +func NewAgentCheckStateTask(nodeIDs []int) AgentCheckStateTask { + return AgentCheckStateTask{ + NodeIDs: nodeIDs, + } +} + +func (t *AgentCheckStateTask) TryMerge(other Task) bool { + chkTask, ok := other.(*AgentCheckStateTask) + if !ok { + return false + } + + t.NodeIDs = lo.Union(t.NodeIDs, chkTask.NodeIDs) + return true +} + +func (t *AgentCheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { + for _, nodeID := range t.NodeIDs { + node, err := mysql.Node.GetByID(execCtx.DB.SQLCtx(), nodeID) + if err == sql.ErrNoRows { + continue + } + + if err != nil { + logger.WithField("NodeID", nodeID).Warnf("get node by id failed, err: %s", err.Error()) + continue + } + + if node.State != consts.NODE_STATE_NORMAL { + continue + } + + // 检查上次上报时间,超时的设置为不可用 + if time.Since(node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second { + err := mysql.Node.ChangeState(execCtx.DB.SQLCtx(), nodeID, consts.NODE_STATE_UNAVAILABLE) + if err != nil { + logger.WithField("NodeID", nodeID).Warnf("set node state failed, err: %s", err.Error()) + continue + } + + caches, err := mysql.Cache.GetNodeCaches(execCtx.DB.SQLCtx(), nodeID) + if err != nil { + logger.WithField("NodeID", nodeID).Warnf("get node caches failed, err: %s", err.Error()) + continue + } + + // 补充备份数 + execCtx.Executor.Post(NewCheckRepCountTask(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue }))) + continue + } + + agentClient, err := agtcli.NewAgentClient(nodeID, &config.Cfg().RabbitMQ) + if err != nil { + logger.WithField("NodeID", nodeID).Warnf("create agent client failed, err: %s", err.Error()) + continue + } + defer agentClient.Close() + + // 紧急任务 + err = agentClient.PostTask(agtmsg.NewPostTaskBody(agttsk.NewCheckStateTask(), true, true)) + if err != nil { + logger.WithField("NodeID", nodeID).Warnf("request to agent failed, err: %s", err.Error()) + } + } +} diff --git a/internal/task/check_rep_count.go b/internal/task/check_rep_count.go index f843bb7..99e1814 100644 --- a/internal/task/check_rep_count.go +++ b/internal/task/check_rep_count.go @@ -21,6 +21,12 @@ type CheckRepCountTask struct { FileHashes []string } +func NewCheckRepCountTask(fileHashes []string) *CheckRepCountTask { + return &CheckRepCountTask{ + FileHashes: fileHashes, + } +} + func (t *CheckRepCountTask) TryMerge(other Task) bool { chkTask, ok := other.(*CheckRepCountTask) if !ok {