| @@ -5,7 +5,7 @@ import ( | |||||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | log "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| c "gitlink.org.cn/cloudream/common/utils/config" | c "gitlink.org.cn/cloudream/common/utils/config" | ||||
| db "gitlink.org.cn/cloudream/storage-common/pkgs/db/config" | db "gitlink.org.cn/cloudream/storage-common/pkgs/db/config" | ||||
| racfg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/config" | |||||
| stgmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq" | |||||
| ) | ) | ||||
| type Config struct { | type Config struct { | ||||
| @@ -14,7 +14,7 @@ type Config struct { | |||||
| Logger log.Config `json:"logger"` | Logger log.Config `json:"logger"` | ||||
| DB db.Config `json:"db"` | DB db.Config `json:"db"` | ||||
| RabbitMQ racfg.Config `json:"rabbitMQ"` | |||||
| RabbitMQ stgmq.Config `json:"rabbitMQ"` | |||||
| DistLock distlock.Config `json:"distlock"` | DistLock distlock.Config `json:"distlock"` | ||||
| } | } | ||||
| @@ -11,9 +11,8 @@ import ( | |||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | ||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/config" | "gitlink.org.cn/cloudream/storage-scanner/internal/config" | ||||
| agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent" | |||||
| agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" | |||||
| agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" | |||||
| ) | ) | ||||
| type AgentCheckCache struct { | type AgentCheckCache struct { | ||||
| @@ -127,14 +126,14 @@ func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, ca | |||||
| log := logger.WithType[AgentCheckCache]("Event") | log := logger.WithType[AgentCheckCache]("Event") | ||||
| // 然后向代理端发送移动文件的请求 | // 然后向代理端发送移动文件的请求 | ||||
| agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ) | |||||
| agentClient, err := agtmq.NewClient(t.NodeID, &config.Cfg().RabbitMQ) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) | log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) | ||||
| return | return | ||||
| } | } | ||||
| defer agentClient.Close() | defer agentClient.Close() | ||||
| checkResp, err := agentClient.CheckIPFS(agtmsg.NewCheckIPFS(isComplete, caches), mq.RequestOption{Timeout: time.Minute}) | |||||
| checkResp, err := agentClient.CheckIPFS(agtmq.NewCheckIPFS(isComplete, caches), mq.RequestOption{Timeout: time.Minute}) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("NodeID", t.NodeID).Warnf("checking ipfs: %s", err.Error()) | log.WithField("NodeID", t.NodeID).Warnf("checking ipfs: %s", err.Error()) | ||||
| return | return | ||||
| @@ -143,7 +142,7 @@ func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, ca | |||||
| // 根据返回结果修改数据库 | // 根据返回结果修改数据库 | ||||
| for _, entry := range checkResp.Entries { | for _, entry := range checkResp.Entries { | ||||
| switch entry.Operation { | switch entry.Operation { | ||||
| case agtmsg.CHECK_IPFS_RESP_OP_DELETE_TEMP: | |||||
| case agtmq.CHECK_IPFS_RESP_OP_DELETE_TEMP: | |||||
| err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) | err := execCtx.Args.DB.Cache().DeleteTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) | ||||
| if err != nil { | if err != nil { | ||||
| log.WithField("FileHash", entry.FileHash). | log.WithField("FileHash", entry.FileHash). | ||||
| @@ -155,7 +154,7 @@ func (t *AgentCheckCache) startCheck(execCtx ExecuteContext, isComplete bool, ca | |||||
| WithField("NodeID", t.NodeID). | WithField("NodeID", t.NodeID). | ||||
| Debugf("delete temp cache") | Debugf("delete temp cache") | ||||
| case agtmsg.CHECK_IPFS_RESP_OP_CREATE_TEMP: | |||||
| case agtmq.CHECK_IPFS_RESP_OP_CREATE_TEMP: | |||||
| err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) | err := execCtx.Args.DB.Cache().CreateTemp(execCtx.Args.DB.SQLCtx(), entry.FileHash, t.NodeID) | ||||
| if err != nil { | if err != nil { | ||||
| log.WithField("FileHash", entry.FileHash). | log.WithField("FileHash", entry.FileHash). | ||||
| @@ -10,9 +10,8 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | "gitlink.org.cn/cloudream/common/pkgs/mq" | ||||
| "gitlink.org.cn/cloudream/storage-common/consts" | "gitlink.org.cn/cloudream/storage-common/consts" | ||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | ||||
| agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent" | |||||
| agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" | |||||
| agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" | |||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/config" | "gitlink.org.cn/cloudream/storage-scanner/internal/config" | ||||
| ) | ) | ||||
| @@ -61,14 +60,14 @@ func (t *AgentCheckState) Execute(execCtx ExecuteContext) { | |||||
| return | return | ||||
| } | } | ||||
| agentClient, err := agtcli.NewClient(t.NodeID, &config.Cfg().RabbitMQ) | |||||
| agentClient, err := agtmq.NewClient(t.NodeID, &config.Cfg().RabbitMQ) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) | log.WithField("NodeID", t.NodeID).Warnf("create agent client failed, err: %s", err.Error()) | ||||
| return | return | ||||
| } | } | ||||
| defer agentClient.Close() | defer agentClient.Close() | ||||
| getResp, err := agentClient.GetState(agtmsg.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) | |||||
| getResp, err := agentClient.GetState(agtmq.NewGetState(), mq.RequestOption{Timeout: time.Second * 30}) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("NodeID", t.NodeID).Warnf("getting state: %s", err.Error()) | log.WithField("NodeID", t.NodeID).Warnf("getting state: %s", err.Error()) | ||||
| @@ -10,9 +10,8 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | "gitlink.org.cn/cloudream/common/pkgs/mq" | ||||
| "gitlink.org.cn/cloudream/storage-common/consts" | "gitlink.org.cn/cloudream/storage-common/consts" | ||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | ||||
| agtcli "gitlink.org.cn/cloudream/storage-common/pkgs/mq/client/agent" | |||||
| agtmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/agent" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" | |||||
| agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" | |||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/config" | "gitlink.org.cn/cloudream/storage-scanner/internal/config" | ||||
| ) | ) | ||||
| @@ -20,9 +19,9 @@ type AgentCheckStorage struct { | |||||
| scevt.AgentCheckStorage | scevt.AgentCheckStorage | ||||
| } | } | ||||
| func NewAgentCheckStorage(storageID int64, objectIDs []int64) *AgentCheckStorage { | |||||
| func NewAgentCheckStorage(storageID int64, packageIDs []int64) *AgentCheckStorage { | |||||
| return &AgentCheckStorage{ | return &AgentCheckStorage{ | ||||
| AgentCheckStorage: scevt.NewAgentCheckStorage(storageID, objectIDs), | |||||
| AgentCheckStorage: scevt.NewAgentCheckStorage(storageID, packageIDs), | |||||
| } | } | ||||
| } | } | ||||
| @@ -36,11 +35,11 @@ func (t *AgentCheckStorage) TryMerge(other Event) bool { | |||||
| return false | return false | ||||
| } | } | ||||
| // ObjectIDs为nil时代表全量检查 | |||||
| if event.ObjectIDs == nil { | |||||
| t.ObjectIDs = nil | |||||
| } else if t.ObjectIDs != nil { | |||||
| t.ObjectIDs = lo.Union(t.ObjectIDs, event.ObjectIDs) | |||||
| // PackageIDs为nil时代表全量检查 | |||||
| if event.PackageIDs == nil { | |||||
| t.PackageIDs = nil | |||||
| } else if t.PackageIDs != nil { | |||||
| t.PackageIDs = lo.Union(t.PackageIDs, event.PackageIDs) | |||||
| } | } | ||||
| return true | return true | ||||
| @@ -74,7 +73,7 @@ func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) { | |||||
| return | return | ||||
| } | } | ||||
| if t.ObjectIDs == nil { | |||||
| if t.PackageIDs == nil { | |||||
| t.checkComplete(execCtx, stg) | t.checkComplete(execCtx, stg) | ||||
| } else { | } else { | ||||
| t.checkIncrement(execCtx, stg) | t.checkIncrement(execCtx, stg) | ||||
| @@ -87,10 +86,10 @@ func (t *AgentCheckStorage) checkComplete(execCtx ExecuteContext, stg model.Stor | |||||
| mutex, err := reqbuilder.NewBuilder(). | mutex, err := reqbuilder.NewBuilder(). | ||||
| Metadata(). | Metadata(). | ||||
| // 全量模式下查询、修改Move记录 | // 全量模式下查询、修改Move记录 | ||||
| StorageObject().WriteAny(). | |||||
| StoragePackage().WriteAny(). | |||||
| Storage(). | Storage(). | ||||
| // 全量模式下删除对象文件 | // 全量模式下删除对象文件 | ||||
| WriteAnyObject(t.StorageID). | |||||
| WriteAnyPackage(t.StorageID). | |||||
| MutexLock(execCtx.Args.DistLock) | MutexLock(execCtx.Args.DistLock) | ||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("acquire locks failed, err: %s", err.Error()) | log.Warnf("acquire locks failed, err: %s", err.Error()) | ||||
| @@ -98,13 +97,13 @@ func (t *AgentCheckStorage) checkComplete(execCtx ExecuteContext, stg model.Stor | |||||
| } | } | ||||
| defer mutex.Unlock() | defer mutex.Unlock() | ||||
| objects, err := execCtx.Args.DB.StorageObject().GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID) | |||||
| packages, err := execCtx.Args.DB.StoragePackage().GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("StorageID", t.StorageID).Warnf("get storage objects failed, err: %s", err.Error()) | |||||
| log.WithField("StorageID", t.StorageID).Warnf("get storage packages failed, err: %s", err.Error()) | |||||
| return | return | ||||
| } | } | ||||
| t.startCheck(execCtx, stg, true, objects) | |||||
| t.startCheck(execCtx, stg, true, packages) | |||||
| } | } | ||||
| func (t *AgentCheckStorage) checkIncrement(execCtx ExecuteContext, stg model.Storage) { | func (t *AgentCheckStorage) checkIncrement(execCtx ExecuteContext, stg model.Storage) { | ||||
| @@ -113,10 +112,10 @@ func (t *AgentCheckStorage) checkIncrement(execCtx ExecuteContext, stg model.Sto | |||||
| mutex, err := reqbuilder.NewBuilder(). | mutex, err := reqbuilder.NewBuilder(). | ||||
| Metadata(). | Metadata(). | ||||
| // 全量模式下查询、修改Move记录。因为可能有多个User Move相同的文件,所以只能用集合Write锁 | // 全量模式下查询、修改Move记录。因为可能有多个User Move相同的文件,所以只能用集合Write锁 | ||||
| StorageObject().WriteAny(). | |||||
| StoragePackage().WriteAny(). | |||||
| Storage(). | Storage(). | ||||
| // 全量模式下删除对象文件。因为可能有多个User Move相同的文件,所以只能用集合Write锁 | // 全量模式下删除对象文件。因为可能有多个User Move相同的文件,所以只能用集合Write锁 | ||||
| WriteAnyObject(t.StorageID). | |||||
| WriteAnyPackage(t.StorageID). | |||||
| MutexLock(execCtx.Args.DistLock) | MutexLock(execCtx.Args.DistLock) | ||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("acquire locks failed, err: %s", err.Error()) | log.Warnf("acquire locks failed, err: %s", err.Error()) | ||||
| @@ -124,34 +123,34 @@ func (t *AgentCheckStorage) checkIncrement(execCtx ExecuteContext, stg model.Sto | |||||
| } | } | ||||
| defer mutex.Unlock() | defer mutex.Unlock() | ||||
| var objects []model.StorageObject | |||||
| for _, objID := range t.ObjectIDs { | |||||
| objs, err := execCtx.Args.DB.StorageObject().GetAllByStorageAndObjectID(execCtx.Args.DB.SQLCtx(), t.StorageID, objID) | |||||
| var packages []model.StoragePackage | |||||
| for _, objID := range t.PackageIDs { | |||||
| objs, err := execCtx.Args.DB.StoragePackage().GetAllByStorageAndPackageID(execCtx.Args.DB.SQLCtx(), t.StorageID, objID) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("StorageID", t.StorageID). | log.WithField("StorageID", t.StorageID). | ||||
| WithField("ObjectID", objID). | |||||
| Warnf("get storage object failed, err: %s", err.Error()) | |||||
| WithField("PackageID", objID). | |||||
| Warnf("get storage package failed, err: %s", err.Error()) | |||||
| return | return | ||||
| } | } | ||||
| objects = append(objects, objs...) | |||||
| packages = append(packages, objs...) | |||||
| } | } | ||||
| t.startCheck(execCtx, stg, false, objects) | |||||
| t.startCheck(execCtx, stg, false, packages) | |||||
| } | } | ||||
| func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage, isComplete bool, objects []model.StorageObject) { | |||||
| func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage, isComplete bool, packages []model.StoragePackage) { | |||||
| log := logger.WithType[AgentCheckStorage]("Event") | log := logger.WithType[AgentCheckStorage]("Event") | ||||
| // 投递任务 | // 投递任务 | ||||
| agentClient, err := agtcli.NewClient(stg.NodeID, &config.Cfg().RabbitMQ) | |||||
| agentClient, err := agtmq.NewClient(stg.NodeID, &config.Cfg().RabbitMQ) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error()) | log.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error()) | ||||
| return | return | ||||
| } | } | ||||
| defer agentClient.Close() | defer agentClient.Close() | ||||
| checkResp, err := agentClient.StorageCheck(agtmsg.NewStorageCheck(stg.StorageID, stg.Directory, isComplete, objects), mq.RequestOption{Timeout: time.Minute}) | |||||
| checkResp, err := agentClient.StorageCheck(agtmq.NewStorageCheck(stg.StorageID, stg.Directory, isComplete, packages), mq.RequestOption{Timeout: time.Minute}) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error()) | log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error()) | ||||
| return | return | ||||
| @@ -161,40 +160,40 @@ func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage | |||||
| var chkObjIDs []int64 | var chkObjIDs []int64 | ||||
| for _, entry := range checkResp.Entries { | for _, entry := range checkResp.Entries { | ||||
| switch entry.Operation { | switch entry.Operation { | ||||
| case agtmsg.CHECK_STORAGE_RESP_OP_DELETE: | |||||
| err := execCtx.Args.DB.StorageObject().Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) | |||||
| case agtmq.CHECK_STORAGE_RESP_OP_DELETE: | |||||
| err := execCtx.Args.DB.StoragePackage().Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.PackageID, entry.UserID) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("StorageID", t.StorageID). | log.WithField("StorageID", t.StorageID). | ||||
| WithField("ObjectID", entry.ObjectID). | |||||
| Warnf("delete storage object failed, err: %s", err.Error()) | |||||
| WithField("PackageID", entry.PackageID). | |||||
| Warnf("delete storage package failed, err: %s", err.Error()) | |||||
| } | } | ||||
| chkObjIDs = append(chkObjIDs, entry.ObjectID) | |||||
| chkObjIDs = append(chkObjIDs, entry.PackageID) | |||||
| log.WithField("StorageID", t.StorageID). | log.WithField("StorageID", t.StorageID). | ||||
| WithField("ObjectID", entry.ObjectID). | |||||
| WithField("PackageID", entry.PackageID). | |||||
| WithField("UserID", entry.UserID). | WithField("UserID", entry.UserID). | ||||
| Debugf("delete storage object") | |||||
| Debugf("delete storage package") | |||||
| case agtmsg.CHECK_STORAGE_RESP_OP_SET_NORMAL: | |||||
| err := execCtx.Args.DB.StorageObject().SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.ObjectID, entry.UserID) | |||||
| case agtmq.CHECK_STORAGE_RESP_OP_SET_NORMAL: | |||||
| err := execCtx.Args.DB.StoragePackage().SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.PackageID, entry.UserID) | |||||
| if err != nil { | if err != nil { | ||||
| log.WithField("StorageID", t.StorageID). | log.WithField("StorageID", t.StorageID). | ||||
| WithField("ObjectID", entry.ObjectID). | |||||
| Warnf("change storage object state failed, err: %s", err.Error()) | |||||
| WithField("PackageID", entry.PackageID). | |||||
| Warnf("change storage package state failed, err: %s", err.Error()) | |||||
| } | } | ||||
| log.WithField("StorageID", t.StorageID). | log.WithField("StorageID", t.StorageID). | ||||
| WithField("ObjectID", entry.ObjectID). | |||||
| WithField("PackageID", entry.PackageID). | |||||
| WithField("UserID", entry.UserID). | WithField("UserID", entry.UserID). | ||||
| Debugf("set storage object normal") | |||||
| Debugf("set storage package normal") | |||||
| } | } | ||||
| } | } | ||||
| if len(chkObjIDs) > 0 { | if len(chkObjIDs) > 0 { | ||||
| execCtx.Executor.Post(NewCheckObject(chkObjIDs)) | |||||
| execCtx.Executor.Post(NewCheckPackage(chkObjIDs)) | |||||
| } | } | ||||
| } | } | ||||
| func init() { | func init() { | ||||
| RegisterMessageConvertor(func(msg scevt.AgentCheckStorage) Event { return NewAgentCheckStorage(msg.StorageID, msg.ObjectIDs) }) | |||||
| RegisterMessageConvertor(func(msg scevt.AgentCheckStorage) Event { return NewAgentCheckStorage(msg.StorageID, msg.PackageIDs) }) | |||||
| } | } | ||||
| @@ -8,7 +8,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| "gitlink.org.cn/cloudream/storage-common/consts" | "gitlink.org.cn/cloudream/storage-common/consts" | ||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | ||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" | |||||
| ) | ) | ||||
| type CheckCache struct { | type CheckCache struct { | ||||
| @@ -1,57 +0,0 @@ | |||||
| package event | |||||
| import ( | |||||
| "github.com/samber/lo" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" | |||||
| ) | |||||
| type CheckObject struct { | |||||
| scevt.CheckObject | |||||
| } | |||||
| func NewCheckObject(objIDs []int64) *CheckObject { | |||||
| return &CheckObject{ | |||||
| CheckObject: scevt.NewCheckObject(objIDs), | |||||
| } | |||||
| } | |||||
| func (t *CheckObject) TryMerge(other Event) bool { | |||||
| event, ok := other.(*CheckObject) | |||||
| if !ok { | |||||
| return false | |||||
| } | |||||
| t.ObjectIDs = lo.Union(t.ObjectIDs, event.ObjectIDs) | |||||
| return true | |||||
| } | |||||
| func (t *CheckObject) Execute(execCtx ExecuteContext) { | |||||
| log := logger.WithType[CheckObject]("Event") | |||||
| log.Debugf("begin with %v", logger.FormatStruct(t)) | |||||
| defer log.Debugf("end") | |||||
| // 检查对象是否没有被引用的时候,需要读取StorageObject表 | |||||
| builder := reqbuilder.NewBuilder().Metadata().StorageObject().ReadAny() | |||||
| for _, objID := range t.ObjectIDs { | |||||
| builder.Metadata().Object().WriteOne(objID) | |||||
| } | |||||
| mutex, err := builder.MutexLock(execCtx.Args.DistLock) | |||||
| if err != nil { | |||||
| log.Warnf("acquire locks failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| defer mutex.Unlock() | |||||
| for _, objID := range t.ObjectIDs { | |||||
| err := execCtx.Args.DB.Object().DeleteUnused(execCtx.Args.DB.SQLCtx(), objID) | |||||
| if err != nil { | |||||
| log.WithField("ObjectID", objID).Warnf("delete unused object failed, err: %s", err.Error()) | |||||
| } | |||||
| } | |||||
| } | |||||
| func init() { | |||||
| RegisterMessageConvertor(func(msg scevt.CheckObject) Event { return NewCheckObject(msg.ObjectIDs) }) | |||||
| } | |||||
| @@ -0,0 +1,57 @@ | |||||
| package event | |||||
| import ( | |||||
| "github.com/samber/lo" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/distlock/reqbuilder" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" | |||||
| ) | |||||
| type CheckPackage struct { | |||||
| scevt.CheckPackage | |||||
| } | |||||
| func NewCheckPackage(objIDs []int64) *CheckPackage { | |||||
| return &CheckPackage{ | |||||
| CheckPackage: scevt.NewCheckPackage(objIDs), | |||||
| } | |||||
| } | |||||
| func (t *CheckPackage) TryMerge(other Event) bool { | |||||
| event, ok := other.(*CheckPackage) | |||||
| if !ok { | |||||
| return false | |||||
| } | |||||
| t.PackageIDs = lo.Union(t.PackageIDs, event.PackageIDs) | |||||
| return true | |||||
| } | |||||
| func (t *CheckPackage) Execute(execCtx ExecuteContext) { | |||||
| log := logger.WithType[CheckPackage]("Event") | |||||
| log.Debugf("begin with %v", logger.FormatStruct(t)) | |||||
| defer log.Debugf("end") | |||||
| // 检查对象是否没有被引用的时候,需要读取StoragePackage表 | |||||
| builder := reqbuilder.NewBuilder().Metadata().StoragePackage().ReadAny() | |||||
| for _, objID := range t.PackageIDs { | |||||
| builder.Metadata().Package().WriteOne(objID) | |||||
| } | |||||
| mutex, err := builder.MutexLock(execCtx.Args.DistLock) | |||||
| if err != nil { | |||||
| log.Warnf("acquire locks failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| defer mutex.Unlock() | |||||
| for _, objID := range t.PackageIDs { | |||||
| err := execCtx.Args.DB.Package().DeleteUnused(execCtx.Args.DB.SQLCtx(), objID) | |||||
| if err != nil { | |||||
| log.WithField("PackageID", objID).Warnf("delete unused package failed, err: %s", err.Error()) | |||||
| } | |||||
| } | |||||
| } | |||||
| func init() { | |||||
| RegisterMessageConvertor(func(msg scevt.CheckPackage) Event { return NewCheckPackage(msg.PackageIDs) }) | |||||
| } | |||||
| @@ -13,7 +13,7 @@ import ( | |||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/config" | "gitlink.org.cn/cloudream/storage-scanner/internal/config" | ||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | "gitlink.org.cn/cloudream/storage-common/pkgs/db/model" | ||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" | |||||
| ) | ) | ||||
| type CheckRepCount struct { | type CheckRepCount struct { | ||||
| @@ -2,12 +2,12 @@ package services | |||||
| import ( | import ( | ||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | "gitlink.org.cn/cloudream/common/pkgs/logger" | ||||
| scmsg "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/message/scanner/event" | |||||
| scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" | |||||
| scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event" | |||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/event" | "gitlink.org.cn/cloudream/storage-scanner/internal/event" | ||||
| ) | ) | ||||
| func (svc *Service) PostEvent(msg *scmsg.PostEvent) { | |||||
| func (svc *Service) PostEvent(msg *scmq.PostEvent) { | |||||
| evtMsg, err := scevt.MapToMessage(msg.Event) | evtMsg, err := scevt.MapToMessage(msg.Event) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -1,39 +0,0 @@ | |||||
| package tickevent | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/event" | |||||
| ) | |||||
| const CHECK_OBJECT_BATCH_SIZE = 100 | |||||
| type BatchCheckAllObject struct { | |||||
| lastCheckStart int | |||||
| } | |||||
| func NewBatchCheckAllObject() *BatchCheckAllObject { | |||||
| return &BatchCheckAllObject{} | |||||
| } | |||||
| func (e *BatchCheckAllObject) Execute(ctx ExecuteContext) { | |||||
| log := logger.WithType[BatchCheckAllObject]("TickEvent") | |||||
| log.Debugf("begin") | |||||
| defer log.Debugf("end") | |||||
| objectIDs, err := ctx.Args.DB.Object().BatchGetAllObjectIDs(ctx.Args.DB.SQLCtx(), e.lastCheckStart, CHECK_OBJECT_BATCH_SIZE) | |||||
| if err != nil { | |||||
| log.Warnf("batch get object ids failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| ctx.Args.EventExecutor.Post(event.NewCheckObject(objectIDs)) | |||||
| // 如果结果的长度小于预期的长度,则认为已经查询了所有,下次从头再来 | |||||
| if len(objectIDs) < CHECK_OBJECT_BATCH_SIZE { | |||||
| e.lastCheckStart = 0 | |||||
| log.Debugf("all object checked, next time will start check at 0") | |||||
| } else { | |||||
| e.lastCheckStart += CHECK_OBJECT_BATCH_SIZE | |||||
| } | |||||
| } | |||||
| @@ -0,0 +1,39 @@ | |||||
| package tickevent | |||||
| import ( | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/event" | |||||
| ) | |||||
| const CheckPackageBatchSize = 100 | |||||
| type BatchCheckAllPackage struct { | |||||
| lastCheckStart int | |||||
| } | |||||
| func NewBatchCheckAllPackage() *BatchCheckAllPackage { | |||||
| return &BatchCheckAllPackage{} | |||||
| } | |||||
| func (e *BatchCheckAllPackage) Execute(ctx ExecuteContext) { | |||||
| log := logger.WithType[BatchCheckAllPackage]("TickEvent") | |||||
| log.Debugf("begin") | |||||
| defer log.Debugf("end") | |||||
| packageIDs, err := ctx.Args.DB.Package().BatchGetAllPackageIDs(ctx.Args.DB.SQLCtx(), e.lastCheckStart, CheckPackageBatchSize) | |||||
| if err != nil { | |||||
| log.Warnf("batch get package ids failed, err: %s", err.Error()) | |||||
| return | |||||
| } | |||||
| ctx.Args.EventExecutor.Post(event.NewCheckPackage(packageIDs)) | |||||
| // 如果结果的长度小于预期的长度,则认为已经查询了所有,下次从头再来 | |||||
| if len(packageIDs) < CheckPackageBatchSize { | |||||
| e.lastCheckStart = 0 | |||||
| log.Debugf("all package checked, next time will start check at 0") | |||||
| } else { | |||||
| e.lastCheckStart += CheckPackageBatchSize | |||||
| } | |||||
| } | |||||
| @@ -6,9 +6,9 @@ import ( | |||||
| "sync" | "sync" | ||||
| distlocksvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" | distlocksvc "gitlink.org.cn/cloudream/common/pkgs/distlock/service" | ||||
| log "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||||
| "gitlink.org.cn/cloudream/storage-common/pkgs/db" | "gitlink.org.cn/cloudream/storage-common/pkgs/db" | ||||
| scsvr "gitlink.org.cn/cloudream/storage-common/pkgs/mq/server/scanner" | |||||
| scmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner" | |||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/config" | "gitlink.org.cn/cloudream/storage-scanner/internal/config" | ||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/event" | "gitlink.org.cn/cloudream/storage-scanner/internal/event" | ||||
| "gitlink.org.cn/cloudream/storage-scanner/internal/services" | "gitlink.org.cn/cloudream/storage-scanner/internal/services" | ||||
| @@ -22,7 +22,7 @@ func main() { | |||||
| os.Exit(1) | os.Exit(1) | ||||
| } | } | ||||
| err = log.Init(&config.Cfg().Logger) | |||||
| err = logger.Init(&config.Cfg().Logger) | |||||
| if err != nil { | if err != nil { | ||||
| fmt.Printf("init logger failed, err: %s", err.Error()) | fmt.Printf("init logger failed, err: %s", err.Error()) | ||||
| os.Exit(1) | os.Exit(1) | ||||
| @@ -30,7 +30,7 @@ func main() { | |||||
| db, err := db.NewDB(&config.Cfg().DB) | db, err := db.NewDB(&config.Cfg().DB) | ||||
| if err != nil { | if err != nil { | ||||
| log.Fatalf("new db failed, err: %s", err.Error()) | |||||
| logger.Fatalf("new db failed, err: %s", err.Error()) | |||||
| } | } | ||||
| wg := sync.WaitGroup{} | wg := sync.WaitGroup{} | ||||
| @@ -38,7 +38,7 @@ func main() { | |||||
| distlockSvc, err := distlocksvc.NewService(&config.Cfg().DistLock) | distlockSvc, err := distlocksvc.NewService(&config.Cfg().DistLock) | ||||
| if err != nil { | if err != nil { | ||||
| log.Warnf("new distlock service failed, err: %s", err.Error()) | |||||
| logger.Warnf("new distlock service failed, err: %s", err.Error()) | |||||
| os.Exit(1) | os.Exit(1) | ||||
| } | } | ||||
| go serveDistLock(distlockSvc, &wg) | go serveDistLock(distlockSvc, &wg) | ||||
| @@ -46,12 +46,12 @@ func main() { | |||||
| eventExecutor := event.NewExecutor(db, distlockSvc) | eventExecutor := event.NewExecutor(db, distlockSvc) | ||||
| go serveEventExecutor(&eventExecutor, &wg) | go serveEventExecutor(&eventExecutor, &wg) | ||||
| agtSvr, err := scsvr.NewServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ) | |||||
| agtSvr, err := scmq.NewServer(services.NewService(&eventExecutor), &config.Cfg().RabbitMQ) | |||||
| if err != nil { | if err != nil { | ||||
| log.Fatalf("new agent server failed, err: %s", err.Error()) | |||||
| logger.Fatalf("new agent server failed, err: %s", err.Error()) | |||||
| } | } | ||||
| agtSvr.OnError = func(err error) { | agtSvr.OnError = func(err error) { | ||||
| log.Warnf("agent server err: %s", err.Error()) | |||||
| logger.Warnf("agent server err: %s", err.Error()) | |||||
| } | } | ||||
| go serveScannerServer(agtSvr, &wg) | go serveScannerServer(agtSvr, &wg) | ||||
| @@ -65,43 +65,43 @@ func main() { | |||||
| } | } | ||||
| func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) { | func serveEventExecutor(executor *event.Executor, wg *sync.WaitGroup) { | ||||
| log.Info("start serving event executor") | |||||
| logger.Info("start serving event executor") | |||||
| err := executor.Execute() | err := executor.Execute() | ||||
| if err != nil { | if err != nil { | ||||
| log.Errorf("event executor stopped with error: %s", err.Error()) | |||||
| logger.Errorf("event executor stopped with error: %s", err.Error()) | |||||
| } | } | ||||
| log.Info("event executor stopped") | |||||
| logger.Info("event executor stopped") | |||||
| wg.Done() | wg.Done() | ||||
| } | } | ||||
| func serveScannerServer(server *scsvr.Server, wg *sync.WaitGroup) { | |||||
| log.Info("start serving scanner server") | |||||
| func serveScannerServer(server *scmq.Server, wg *sync.WaitGroup) { | |||||
| logger.Info("start serving scanner server") | |||||
| err := server.Serve() | err := server.Serve() | ||||
| if err != nil { | if err != nil { | ||||
| log.Errorf("scanner server stopped with error: %s", err.Error()) | |||||
| logger.Errorf("scanner server stopped with error: %s", err.Error()) | |||||
| } | } | ||||
| log.Info("scanner server stopped") | |||||
| logger.Info("scanner server stopped") | |||||
| wg.Done() | wg.Done() | ||||
| } | } | ||||
| func serveDistLock(svc *distlocksvc.Service, wg *sync.WaitGroup) { | func serveDistLock(svc *distlocksvc.Service, wg *sync.WaitGroup) { | ||||
| log.Info("start serving distlock") | |||||
| logger.Info("start serving distlock") | |||||
| err := svc.Serve() | err := svc.Serve() | ||||
| if err != nil { | if err != nil { | ||||
| log.Errorf("distlock stopped with error: %s", err.Error()) | |||||
| logger.Errorf("distlock stopped with error: %s", err.Error()) | |||||
| } | } | ||||
| log.Info("distlock stopped") | |||||
| logger.Info("distlock stopped") | |||||
| wg.Done() | wg.Done() | ||||
| } | } | ||||
| @@ -113,7 +113,7 @@ func startTickEvent(tickExecutor *tickevent.Executor) { | |||||
| tickExecutor.Start(tickevent.NewBatchAllAgentCheckCache(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | tickExecutor.Start(tickevent.NewBatchAllAgentCheckCache(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | ||||
| tickExecutor.Start(tickevent.NewBatchCheckAllObject(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | |||||
| tickExecutor.Start(tickevent.NewBatchCheckAllPackage(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | |||||
| tickExecutor.Start(tickevent.NewBatchCheckAllRepCount(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | tickExecutor.Start(tickevent.NewBatchCheckAllRepCount(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) | ||||