|
- package event
-
- import (
- "database/sql"
- "time"
-
- "github.com/samber/lo"
- "gitlink.org.cn/cloudream/common/pkgs/logger"
- "gitlink.org.cn/cloudream/common/pkgs/mq"
- "gitlink.org.cn/cloudream/storage-common/consts"
- "gitlink.org.cn/cloudream/storage-common/globals"
- "gitlink.org.cn/cloudream/storage-common/pkgs/db/model"
- "gitlink.org.cn/cloudream/storage-common/pkgs/distlock/reqbuilder"
- agtmq "gitlink.org.cn/cloudream/storage-common/pkgs/mq/agent"
- scevt "gitlink.org.cn/cloudream/storage-common/pkgs/mq/scanner/event"
- )
-
- type AgentCheckStorage struct {
- scevt.AgentCheckStorage
- }
-
- func NewAgentCheckStorage(storageID int64, packageIDs []int64) *AgentCheckStorage {
- return &AgentCheckStorage{
- AgentCheckStorage: scevt.NewAgentCheckStorage(storageID, packageIDs),
- }
- }
-
- func (t *AgentCheckStorage) TryMerge(other Event) bool {
- event, ok := other.(*AgentCheckStorage)
- if !ok {
- return false
- }
-
- if t.StorageID != event.StorageID {
- return false
- }
-
- // PackageIDs为nil时代表全量检查
- if event.PackageIDs == nil {
- t.PackageIDs = nil
- } else if t.PackageIDs != nil {
- t.PackageIDs = lo.Union(t.PackageIDs, event.PackageIDs)
- }
-
- return true
- }
-
- func (t *AgentCheckStorage) Execute(execCtx ExecuteContext) {
- log := logger.WithType[AgentCheckStorage]("Event")
- log.Debugf("begin with %v", logger.FormatStruct(t))
- defer log.Debugf("end")
-
- // 读取数据的地方就不加锁了,因为check任务会反复执行,单次失败问题不大
-
- stg, err := execCtx.Args.DB.Storage().GetByID(execCtx.Args.DB.SQLCtx(), t.StorageID)
- if err != nil {
- if err != sql.ErrNoRows {
- log.WithField("StorageID", t.StorageID).Warnf("get storage failed, err: %s", err.Error())
- }
- return
- }
-
- node, err := execCtx.Args.DB.Node().GetByID(execCtx.Args.DB.SQLCtx(), stg.NodeID)
- if err != nil {
- if err != sql.ErrNoRows {
- log.WithField("StorageID", t.StorageID).Warnf("get storage node failed, err: %s", err.Error())
- }
- return
- }
-
- // TODO unavailable的节点需不需要发送任务?
- if node.State != consts.NodeStateNormal {
- return
- }
-
- if t.PackageIDs == nil {
- t.checkComplete(execCtx, stg)
- } else {
- t.checkIncrement(execCtx, stg)
- }
- }
-
- func (t *AgentCheckStorage) checkComplete(execCtx ExecuteContext, stg model.Storage) {
- log := logger.WithType[AgentCheckStorage]("Event")
-
- mutex, err := reqbuilder.NewBuilder().
- Metadata().
- // 全量模式下查询、修改Move记录
- StoragePackage().WriteAny().
- Storage().
- // 全量模式下删除对象文件
- WriteAnyPackage(t.StorageID).
- MutexLock(execCtx.Args.DistLock)
- if err != nil {
- log.Warnf("acquire locks failed, err: %s", err.Error())
- return
- }
- defer mutex.Unlock()
-
- packages, err := execCtx.Args.DB.StoragePackage().GetAllByStorageID(execCtx.Args.DB.SQLCtx(), t.StorageID)
- if err != nil {
- log.WithField("StorageID", t.StorageID).Warnf("get storage packages failed, err: %s", err.Error())
- return
- }
-
- t.startCheck(execCtx, stg, true, packages)
- }
-
- func (t *AgentCheckStorage) checkIncrement(execCtx ExecuteContext, stg model.Storage) {
- log := logger.WithType[AgentCheckStorage]("Event")
-
- mutex, err := reqbuilder.NewBuilder().
- Metadata().
- // 全量模式下查询、修改Move记录。因为可能有多个User Move相同的文件,所以只能用集合Write锁
- StoragePackage().WriteAny().
- Storage().
- // 全量模式下删除对象文件。因为可能有多个User Move相同的文件,所以只能用集合Write锁
- WriteAnyPackage(t.StorageID).
- MutexLock(execCtx.Args.DistLock)
- if err != nil {
- log.Warnf("acquire locks failed, err: %s", err.Error())
- return
- }
- defer mutex.Unlock()
-
- var packages []model.StoragePackage
- for _, objID := range t.PackageIDs {
- objs, err := execCtx.Args.DB.StoragePackage().GetAllByStorageAndPackageID(execCtx.Args.DB.SQLCtx(), t.StorageID, objID)
- if err != nil {
- log.WithField("StorageID", t.StorageID).
- WithField("PackageID", objID).
- Warnf("get storage package failed, err: %s", err.Error())
- return
- }
-
- packages = append(packages, objs...)
- }
-
- t.startCheck(execCtx, stg, false, packages)
- }
-
- func (t *AgentCheckStorage) startCheck(execCtx ExecuteContext, stg model.Storage, isComplete bool, packages []model.StoragePackage) {
- log := logger.WithType[AgentCheckStorage]("Event")
-
- // 投递任务
- agentClient, err := globals.AgentMQPool.Acquire(stg.NodeID)
- if err != nil {
- log.WithField("NodeID", stg.NodeID).Warnf("create agent client failed, err: %s", err.Error())
- return
- }
- defer agentClient.Close()
-
- checkResp, err := agentClient.StorageCheck(agtmq.NewStorageCheck(stg.StorageID, stg.Directory, isComplete, packages), mq.RequestOption{Timeout: time.Minute})
- if err != nil {
- log.WithField("NodeID", stg.NodeID).Warnf("checking storage: %s", err.Error())
- return
- }
-
- // 根据返回结果修改数据库
- var chkObjIDs []int64
- for _, entry := range checkResp.Entries {
- switch entry.Operation {
- case agtmq.CHECK_STORAGE_RESP_OP_DELETE:
- err := execCtx.Args.DB.StoragePackage().Delete(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.PackageID, entry.UserID)
- if err != nil {
- log.WithField("StorageID", t.StorageID).
- WithField("PackageID", entry.PackageID).
- Warnf("delete storage package failed, err: %s", err.Error())
- }
- chkObjIDs = append(chkObjIDs, entry.PackageID)
-
- log.WithField("StorageID", t.StorageID).
- WithField("PackageID", entry.PackageID).
- WithField("UserID", entry.UserID).
- Debugf("delete storage package")
-
- case agtmq.CHECK_STORAGE_RESP_OP_SET_NORMAL:
- err := execCtx.Args.DB.StoragePackage().SetStateNormal(execCtx.Args.DB.SQLCtx(), t.StorageID, entry.PackageID, entry.UserID)
- if err != nil {
- log.WithField("StorageID", t.StorageID).
- WithField("PackageID", entry.PackageID).
- Warnf("change storage package state failed, err: %s", err.Error())
- }
-
- log.WithField("StorageID", t.StorageID).
- WithField("PackageID", entry.PackageID).
- WithField("UserID", entry.UserID).
- Debugf("set storage package normal")
- }
- }
-
- if len(chkObjIDs) > 0 {
- execCtx.Executor.Post(NewCheckPackage(chkObjIDs))
- }
- }
-
- func init() {
- RegisterMessageConvertor(func(msg scevt.AgentCheckStorage) Event { return NewAgentCheckStorage(msg.StorageID, msg.PackageIDs) })
- }
|