Browse Source

取消agent的event机制,改为对外提供API

gitlink
Sydonian 2 years ago
parent
commit
2cfe8fa35c
7 changed files with 104 additions and 221 deletions
  1. +36
    -6
      internal/event/agent_check_cache.go
  2. +23
    -3
      internal/event/agent_check_state.go
  3. +43
    -6
      internal/event/agent_check_storage.go
  4. +0
    -47
      internal/event/update_agent_state.go
  5. +0
    -70
      internal/event/update_cache.go
  6. +0
    -87
      internal/event/update_storage.go
  7. +2
    -2
      main.go

+ 36
- 6
internal/event/agent_check_cache.go View File

@@ -9,7 +9,7 @@ import (
"gitlink.org.cn/cloudream/scanner/internal/config" "gitlink.org.cn/cloudream/scanner/internal/config"


agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
) )


@@ -80,21 +80,51 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) {
} }


// 然后向代理端发送移动文件的请求 // 然后向代理端发送移动文件的请求
agentClient, err := agtcli.NewAgentClient(t.NodeID, &config.Cfg().RabbitMQ)
agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error())
return return
} }
defer agentClient.Close() defer agentClient.Close()


err = agentClient.PostEvent(
agtevt.NewCheckCache(isComplete, caches),
execCtx.Option.IsEmergency, // 继承本任务的执行选项
execCtx.Option.DontMerge)
checkResp, err := agentClient.CheckIPFS(agtmsg.NewCheckIPFSBody(isComplete, caches))
if err != nil { if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error())
return return
} }
if checkResp.IsFailed() {
log.WithField("NodeID", t.NodeID).Warnf("agent operation failed, err: %s", err.Error())
return
}

// 根据返回结果修改数据库
for _, entry := range checkResp.Body.Entries {
switch entry.Operation {
case agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP:
err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)
if err != nil {
log.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Warnf("delete temp cache failed, err: %s", err.Error())
}

log.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Debugf("delete temp cache")

case agtmsg.CHECK_IPFS_RESP_OP_CREATE_TEMP:
err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)
if err != nil {
log.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Warnf("create temp cache failed, err: %s", err.Error())
}

log.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Debugf("create temp cache")
}
}
} }
func init() { func init() {
RegisterMessageConvertor(func(msg scevt.AgentCheckCache) Event { return NewAgentCheckCache(msg.NodeID, msg.FileHashes) }) RegisterMessageConvertor(func(msg scevt.AgentCheckCache) Event { return NewAgentCheckCache(msg.NodeID, msg.FileHashes) })


+ 23
- 3
internal/event/agent_check_state.go View File

@@ -10,7 +10,7 @@ import (
"gitlink.org.cn/cloudream/db/model" "gitlink.org.cn/cloudream/db/model"
mysql "gitlink.org.cn/cloudream/db/sql" mysql "gitlink.org.cn/cloudream/db/sql"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
"gitlink.org.cn/cloudream/scanner/internal/config" "gitlink.org.cn/cloudream/scanner/internal/config"
) )
@@ -72,7 +72,7 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
return return
} }


agentClient, err := agtcli.NewAgentClient(t.NodeID, &config.Cfg().RabbitMQ)
agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error())
return return
@@ -80,10 +80,30 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) {
defer agentClient.Close() defer agentClient.Close()


// 紧急任务 // 紧急任务
err = agentClient.PostEvent(agtevt.NewCheckState(), true, true)
getResp, err := agentClient.GetState(agtmsg.NewGetStateBody())
if err != nil { if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) log.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error())
} }
if getResp.IsFailed() {
log.WithField("NodeID", t.NodeID).Warnf("agent operation failed, err: %s", err.Error())
return
}

if getResp.Body.IPFSState != consts.IPFS_STATE_OK {
log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", getResp.Body.IPFSState)

err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error())
}
return
}

// TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal
err = mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error())
}
} }


func init() { func init() {


+ 43
- 6
internal/event/agent_check_storage.go View File

@@ -9,7 +9,7 @@ import (
"gitlink.org.cn/cloudream/db/model" "gitlink.org.cn/cloudream/db/model"
mysql "gitlink.org.cn/cloudream/db/sql" mysql "gitlink.org.cn/cloudream/db/sql"
agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent"
agtevt "gitlink.org.cn/cloudream/rabbitmq/message/agent/event"
agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event" scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
"gitlink.org.cn/cloudream/scanner/internal/config" "gitlink.org.cn/cloudream/scanner/internal/config"
) )
@@ -96,20 +96,57 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
} }


// 投递任务 // 投递任务
agentClient, err := agtcli.NewAgentClient(stg.NodeID, &config.Cfg().RabbitMQ)
agentClient, err := agtcli.NewClient(stg.NodeID, &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error()) log.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error())
return return
} }
defer agentClient.Close() defer agentClient.Close()


err = agentClient.PostEvent(
agtevt.NewCheckStorage(stg.StorageID, stg.Directory, isComplete, objects),
execCtx.Option.IsEmergency, // 继承本任务的执行选项
execCtx.Option.DontMerge)
checkResp, err := agentClient.CheckStorage(agtmsg.NewCheckStorageBody(stg.StorageID, stg.Directory, isComplete, objects))
if err != nil { if err != nil {
log.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", err.Error()) log.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", err.Error())
} }
if checkResp.IsFailed() {
log.WithField("NodeID", stg.NodeID).Warnf("agent operation failed, err: %s", err.Error())
return
}

var chkObjIDs []int
for _, entry := range checkResp.Body.Entries {
switch entry.Operation {
case agtmsg.CHECK_STORAGE_RESP_OP_DELETE:
err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
if err != nil {
log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
Warnf("delete storage object failed, err: %s", err.Error())
}
chkObjIDs = append(chkObjIDs, entry.ObjectID)

log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
WithField("UserID", entry.UserID).
Debugf("delete storage object")

case agtmsg.CHECK_STORAGE_RESP_OP_SET_NORMAL:
err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
if err != nil {
log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
Warnf("change storage object state failed, err: %s", err.Error())
}

log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
WithField("UserID", entry.UserID).
Debugf("set storage object normal")
}
}

if len(chkObjIDs) > 0 {
execCtx.Executor.Post(NewCheckObject(chkObjIDs))
}
} }


func init() { func init() {


+ 0
- 47
internal/event/update_agent_state.go View File

@@ -1,47 +0,0 @@
package event

import (
"gitlink.org.cn/cloudream/common/consts"
"gitlink.org.cn/cloudream/common/pkg/logger"
mysql "gitlink.org.cn/cloudream/db/sql"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
)

type UpdateAgentState struct {
scevt.UpdateAgentState
}

func NewUpdateAgentState(nodeID int, ipfsState string) *UpdateAgentState {
return &UpdateAgentState{
UpdateAgentState: scevt.NewUpdateAgentState(nodeID, ipfsState),
}
}

func (t *UpdateAgentState) TryMerge(other Event) bool {
return false
}

func (t *UpdateAgentState) Execute(execCtx ExecuteContext) {
log := logger.WithType[UpdateAgentState]("Event")
log.Debugf("begin with %v", logger.FormatStruct(t))

if t.IPFSState != consts.IPFS_STATUS_OK {
log.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", t.IPFSState)

err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error())
}
return
}

// TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal
err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL)
if err != nil {
log.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error())
}
}

func init() {
RegisterMessageConvertor(func(msg scevt.UpdateAgentState) Event { return NewUpdateAgentState(msg.NodeID, msg.IPFSState) })
}

+ 0
- 70
internal/event/update_cache.go View File

@@ -1,70 +0,0 @@
package event

import (
evtcst "gitlink.org.cn/cloudream/common/consts/event"
"gitlink.org.cn/cloudream/common/pkg/logger"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
)

type UpdateCacheEntry = scevt.UpdateCacheEntry

type UpdateCache struct {
scevt.UpdateCache
}

func NewUpdateCache(nodeID int, entries []scevt.UpdateCacheEntry) *UpdateCache {
return &UpdateCache{
UpdateCache: scevt.NewUpdateCache(nodeID, entries),
}
}

func (t *UpdateCache) TryMerge(other Event) bool {
event, ok := other.(*UpdateCache)
if !ok {
return false
}
if event.NodeID != t.NodeID {
return false
}

// TODO 可以考虑合并同FileHash和NodeID的记录
t.Entries = append(t.Entries, event.Entries...)
return true
}

func (t *UpdateCache) Execute(execCtx ExecuteContext) {
log := logger.WithType[UpdateCache]("Event")
log.Debugf("begin with %v", logger.FormatStruct(t))

for _, entry := range t.Entries {
switch entry.Operation {
case evtcst.UPDATE_CACHE_DELETE_TEMP:
err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)
if err != nil {
log.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Warnf("delete temp cache failed, err: %s", err.Error())
}

log.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Debugf("delete temp cache")

case evtcst.UPDATE_CACHE_CREATE_TEMP:
err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID)
if err != nil {
log.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Warnf("create temp cache failed, err: %s", err.Error())
}

log.WithField("FileHash", entry.FileHash).
WithField("NodeID", t.NodeID).
Debugf("create temp cache")
}
}
}

func init() {
RegisterMessageConvertor(func(msg scevt.UpdateCache) Event { return NewUpdateCache(msg.NodeID, msg.Entries) })
}

+ 0
- 87
internal/event/update_storage.go View File

@@ -1,87 +0,0 @@
package event

import (
tskcst "gitlink.org.cn/cloudream/common/consts/event"
"gitlink.org.cn/cloudream/common/pkg/logger"
mysql "gitlink.org.cn/cloudream/db/sql"
scevt "gitlink.org.cn/cloudream/rabbitmq/message/scanner/event"
)

type UpdateStorageEntry = scevt.UpdateStorageEntry
type UpdateStorage struct {
scevt.UpdateStorage
}

func NewUpdateStorage(storageID int, dirState string, entries []UpdateStorageEntry) *UpdateStorage {
return &UpdateStorage{
UpdateStorage: scevt.NewUpdateStorage(storageID, dirState, entries),
}
}

func (t *UpdateStorage) TryMerge(other Event) bool {
event, ok := other.(*UpdateStorage)
if !ok {
return false
}
if event.StorageID != t.StorageID {
return false
}

// 后投递的任务的状态更新一些
t.DirectoryState = event.DirectoryState
// TODO 可以考虑合并同FileHash和NodeID的记录
t.Entries = append(t.Entries, event.Entries...)
return true
}

func (t *UpdateStorage) Execute(execCtx ExecuteContext) {
log := logger.WithType[UpdateStorage]("Event")
log.Debugf("begin with %v", logger.FormatStruct(t))

err := mysql.Storage.ChangeState(execCtx.Args.DB.SQLCtx(), t.StorageID, t.DirectoryState)
if err != nil {
log.WithField("StorageID", t.StorageID).Warnf("change storage state failed, err: %s", err.Error())
}

var chkObjIDs []int
for _, entry := range t.Entries {
switch entry.Operation {
case tskcst.UPDATE_STORAGE_DELETE:
err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
if err != nil {
log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
Warnf("delete storage object failed, err: %s", err.Error())
}
chkObjIDs = append(chkObjIDs, entry.ObjectID)

log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
WithField("UserID", entry.UserID).
Debugf("delete storage object")

case tskcst.UPDATE_STORAGE_SET_NORMAL:
err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID)
if err != nil {
log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
Warnf("change storage object state failed, err: %s", err.Error())
}

log.WithField("StorageID", t.StorageID).
WithField("ObjectID", entry.ObjectID).
WithField("UserID", entry.UserID).
Debugf("set storage object normal")
}
}

if len(chkObjIDs) > 0 {
execCtx.Executor.Post(NewCheckObject(chkObjIDs))
}
}

func init() {
RegisterMessageConvertor(func(msg scevt.UpdateStorage) Event {
return NewUpdateStorage(msg.StorageID, msg.DirectoryState, msg.Entries)
})
}

+ 2
- 2
main.go View File

@@ -38,7 +38,7 @@ func main() {
eventExecutor := event.NewExecutor(db) eventExecutor := event.NewExecutor(db)
go serveEventExecutor(&eventExecutor, &wg) go serveEventExecutor(&eventExecutor, &wg)


agtSvr, err := scsvr.NewScannerServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ)
agtSvr, err := scsvr.NewServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ)
if err != nil { if err != nil {
log.Fatalf("new agent server failed, err: %s", err.Error()) log.Fatalf("new agent server failed, err: %s", err.Error())
} }
@@ -70,7 +70,7 @@ func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) {
wg.Done() wg.Done()
} }


func serveScannerServer(server *scsvr.ScannerServer, wg *sync.WaitGroup) {
func serveScannerServer(server *scsvr.Server, wg *sync.WaitGroup) {
log.Info("start serving scanner server") log.Info("start serving scanner server")


err := server.Serve() err := server.Serve()


Loading…
Cancel
Save