diff --git a/internal/task/agent_check_cache.go b/internal/event/agent_check_cache.go similarity index 67% rename from internal/task/agent_check_cache.go rename to internal/event/agent_check_cache.go index 1460841..c895bcc 100644 --- a/internal/task/agent_check_cache.go +++ b/internal/event/agent_check_cache.go @@ -1,4 +1,4 @@ -package task +package event import ( "database/sql" @@ -11,38 +11,38 @@ import ( agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" - agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task" + agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" ) -type AgentCheckCacheTask struct { +type AgentCheckCache struct { NodeID int FileHashes []string // 需要检查的FileHash列表,如果为nil(不是为空),则代表进行全量检查 } -func NewAgentCheckCacheTask(nodeID int, fileHashes []string) *AgentCheckCacheTask { - return &AgentCheckCacheTask{ +func NewAgentCheckCache(nodeID int, fileHashes []string) *AgentCheckCache { + return &AgentCheckCache{ NodeID: nodeID, FileHashes: fileHashes, } } -func (t *AgentCheckCacheTask) TryMerge(other Task) bool { - task, ok := other.(*AgentCheckCacheTask) +func (t *AgentCheckCache) TryMerge(other Event) bool { + event, ok := other.(*AgentCheckCache) if !ok { return false } // FileHashes为nil时代表全量检查 - if task.FileHashes == nil { + if event.FileHashes == nil { t.FileHashes = nil } else if t.FileHashes != nil { - t.FileHashes = lo.Union(t.FileHashes, task.FileHashes) + t.FileHashes = lo.Union(t.FileHashes, event.FileHashes) } return true } -func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { +func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { var isComplete bool var caches []model.Cache @@ -50,7 +50,7 @@ func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO if t.FileHashes == nil { var err error - caches, err = mysql.Cache.GetNodeCaches(execCtx.DB.SQLCtx(), t.NodeID) + caches, err = mysql.Cache.GetNodeCaches(execCtx.Args.DB.SQLCtx(), t.NodeID) if err != nil { logger.WithField("NodeID", t.NodeID).Warnf("get node caches failed, err: %s", err.Error()) return @@ -59,7 +59,7 @@ func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO } else { for _, hash := range t.FileHashes { - ch, err := mysql.Cache.Get(execCtx.DB.SQLCtx(), hash, t.NodeID) + ch, err := mysql.Cache.Get(execCtx.Args.DB.SQLCtx(), hash, t.NodeID) // 记录不存在则跳过 if err == sql.ErrNoRows { continue @@ -83,10 +83,10 @@ func (t *AgentCheckCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO } defer agentClient.Close() - err = agentClient.PostTask(agtmsg.NewPostTaskBody( - agttsk.NewCheckCacheTask(isComplete, caches), - execOpts.IsEmergency, // 继承本任务的执行选项 - execOpts.DontMerge)) + err = agentClient.PostEvent(agtmsg.NewPostEventBody( + agttsk.NewCheckCache(isComplete, caches), + execCtx.Option.IsEmergency, // 继承本任务的执行选项 + execCtx.Option.DontMerge)) if err != nil { logger.WithField("NodeID", t.NodeID).Warnf("request to agent failed, err: %s", err.Error()) return diff --git a/internal/task/agent_check_state.go b/internal/event/agent_check_state.go similarity index 64% rename from internal/task/agent_check_state.go rename to internal/event/agent_check_state.go index a9a7dd7..ec1e947 100644 --- a/internal/task/agent_check_state.go +++ b/internal/event/agent_check_state.go @@ -1,4 +1,4 @@ -package task +package event import ( "database/sql" @@ -11,33 +11,33 @@ import ( mysql "gitlink.org.cn/cloudream/db/sql" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" - agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task" + agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" "gitlink.org.cn/cloudream/scanner/internal/config" ) -type AgentCheckStateTask struct { +type AgentCheckState struct { NodeIDs []int } -func NewAgentCheckStateTask(nodeIDs []int) AgentCheckStateTask { - return AgentCheckStateTask{ +func NewAgentCheckState(nodeIDs []int) AgentCheckState { + return AgentCheckState{ NodeIDs: nodeIDs, } } -func (t *AgentCheckStateTask) TryMerge(other Task) bool { - task, ok := other.(*AgentCheckStateTask) +func (t *AgentCheckState) TryMerge(other Event) bool { + event, ok := other.(*AgentCheckState) if !ok { return false } - t.NodeIDs = lo.Union(t.NodeIDs, task.NodeIDs) + t.NodeIDs = lo.Union(t.NodeIDs, event.NodeIDs) return true } -func (t *AgentCheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { +func (t *AgentCheckState) Execute(execCtx ExecuteContext) { for _, nodeID := range t.NodeIDs { - node, err := mysql.Node.GetByID(execCtx.DB.SQLCtx(), nodeID) + node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), nodeID) if err == sql.ErrNoRows { continue } @@ -53,20 +53,20 @@ func (t *AgentCheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO // 检查上次上报时间,超时的设置为不可用 if time.Since(node.LastReportTime) > time.Duration(config.Cfg().NodeUnavailableSeconds)*time.Second { - err := mysql.Node.ChangeState(execCtx.DB.SQLCtx(), nodeID, consts.NODE_STATE_UNAVAILABLE) + err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), nodeID, consts.NODE_STATE_UNAVAILABLE) if err != nil { logger.WithField("NodeID", nodeID).Warnf("set node state failed, err: %s", err.Error()) continue } - caches, err := mysql.Cache.GetNodeCaches(execCtx.DB.SQLCtx(), nodeID) + caches, err := mysql.Cache.GetNodeCaches(execCtx.Args.DB.SQLCtx(), nodeID) if err != nil { logger.WithField("NodeID", nodeID).Warnf("get node caches failed, err: %s", err.Error()) continue } // 补充备份数 - execCtx.Executor.Post(NewCheckRepCountTask(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue }))) + execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue }))) continue } @@ -78,7 +78,7 @@ func (t *AgentCheckStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteO defer agentClient.Close() // 紧急任务 - err = agentClient.PostTask(agtmsg.NewPostTaskBody(agttsk.NewCheckStateTask(), true, true)) + err = agentClient.PostEvent(agtmsg.NewPostEventBody(agttsk.NewCheckState(), true, true)) if err != nil { logger.WithField("NodeID", nodeID).Warnf("request to agent failed, err: %s", err.Error()) } diff --git a/internal/task/agent_check_storage.go b/internal/event/agent_check_storage.go similarity index 69% rename from internal/task/agent_check_storage.go rename to internal/event/agent_check_storage.go index cdd6bb9..05a0705 100644 --- a/internal/task/agent_check_storage.go +++ b/internal/event/agent_check_storage.go @@ -1,4 +1,4 @@ -package task +package event import ( "database/sql" @@ -10,37 +10,37 @@ import ( mysql "gitlink.org.cn/cloudream/db/sql" agtcli "gitlink.org.cn/cloudream/rabbitmq/client/agent" agtmsg "gitlink.org.cn/cloudream/rabbitmq/message/agent" - agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/task" + agttsk "gitlink.org.cn/cloudream/rabbitmq/message/agent/event" "gitlink.org.cn/cloudream/scanner/internal/config" ) -type AgentCheckStorageTask struct { +type AgentCheckStorage struct { StorageID int ObjectIDs []int // 需要检查的Object文件列表,如果为nil(不是为空),则代表进行全量检查 } -func (t *AgentCheckStorageTask) TryMerge(other Task) bool { - task, ok := other.(*AgentCheckStorageTask) +func (t *AgentCheckStorage) TryMerge(other Event) bool { + event, ok := other.(*AgentCheckStorage) if !ok { return false } - if t.StorageID != task.StorageID { + if t.StorageID != event.StorageID { return false } // ObjectIDs为nil时代表全量检查 - if task.ObjectIDs == nil { + if event.ObjectIDs == nil { t.ObjectIDs = nil } else if t.ObjectIDs != nil { - t.ObjectIDs = lo.Union(t.ObjectIDs, task.ObjectIDs) + t.ObjectIDs = lo.Union(t.ObjectIDs, event.ObjectIDs) } return true } -func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { - stg, err := mysql.Storage.GetByID(execCtx.DB.SQLCtx(), t.StorageID) +func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { + stg, err := mysql.Storage.GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID) if err != nil { if err != sql.ErrNoRows { logger.WithField("StorageID", t.StorageID).Warnf("get storage failed, err: %s", err.Error()) @@ -48,7 +48,7 @@ func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts Execut return } - node, err := mysql.Node.GetByID(execCtx.DB.SQLCtx(), stg.NodeID) + node, err := mysql.Node.GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID) if err != nil { if err != sql.ErrNoRows { logger.WithField("StorageID", t.StorageID).Warnf("get storage node failed, err: %s", err.Error()) @@ -66,7 +66,7 @@ func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts Execut var objects []model.StorageObject if t.ObjectIDs == nil { var err error - objects, err = mysql.StorageObject.GetAllByStorageID(execCtx.DB.SQLCtx(), t.StorageID) + objects, err = mysql.StorageObject.GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID) if err != nil { logger.WithField("StorageID", t.StorageID).Warnf("get storage objects failed, err: %s", err.Error()) return @@ -74,7 +74,7 @@ func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts Execut isComplete = true } else { for _, objID := range t.ObjectIDs { - obj, err := mysql.StorageObject.Get(execCtx.DB.SQLCtx(), t.StorageID, objID) + obj, err := mysql.StorageObject.Get(execCtx.Args.DB.SQLCtx(), t.StorageID, objID) if err == sql.ErrNoRows { continue } @@ -98,10 +98,10 @@ func (t *AgentCheckStorageTask) Execute(execCtx *ExecuteContext, execOpts Execut } defer agentClient.Close() - err = agentClient.PostTask(agtmsg.NewPostTaskBody( - agttsk.NewCheckStorageTask(stg.Directory, isComplete, objects), - execOpts.IsEmergency, // 继承本任务的执行选项 - execOpts.DontMerge)) + err = agentClient.PostEvent(agtmsg.NewPostEventBody( + agttsk.NewCheckStorage(stg.Directory, isComplete, objects), + execCtx.Option.IsEmergency, // 继承本任务的执行选项 + execCtx.Option.DontMerge)) if err != nil { logger.WithField("NodeID", stg.NodeID).Warnf("request to agent failed, err: %s", stg.NodeID, err.Error()) } diff --git a/internal/event/check_object.go b/internal/event/check_object.go new file mode 100644 index 0000000..bb986c9 --- /dev/null +++ b/internal/event/check_object.go @@ -0,0 +1,36 @@ +package event + +import ( + "github.com/samber/lo" + "gitlink.org.cn/cloudream/common/utils/logger" + mysql "gitlink.org.cn/cloudream/db/sql" +) + +type CheckObject struct { + ObjectIDs []int +} + +func NewCheckObject(objIDs []int) CheckObject { + return CheckObject{ + ObjectIDs: objIDs, + } +} + +func (t *CheckObject) TryMerge(other Event) bool { + event, ok := other.(*CheckObject) + if !ok { + return false + } + + t.ObjectIDs = lo.Union(t.ObjectIDs, event.ObjectIDs) + return true +} + +func (t *CheckObject) Execute(execCtx ExecuteContext) { + for _, objID := range t.ObjectIDs { + err := mysql.Object.DeleteUnused(execCtx.Args.DB.SQLCtx(), objID) + if err != nil { + logger.WithField("ObjectID", objID).Warnf("delete unused object failed, err: %s", err.Error()) + } + } +} diff --git a/internal/task/check_rep_count.go b/internal/event/check_rep_count.go similarity index 90% rename from internal/task/check_rep_count.go rename to internal/event/check_rep_count.go index 14f35e4..69bcdf4 100644 --- a/internal/task/check_rep_count.go +++ b/internal/event/check_rep_count.go @@ -1,4 +1,4 @@ -package task +package event import ( "database/sql" @@ -17,27 +17,27 @@ import ( mysql "gitlink.org.cn/cloudream/db/sql" ) -type CheckRepCountTask struct { +type CheckRepCount struct { FileHashes []string } -func NewCheckRepCountTask(fileHashes []string) *CheckRepCountTask { - return &CheckRepCountTask{ +func NewCheckRepCount(fileHashes []string) *CheckRepCount { + return &CheckRepCount{ FileHashes: fileHashes, } } -func (t *CheckRepCountTask) TryMerge(other Task) bool { - task, ok := other.(*CheckRepCountTask) +func (t *CheckRepCount) TryMerge(other Event) bool { + event, ok := other.(*CheckRepCount) if !ok { return false } - t.FileHashes = lo.Union(t.FileHashes, task.FileHashes) + t.FileHashes = lo.Union(t.FileHashes, event.FileHashes) return true } -func (t *CheckRepCountTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOption) { +func (t *CheckRepCount) Execute(execCtx ExecuteContext) { updatedNodeAndHashes := make(map[int][]string) for _, fileHash := range t.FileHashes { @@ -55,13 +55,13 @@ func (t *CheckRepCountTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOptio for nodeID, hashes := range updatedNodeAndHashes { // 新任务继承本任务的执行设定(紧急任务依然保持紧急任务) - execCtx.Executor.Post(NewAgentCheckCacheTask(nodeID, hashes), myOpts) + execCtx.Executor.Post(NewAgentCheckCache(nodeID, hashes), execCtx.Option) } } -func (t *CheckRepCountTask) checkOneRepCount(fileHash string, execCtx *ExecuteContext) ([]int, error) { +func (t *CheckRepCount) checkOneRepCount(fileHash string, execCtx ExecuteContext) ([]int, error) { var updatedNodeIDs []int - err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { + err := execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { repMaxCnt, err := mysql.ObjectRep.GetFileMaxRepCount(tx, fileHash) if err != nil { return fmt.Errorf("get file max rep count failed, err: %w", err) diff --git a/internal/task/check_rep_count_test.go b/internal/event/check_rep_count_test.go similarity index 99% rename from internal/task/check_rep_count_test.go rename to internal/event/check_rep_count_test.go index 830b75d..3c0beab 100644 --- a/internal/task/check_rep_count_test.go +++ b/internal/event/check_rep_count_test.go @@ -1,4 +1,4 @@ -package task +package event import ( "testing" diff --git a/internal/task/check_unavailable_cache.go b/internal/event/check_unavailable_cache.go similarity index 62% rename from internal/task/check_unavailable_cache.go rename to internal/event/check_unavailable_cache.go index 13bc18f..189b4f4 100644 --- a/internal/task/check_unavailable_cache.go +++ b/internal/event/check_unavailable_cache.go @@ -1,4 +1,4 @@ -package task +package event import ( "database/sql" @@ -12,30 +12,30 @@ import ( mysql "gitlink.org.cn/cloudream/db/sql" ) -type CheckUnavailableCacheTask struct { +type CheckUnavailableCache struct { NodeID int } -func NewCheckUnavailableCacheTask(nodeID int) CheckUnavailableCacheTask { - return CheckUnavailableCacheTask{ +func NewCheckUnavailableCache(nodeID int) CheckUnavailableCache { + return CheckUnavailableCache{ NodeID: nodeID, } } -func (t *CheckUnavailableCacheTask) TryMerge(other Task) bool { - task, ok := other.(*CheckUnavailableCacheTask) +func (t *CheckUnavailableCache) TryMerge(other Event) bool { + event, ok := other.(*CheckUnavailableCache) if !ok { return false } - if task.NodeID != t.NodeID { + if event.NodeID != t.NodeID { return false } return true } -func (t *CheckUnavailableCacheTask) Execute(execCtx *ExecuteContext, myOpts ExecuteOption) { - err := execCtx.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { +func (t *CheckUnavailableCache) Execute(execCtx ExecuteContext) { + err := execCtx.Args.DB.DoTx(sql.LevelSerializable, func(tx *sqlx.Tx) error { node, err := mysql.Node.GetByID(tx, t.NodeID) if err == sql.ErrNoRows { return nil @@ -58,7 +58,7 @@ func (t *CheckUnavailableCacheTask) Execute(execCtx *ExecuteContext, myOpts Exec return fmt.Errorf("delete node all caches failed, err: %w", err) } - execCtx.Executor.Post(NewCheckRepCountTask(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue }))) + execCtx.Executor.Post(NewCheckRepCount(lo.Map(caches, func(ch model.Cache, index int) string { return ch.HashValue }))) return nil }) diff --git a/internal/event/task.go b/internal/event/task.go new file mode 100644 index 0000000..a1bd1f5 --- /dev/null +++ b/internal/event/task.go @@ -0,0 +1,16 @@ +package event + +import ( + event "gitlink.org.cn/cloudream/common/pkg/event" + mydb "gitlink.org.cn/cloudream/db" +) + +type ExecuteArgs struct { + DB *mydb.DB +} + +type Executor = event.Executor[ExecuteArgs] + +type ExecuteContext = event.ExecuteContext[ExecuteArgs] + +type Event = event.Event[ExecuteArgs] diff --git a/internal/task/update_agent_state.go b/internal/event/update_agent_state.go similarity index 64% rename from internal/task/update_agent_state.go rename to internal/event/update_agent_state.go index 4dcb17c..24d6f46 100644 --- a/internal/task/update_agent_state.go +++ b/internal/event/update_agent_state.go @@ -1,4 +1,4 @@ -package task +package event import ( "gitlink.org.cn/cloudream/common/consts" @@ -6,20 +6,20 @@ import ( mysql "gitlink.org.cn/cloudream/db/sql" ) -type UpdateAgentStateTask struct { +type UpdateAgentState struct { NodeID int IPFSStatus string } -func (t *UpdateAgentStateTask) TryMerge(other Task) bool { +func (t *UpdateAgentState) TryMerge(other Event) bool { return false } -func (t *UpdateAgentStateTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { +func (t *UpdateAgentState) Execute(execCtx ExecuteContext) { if t.IPFSStatus != consts.IPFS_STATUS_OK { logger.WithField("NodeID", t.NodeID).Warnf("IPFS status is %s, set node state unavailable", t.IPFSStatus) - err := mysql.Node.ChangeState(execCtx.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) + err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_UNAVAILABLE) if err != nil { logger.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) } @@ -27,7 +27,7 @@ func (t *UpdateAgentStateTask) Execute(execCtx *ExecuteContext, execOpts Execute } // TODO 如果以后还有其他的状态,要判断哪些状态下能设置Normal - err := mysql.Node.ChangeState(execCtx.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL) + err := mysql.Node.ChangeState(execCtx.Args.DB.SQLCtx(), t.NodeID, consts.NODE_STATE_NORMAL) if err != nil { logger.WithField("NodeID", t.NodeID).Warnf("change node state failed, err: %s", err.Error()) } diff --git a/internal/event/update_cache.go b/internal/event/update_cache.go new file mode 100644 index 0000000..b3ac62f --- /dev/null +++ b/internal/event/update_cache.go @@ -0,0 +1,60 @@ +package event + +import ( + tskcst "gitlink.org.cn/cloudream/common/consts/event" + "gitlink.org.cn/cloudream/common/utils/logger" + mysql "gitlink.org.cn/cloudream/db/sql" +) + +type UpdateCacheEntry struct { + FileHash string + Operation string +} + +func NewUpdateCacheEntry(fileHash string, op string) UpdateCacheEntry { + return UpdateCacheEntry{ + FileHash: fileHash, + Operation: op, + } +} + +type UpdateCache struct { + NodeID int + Entries []UpdateCacheEntry +} + +func NewUpdateCache(nodeID int, entries []UpdateCacheEntry) UpdateCache { + return UpdateCache{ + NodeID: nodeID, + Entries: entries, + } +} + +func (t *UpdateCache) TryMerge(other Event) bool { + event, ok := other.(*UpdateCache) + if !ok { + return false + } + if event.NodeID != t.NodeID { + return false + } + + // TODO 可以考虑合并同FileHash和NodeID的记录 + t.Entries = append(t.Entries, event.Entries...) + return true +} + +func (t *UpdateCache) Execute(execCtx ExecuteContext) { + for _, entry := range t.Entries { + switch entry.Operation { + case tskcst.UPDATE_CACHE_OP_UNTEMP: + err := mysql.Cache.DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) + + if err != nil { + logger.WithField("FileHash", entry.FileHash). + WithField("NodeID", t.NodeID). + Warnf("delete temp cache failed, err: %s", err.Error()) + } + } + } +} diff --git a/internal/task/update_storage.go b/internal/event/update_storage.go similarity index 52% rename from internal/task/update_storage.go rename to internal/event/update_storage.go index 9f5ff5a..848285e 100644 --- a/internal/task/update_storage.go +++ b/internal/event/update_storage.go @@ -1,55 +1,55 @@ -package task +package event import ( - tskcst "gitlink.org.cn/cloudream/common/consts/task" + tskcst "gitlink.org.cn/cloudream/common/consts/event" "gitlink.org.cn/cloudream/common/utils/logger" mysql "gitlink.org.cn/cloudream/db/sql" ) -type UpdateStorageTaskEntry struct { +type UpdateStorageEntry struct { ObjectID int Operation string } -func NewUpdateStorageTaskEntry(objectID int, op string) UpdateStorageTaskEntry { - return UpdateStorageTaskEntry{ +func NewUpdateStorageEntry(objectID int, op string) UpdateStorageEntry { + return UpdateStorageEntry{ ObjectID: objectID, Operation: op, } } -type UpdateStorageTask struct { +type UpdateStorage struct { StorageID int DirectoryStatus string - Entries []UpdateStorageTaskEntry + Entries []UpdateStorageEntry } -func NewUpdateStorageTask(dirStatus string, entries []UpdateStorageTaskEntry) UpdateStorageTask { - return UpdateStorageTask{ +func NewUpdateStorage(dirStatus string, entries []UpdateStorageEntry) UpdateStorage { + return UpdateStorage{ DirectoryStatus: dirStatus, Entries: entries, } } -func (t *UpdateStorageTask) TryMerge(other Task) bool { - task, ok := other.(*UpdateStorageTask) +func (t *UpdateStorage) TryMerge(other Event) bool { + event, ok := other.(*UpdateStorage) if !ok { return false } - if task.StorageID != t.StorageID { + if event.StorageID != t.StorageID { return false } // 后投递的任务的状态更新一些 - t.DirectoryStatus = task.DirectoryStatus + t.DirectoryStatus = event.DirectoryStatus // TODO 可以考虑合并同FileHash和NodeID的记录 - t.Entries = append(t.Entries, task.Entries...) + t.Entries = append(t.Entries, event.Entries...) return true } -func (t *UpdateStorageTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { +func (t *UpdateStorage) Execute(execCtx ExecuteContext) { - err := mysql.Storage.ChangeState(execCtx.DB.SQLCtx(), t.StorageID, t.DirectoryStatus) + err := mysql.Storage.ChangeState(execCtx.Args.DB.SQLCtx(), t.StorageID, t.DirectoryStatus) if err != nil { logger.WithField("StorageID", t.StorageID).Warnf("change storage state failed, err: %s", err.Error()) } @@ -57,7 +57,7 @@ func (t *UpdateStorageTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOpt for _, entry := range t.Entries { switch entry.Operation { case tskcst.UPDATE_STORAGE_DELETE: - err := mysql.StorageObject.Delete(execCtx.DB.SQLCtx(), t.StorageID, entry.ObjectID) + err := mysql.StorageObject.Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID) if err != nil { logger.WithField("StorageID", t.StorageID). WithField("ObjectID", entry.ObjectID). @@ -65,7 +65,7 @@ func (t *UpdateStorageTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOpt } case tskcst.UPDATE_STORAGE_SET_NORMAL: - err := mysql.StorageObject.SetStateNormal(execCtx.DB.SQLCtx(), t.StorageID, entry.ObjectID) + err := mysql.StorageObject.SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID) if err != nil { logger.WithField("StorageID", t.StorageID). WithField("ObjectID", entry.ObjectID). diff --git a/internal/task/check_object.go b/internal/task/check_object.go deleted file mode 100644 index 80463bf..0000000 --- a/internal/task/check_object.go +++ /dev/null @@ -1,36 +0,0 @@ -package task - -import ( - "github.com/samber/lo" - "gitlink.org.cn/cloudream/common/utils/logger" - mysql "gitlink.org.cn/cloudream/db/sql" -) - -type CheckObjectTask struct { - ObjectIDs []int -} - -func NewCheckObjectTask(objIDs []int) CheckObjectTask { - return CheckObjectTask{ - ObjectIDs: objIDs, - } -} - -func (t *CheckObjectTask) TryMerge(other Task) bool { - task, ok := other.(*CheckObjectTask) - if !ok { - return false - } - - t.ObjectIDs = lo.Union(t.ObjectIDs, task.ObjectIDs) - return true -} - -func (t *CheckObjectTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { - for _, objID := range t.ObjectIDs { - err := mysql.Object.DeleteUnused(execCtx.DB.SQLCtx(), objID) - if err != nil { - logger.WithField("ObjectID", objID).Warnf("delete unused object failed, err: %s", err.Error()) - } - } -} diff --git a/internal/task/executor.go b/internal/task/executor.go deleted file mode 100644 index 35f867f..0000000 --- a/internal/task/executor.go +++ /dev/null @@ -1,101 +0,0 @@ -package task - -import ( - "context" - "sync" - - "github.com/zyedidia/generic/list" - mydb "gitlink.org.cn/cloudream/db" - "golang.org/x/sync/semaphore" -) - -type ExecuteOption struct { - IsEmergency bool - DontMerge bool -} -type ExecuteContext struct { - Executor *Executor - DB *mydb.DB -} -type postedTask struct { - Task Task - Option ExecuteOption -} - -type Executor struct { - tasks list.List[postedTask] - locker sync.Mutex - taskSema semaphore.Weighted - execCtx *ExecuteContext -} - -func (e *Executor) Post(task Task, opts ...ExecuteOption) { - opt := ExecuteOption{ - IsEmergency: false, - DontMerge: false, - } - - if len(opts) > 0 { - opt = opts[0] - } - - e.locker.Lock() - defer e.locker.Unlock() - - // 紧急任务直接插入到队头,不进行合并 - if opt.IsEmergency { - e.tasks.PushFront(postedTask{ - Task: task, - Option: opt, - }) - e.taskSema.Release(1) - return - } - - // 合并任务 - if opt.DontMerge { - ptr := e.tasks.Front - for ptr != nil { - // 只与非紧急任务,且允许合并的任务进行合并 - if !ptr.Value.Option.IsEmergency && !ptr.Value.Option.DontMerge { - if ptr.Value.Task.TryMerge(task) { - return - } - } - - ptr = ptr.Next - } - } - - e.tasks.PushBack(postedTask{ - Task: task, - Option: opt, - }) - e.taskSema.Release(1) -} - -// Execute 开始执行任务 -func (e *Executor) Execute() error { - for { - // TODO 打印错误日志 - e.taskSema.Acquire(context.Background(), 1) - - task := e.popFrontTask() - if task == nil { - continue - } - - task.Task.Execute(e.execCtx, task.Option) - } -} - -func (e *Executor) popFrontTask() *postedTask { - e.locker.Lock() - defer e.locker.Unlock() - - if e.tasks.Front == nil { - return nil - } - - return &e.tasks.Front.Value -} diff --git a/internal/task/task.go b/internal/task/task.go deleted file mode 100644 index 40547df..0000000 --- a/internal/task/task.go +++ /dev/null @@ -1,6 +0,0 @@ -package task - -type Task interface { - TryMerge(other Task) bool // 尝试将other任务与自身合并,如果成功返回true - Execute(ctx *ExecuteContext, myOpts ExecuteOption) -} diff --git a/internal/task/update_cache.go b/internal/task/update_cache.go deleted file mode 100644 index 243f1cf..0000000 --- a/internal/task/update_cache.go +++ /dev/null @@ -1,60 +0,0 @@ -package task - -import ( - tskcst "gitlink.org.cn/cloudream/common/consts/task" - "gitlink.org.cn/cloudream/common/utils/logger" - mysql "gitlink.org.cn/cloudream/db/sql" -) - -type UpdateCacheTaskEntry struct { - FileHash string - Operation string -} - -func NewUpdateCacheTaskEntry(fileHash string, op string) UpdateCacheTaskEntry { - return UpdateCacheTaskEntry{ - FileHash: fileHash, - Operation: op, - } -} - -type UpdateCacheTask struct { - NodeID int - Entries []UpdateCacheTaskEntry -} - -func NewUpdateCacheTask(nodeID int, entries []UpdateCacheTaskEntry) UpdateCacheTask { - return UpdateCacheTask{ - NodeID: nodeID, - Entries: entries, - } -} - -func (t *UpdateCacheTask) TryMerge(other Task) bool { - task, ok := other.(*UpdateCacheTask) - if !ok { - return false - } - if task.NodeID != t.NodeID { - return false - } - - // TODO 可以考虑合并同FileHash和NodeID的记录 - t.Entries = append(t.Entries, task.Entries...) - return true -} - -func (t *UpdateCacheTask) Execute(execCtx *ExecuteContext, execOpts ExecuteOption) { - for _, entry := range t.Entries { - switch entry.Operation { - case tskcst.UPDATE_CACHE_OP_UNTEMP: - err := mysql.Cache.DeleteTemp(execCtx.DB.SQLCtx(), entry.FileHash, t.NodeID) - - if err != nil { - logger.WithField("FileHash", entry.FileHash). - WithField("NodeID", t.NodeID). - Warnf("delete temp cache failed, err: %s", err.Error()) - } - } - } -}