Browse Source

修复检查节点状态的逻辑

gitlink
Sydonian 2 years ago
parent
commit
ce581dbc94
3 changed files with 24 additions and 26 deletions
  1. +1
    -1
      internal/event/agent_check_cache.go
  2. +22
    -25
      internal/event/agent_check_state.go
  3. +1
    -0
      internal/event/agent_check_storage.go

+ 1
- 1
internal/event/agent_check_cache.go View File

@@ -120,7 +120,7 @@ func (t *AgentCheckCache) checkIncrement(execCtx ExecuteContext) {
caches = append(caches, ch)
}

t.startCheck(execCtx, true, caches)
t.startCheck(execCtx, false, caches)
}

func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, caches []model.Cache) {


+ 22
- 25
internal/event/agent_check_state.go View File

@@ -61,30 +61,6 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
return
}

if node.State != consts.NODE_STATE_NORMAL {
return
}

// 检查上次上报时间,超时的设置为不可用
// TODO 没有上报过是否要特殊处理?
if node.LastReportTime != nil && time.Since(*node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second {
err := execCtx.Args.DB.Node().UpdateState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("set node state failed, err: %s", err.Error())
return
}

caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error())
return
}

// 补充备份数
execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash })))
return
}

agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error())
@@ -92,9 +68,30 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
}
defer agentClient.Close()

getResp, err := agentClient.GetState(agtmsg.NewGetState(), rabbitmq.RequestOption{Timeout: time.Minute})
getResp, err := agentClient.GetState(agtmsg.NewGetState(), rabbitmq.RequestOption{Timeout: time.Second * 30})
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("getting state: %s", err.Error())

// 检查上次上报时间,超时的设置为不可用
// TODO 没有上报过是否要特殊处理?
if node.LastReportTime != nil && time.Since(*node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second {
err := execCtx.Args.DB.Node().UpdateState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("set node state failed, err: %s", err.Error())
return
}

caches, err := execCtx.Args.DB.Cache().GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error())
return
}

// 补充备份数
execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.FileHash })))
return
}
return
}

// 根据返回结果修改节点状态


+ 1
- 0
internal/event/agent_check_storage.go View File

@@ -154,6 +154,7 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage
checkResp, err := agentClient.CheckStorage(agtmsg.NewCheckStorage(stg.StorageID, stg.Directory, isComplete, objects), rabbitmq.RequestOption{Timeout: time.Minute})
if err != nil {
log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error())
return
}

// 根据返回结果修改数据库


Loading…
Cancel
Save