| @@ -72,7 +72,7 @@ func (db *ObjectBlockDB) DeleteInPackage(ctx SQLContext, packageID types.Package | |||
| return ctx.Table("ObjectBlock").Where("ObjectID IN (SELECT ObjectID FROM Object WHERE PackageID = ?)", packageID).Delete(&types.ObjectBlock{}).Error | |||
| } | |||
| func (db *ObjectBlockDB) StorageBatchDelete(ctx SQLContext, spaceID types.UserSpaceID, fileHashes []types.FileHash) error { | |||
| func (db *ObjectBlockDB) BatchDeleteByFileHash(ctx SQLContext, spaceID types.UserSpaceID, fileHashes []types.FileHash) error { | |||
| if len(fileHashes) == 0 { | |||
| return nil | |||
| } | |||
| @@ -112,7 +112,7 @@ func (*PinnedObjectDB) DeleteInPackageAtStorage(ctx SQLContext, packageID types. | |||
| return err | |||
| } | |||
| func (*PinnedObjectDB) StorageBatchDelete(ctx SQLContext, spaceID types.UserSpaceID, objectIDs []types.ObjectID) error { | |||
| func (*PinnedObjectDB) BatchDelete(ctx SQLContext, spaceID types.UserSpaceID, objectIDs []types.ObjectID) error { | |||
| if len(objectIDs) == 0 { | |||
| return nil | |||
| } | |||
| @@ -0,0 +1,138 @@ | |||
| package ticktock | |||
| import ( | |||
| "fmt" | |||
| "time" | |||
| "github.com/samber/lo" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/common/utils/reflect2" | |||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | |||
| clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" | |||
| stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" | |||
| hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" | |||
| ) | |||
| // CheckShardStore 代表一个用于处理代理缓存检查事件的结构体 | |||
| type CheckShardStore struct { | |||
| } | |||
| func (j *CheckShardStore) Name() string { | |||
| return reflect2.TypeNameOf[CheckShardStore]() | |||
| } | |||
| // Execute 执行缓存检查操作,对比本地缓存与代理返回的缓存信息,更新数据库中的缓存记录 | |||
| func (j *CheckShardStore) Execute(t *TickTock) { | |||
| log := logger.WithType[CheckShardStore]("TickTock") | |||
| startTime := time.Now() | |||
| log.Debugf("job start") | |||
| defer func() { | |||
| log.Debugf("job end, time: %v", time.Since(startTime)) | |||
| }() | |||
| db2 := t.db | |||
| spaceIDs, err := db2.UserSpace().GetAllIDs(db2.DefCtx()) | |||
| if err != nil { | |||
| log.Warnf("getting all user space ids: %s", err.Error()) | |||
| return | |||
| } | |||
| for _, spaceID := range spaceIDs { | |||
| detail := t.spaceMeta.Get(spaceID) | |||
| if detail == nil { | |||
| continue | |||
| } | |||
| err := j.checkOne(t, detail) | |||
| if err != nil { | |||
| log.Warnf("checking user space %v: %v", detail.String(), err) | |||
| continue | |||
| } | |||
| } | |||
| } | |||
| func (j *CheckShardStore) checkOne(t *TickTock, space *clitypes.UserSpaceDetail) error { | |||
| log := logger.WithType[CheckShardStore]("TickTock") | |||
| if space.MasterHub == nil { | |||
| log.Infof("user space %v has no master hub", space.UserSpace) | |||
| return nil | |||
| } | |||
| agtCli, err := stgglb.HubMQPool.Acquire(space.MasterHub.HubID) | |||
| if err != nil { | |||
| return fmt.Errorf("new hub mq client: %w", err) | |||
| } | |||
| defer stgglb.HubMQPool.Release(agtCli) | |||
| checkResp, err := agtCli.CheckCache(hubmq.NewCheckCache(*space), mq.RequestOption{Timeout: time.Minute}) | |||
| if err != nil { | |||
| return fmt.Errorf("request to check cache: %w", err) | |||
| } | |||
| realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash clitypes.FileHash) (clitypes.FileHash, bool) { return hash, true }) | |||
| // 在事务中执行缓存更新操作 | |||
| t.db.DoTx(func(tx db.SQLContext) error { | |||
| j.checkPinnedObject(t, tx, space, realFileHashes) | |||
| j.checkObjectBlock(t, tx, space, realFileHashes) | |||
| return nil | |||
| }) | |||
| return nil | |||
| } | |||
| // checkPinnedObject 对比PinnedObject表,若实际文件不存在,则进行删除操作 | |||
| func (*CheckShardStore) checkPinnedObject(t *TickTock, tx db.SQLContext, space *clitypes.UserSpaceDetail, realFileHashes map[clitypes.FileHash]bool) { | |||
| log := logger.WithType[CheckShardStore]("TickTock") | |||
| objs, err := t.db.PinnedObject().GetObjectsByUserSpaceID(tx, space.UserSpace.UserSpaceID) | |||
| if err != nil { | |||
| log.Warnf("getting pinned objects by user space id %v: %v", space.UserSpace, err) | |||
| return | |||
| } | |||
| var rms []clitypes.ObjectID | |||
| for _, c := range objs { | |||
| if realFileHashes[c.FileHash] { | |||
| continue | |||
| } | |||
| rms = append(rms, c.ObjectID) | |||
| } | |||
| if len(rms) > 0 { | |||
| err = t.db.PinnedObject().BatchDelete(tx, space.UserSpace.UserSpaceID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete user space %v pinned objects: %v", space.UserSpace, err) | |||
| } | |||
| } | |||
| } | |||
| // checkObjectBlock 对比ObjectBlock表,若实际文件不存在,则进行删除操作 | |||
| func (*CheckShardStore) checkObjectBlock(t *TickTock, tx db.SQLContext, space *clitypes.UserSpaceDetail, realFileHashes map[clitypes.FileHash]bool) { | |||
| log := logger.WithType[CheckShardStore]("TickTock") | |||
| blocks, err := t.db.ObjectBlock().GetByUserSpaceID(tx, space.UserSpace.UserSpaceID) | |||
| if err != nil { | |||
| log.Warnf("getting object blocks by user space id %v: %v", space.UserSpace, err) | |||
| return | |||
| } | |||
| var rms []clitypes.FileHash | |||
| for _, b := range blocks { | |||
| if realFileHashes[b.FileHash] { | |||
| continue | |||
| } | |||
| rms = append(rms, b.FileHash) | |||
| } | |||
| if len(rms) > 0 { | |||
| err = t.db.ObjectBlock().BatchDeleteByFileHash(tx, space.UserSpace.UserSpaceID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete user space %v object blocks: %v", space.UserSpace, err) | |||
| } | |||
| } | |||
| } | |||
| @@ -0,0 +1,106 @@ | |||
| package ticktock | |||
| import ( | |||
| "fmt" | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| "gitlink.org.cn/cloudream/common/utils/reflect2" | |||
| "gitlink.org.cn/cloudream/jcs-pub/client/internal/db" | |||
| "gitlink.org.cn/cloudream/jcs-pub/client/types" | |||
| stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" | |||
| hubmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" | |||
| ) | |||
| type ShardStoreGC struct { | |||
| } | |||
| func (j *ShardStoreGC) Name() string { | |||
| return reflect2.TypeNameOf[ShardStoreGC]() | |||
| } | |||
| // Execute 执行垃圾回收操作。 | |||
| func (j *ShardStoreGC) Execute(t *TickTock) { | |||
| log := logger.WithType[ShardStoreGC]("Event") | |||
| startTime := time.Now() | |||
| log.Debugf("job start") | |||
| defer func() { | |||
| log.Debugf("job end, time: %v", time.Since(startTime)) | |||
| }() | |||
| // TODO 加锁 | |||
| // // 使用分布式锁进行资源锁定 | |||
| // mutex, err := reqbuilder.NewBuilder(). | |||
| // // 执行IPFS垃圾回收 | |||
| // Shard().GC(j.StorageID). | |||
| // MutexLock(execCtx.Args.DistLock) | |||
| // if err != nil { | |||
| // log.Warnf("acquire locks failed, err: %s", err.Error()) | |||
| // return | |||
| // } | |||
| // defer mutex.Unlock() | |||
| spaceIDs, err := t.db.UserSpace().GetAllIDs(t.db.DefCtx()) | |||
| if err != nil { | |||
| log.Warnf("getting user space ids: %v", err) | |||
| return | |||
| } | |||
| for _, spaceID := range spaceIDs { | |||
| detail := t.spaceMeta.Get(spaceID) | |||
| if detail == nil { | |||
| continue | |||
| } | |||
| err := j.gcOne(t, detail) | |||
| if err != nil { | |||
| log.Warnf("gc one user space: %v: %v", spaceID, err) | |||
| continue | |||
| } | |||
| } | |||
| } | |||
| func (j *ShardStoreGC) gcOne(t *TickTock, space *types.UserSpaceDetail) error { | |||
| db2 := t.db | |||
| // 收集需要进行垃圾回收的文件哈希值 | |||
| var allFileHashes []types.FileHash | |||
| err := db2.DoTx(func(tx db.SQLContext) error { | |||
| blocks, err := db2.ObjectBlock().GetByUserSpaceID(tx, space.UserSpace.UserSpaceID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting object blocks by hub id: %w", err) | |||
| } | |||
| for _, c := range blocks { | |||
| allFileHashes = append(allFileHashes, c.FileHash) | |||
| } | |||
| objs, err := db2.PinnedObject().GetObjectsByUserSpaceID(tx, space.UserSpace.UserSpaceID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting pinned objects by hub id: %w", err) | |||
| } | |||
| for _, o := range objs { | |||
| allFileHashes = append(allFileHashes, o.FileHash) | |||
| } | |||
| return nil | |||
| }) | |||
| if err != nil { | |||
| return err | |||
| } | |||
| // 获取与节点通信的代理客户端 | |||
| agtCli, err := stgglb.HubMQPool.Acquire(space.MasterHub.HubID) | |||
| if err != nil { | |||
| return fmt.Errorf("new hub mq client: %w", err) | |||
| } | |||
| defer stgglb.HubMQPool.Release(agtCli) | |||
| // 向代理发送垃圾回收请求 | |||
| _, err = agtCli.CacheGC(hubmq.ReqCacheGC(*space, allFileHashes), mq.RequestOption{Timeout: time.Minute}) | |||
| if err != nil { | |||
| return fmt.Errorf("request to cache gc: %w", err) | |||
| } | |||
| return nil | |||
| } | |||
| @@ -80,4 +80,12 @@ func (t *TickTock) initJobs() { | |||
| t.addJob(&ChangeRedundancy{}, gocron.DailyJob(1, gocron.NewAtTimes( | |||
| gocron.NewAtTime(0, 0, 0), | |||
| ))) | |||
| t.addJob(&CheckShardStore{}, gocron.DailyJob(1, gocron.NewAtTimes( | |||
| gocron.NewAtTime(1, 0, 0), | |||
| ))) | |||
| t.addJob(&ShardStoreGC{}, gocron.DailyJob(1, gocron.NewAtTimes( | |||
| gocron.NewAtTime(2, 0, 0), | |||
| ))) | |||
| } | |||
| @@ -0,0 +1,61 @@ | |||
| package hub | |||
| import ( | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" | |||
| ) | |||
| type CacheService interface { | |||
| CheckCache(msg *CheckCache) (*CheckCacheResp, *mq.CodeMessage) | |||
| CacheGC(msg *CacheGC) (*CacheGCResp, *mq.CodeMessage) | |||
| } | |||
| // 检查节点上的IPFS | |||
| var _ = Register(Service.CheckCache) | |||
| type CheckCache struct { | |||
| mq.MessageBodyBase | |||
| UserSpace clitypes.UserSpaceDetail `json:"userSpace"` | |||
| } | |||
| type CheckCacheResp struct { | |||
| mq.MessageBodyBase | |||
| FileHashes []clitypes.FileHash `json:"fileHashes"` | |||
| } | |||
| func NewCheckCache(space clitypes.UserSpaceDetail) *CheckCache { | |||
| return &CheckCache{UserSpace: space} | |||
| } | |||
| func NewCheckCacheResp(fileHashes []clitypes.FileHash) *CheckCacheResp { | |||
| return &CheckCacheResp{ | |||
| FileHashes: fileHashes, | |||
| } | |||
| } | |||
| func (client *Client) CheckCache(msg *CheckCache, opts ...mq.RequestOption) (*CheckCacheResp, error) { | |||
| return mq.Request(Service.CheckCache, client.rabbitCli, msg, opts...) | |||
| } | |||
| // 清理Cache中不用的文件 | |||
| var _ = Register(Service.CacheGC) | |||
| type CacheGC struct { | |||
| mq.MessageBodyBase | |||
| UserSpace clitypes.UserSpaceDetail `json:"userSpace"` | |||
| Avaiables []clitypes.FileHash `json:"avaiables"` | |||
| } | |||
| type CacheGCResp struct { | |||
| mq.MessageBodyBase | |||
| } | |||
| func ReqCacheGC(space clitypes.UserSpaceDetail, avaiables []clitypes.FileHash) *CacheGC { | |||
| return &CacheGC{ | |||
| UserSpace: space, | |||
| Avaiables: avaiables, | |||
| } | |||
| } | |||
| func RespCacheGC() *CacheGCResp { | |||
| return &CacheGCResp{} | |||
| } | |||
| func (client *Client) CacheGC(msg *CacheGC, opts ...mq.RequestOption) (*CacheGCResp, error) { | |||
| return mq.Request(Service.CacheGC, client.rabbitCli, msg, opts...) | |||
| } | |||
| @@ -10,7 +10,7 @@ import ( | |||
| type Service interface { | |||
| // UserSpaceService | |||
| // CacheService | |||
| CacheService | |||
| HubService | |||
| } | |||
| @@ -0,0 +1,43 @@ | |||
| package mq | |||
| import ( | |||
| "fmt" | |||
| "gitlink.org.cn/cloudream/common/consts/errorcode" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| clitypes "gitlink.org.cn/cloudream/jcs-pub/client/types" | |||
| agtmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" | |||
| ) | |||
| func (svc *Service) CheckCache(msg *agtmq.CheckCache) (*agtmq.CheckCacheResp, *mq.CodeMessage) { | |||
| store, err := svc.stgPool.GetShardStore(&msg.UserSpace) | |||
| if err != nil { | |||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of user space %v: %v", msg.UserSpace, err)) | |||
| } | |||
| infos, err := store.ListAll() | |||
| if err != nil { | |||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("listting file in shard store: %v", err)) | |||
| } | |||
| var fileHashes []clitypes.FileHash | |||
| for _, info := range infos { | |||
| fileHashes = append(fileHashes, info.Hash) | |||
| } | |||
| return mq.ReplyOK(agtmq.NewCheckCacheResp(fileHashes)) | |||
| } | |||
| func (svc *Service) CacheGC(msg *agtmq.CacheGC) (*agtmq.CacheGCResp, *mq.CodeMessage) { | |||
| store, err := svc.stgPool.GetShardStore(&msg.UserSpace) | |||
| if err != nil { | |||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("get shard store of user space %v: %v", msg.UserSpace, err)) | |||
| } | |||
| err = store.GC(msg.Avaiables) | |||
| if err != nil { | |||
| return nil, mq.Failed(errorcode.OperationFailed, fmt.Sprintf("purging cache: %v", err)) | |||
| } | |||
| return mq.ReplyOK(agtmq.RespCacheGC()) | |||
| } | |||
| @@ -1,180 +0,0 @@ | |||
| package event | |||
| import ( | |||
| "time" | |||
| "github.com/samber/lo" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" | |||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2" | |||
| agtmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" | |||
| scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" | |||
| ) | |||
| // HubCheckShardStore 代表一个用于处理代理缓存检查事件的结构体 | |||
| type HubCheckShardStore struct { | |||
| *scevt.HubCheckShardStore | |||
| } | |||
| // NewHubCheckShardStore 创建一个新的 HubCheckCache 实例 | |||
| func NewHubCheckShardStore(evt *scevt.HubCheckShardStore) *HubCheckShardStore { | |||
| return &HubCheckShardStore{ | |||
| HubCheckShardStore: evt, | |||
| } | |||
| } | |||
| // TryMerge 尝试合并当前事件与另一个事件 | |||
| // 如果另一个事件类型不匹配或节点ID不同,则不进行合并 | |||
| func (t *HubCheckShardStore) TryMerge(other Event) bool { | |||
| event, ok := other.(*HubCheckShardStore) | |||
| if !ok { | |||
| return false | |||
| } | |||
| if event.StorageID != t.StorageID { | |||
| return false | |||
| } | |||
| return true | |||
| } | |||
| // Execute 执行缓存检查操作,对比本地缓存与代理返回的缓存信息,更新数据库中的缓存记录 | |||
| func (t *HubCheckShardStore) Execute(execCtx ExecuteContext) { | |||
| log := logger.WithType[HubCheckShardStore]("Event") | |||
| startTime := time.Now() | |||
| log.Debugf("begin with %v", logger.FormatStruct(t.HubCheckShardStore)) | |||
| defer func() { | |||
| log.Debugf("end, time: %v", time.Since(startTime)) | |||
| }() | |||
| stg, err := execCtx.Args.DB.Storage().GetByID(execCtx.Args.DB.DefCtx(), t.StorageID) | |||
| if err != nil { | |||
| log.WithField("StorageID", t.StorageID).Warnf("getting shard storage by storage id: %s", err.Error()) | |||
| return | |||
| } | |||
| agtCli, err := stgglb.HubMQPool.Acquire(stg.MasterHub) | |||
| if err != nil { | |||
| log.WithField("StorageID", t.StorageID).Warnf("create hub client failed, err: %s", err.Error()) | |||
| return | |||
| } | |||
| defer stgglb.HubMQPool.Release(agtCli) | |||
| checkResp, err := agtCli.CheckCache(agtmq.NewCheckCache(t.StorageID), mq.RequestOption{Timeout: time.Minute}) | |||
| if err != nil { | |||
| log.WithField("StorageID", t.StorageID).Warnf("checking shard store: %s", err.Error()) | |||
| return | |||
| } | |||
| realFileHashes := lo.SliceToMap(checkResp.FileHashes, func(hash cdssdk.FileHash) (cdssdk.FileHash, bool) { return hash, true }) | |||
| // 在事务中执行缓存更新操作 | |||
| execCtx.Args.DB.DoTx(func(tx db2.SQLContext) error { | |||
| t.checkCache(execCtx, tx, realFileHashes) | |||
| t.checkPinnedObject(execCtx, tx, realFileHashes) | |||
| t.checkObjectBlock(execCtx, tx, realFileHashes) | |||
| return nil | |||
| }) | |||
| } | |||
| // checkCache 对比Cache表中的记录,根据实际存在的文件哈希值,进行增加或删除操作 | |||
| func (t *HubCheckShardStore) checkCache(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { | |||
| log := logger.WithType[HubCheckShardStore]("Event") | |||
| caches, err := execCtx.Args.DB.Cache().GetByStorageID(tx, t.StorageID) | |||
| if err != nil { | |||
| log.WithField("StorageID", t.StorageID).Warnf("getting caches by storage id: %s", err.Error()) | |||
| return | |||
| } | |||
| realFileHashesCp := make(map[cdssdk.FileHash]bool) | |||
| for k, v := range realFileHashes { | |||
| realFileHashesCp[k] = v | |||
| } | |||
| var rms []cdssdk.FileHash | |||
| for _, c := range caches { | |||
| if realFileHashesCp[c.FileHash] { | |||
| delete(realFileHashesCp, c.FileHash) | |||
| continue | |||
| } | |||
| rms = append(rms, c.FileHash) | |||
| } | |||
| if len(rms) > 0 { | |||
| err = execCtx.Args.DB.Cache().StorageBatchDelete(tx, t.StorageID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete storage caches: %w", err.Error()) | |||
| } | |||
| } | |||
| if len(realFileHashesCp) > 0 { | |||
| err = execCtx.Args.DB.Cache().BatchCreateOnSameStorage(tx, lo.Keys(realFileHashesCp), t.StorageID, 0) | |||
| if err != nil { | |||
| log.Warnf("batch create storage caches: %w", err) | |||
| return | |||
| } | |||
| } | |||
| } | |||
| // checkPinnedObject 对比PinnedObject表,若实际文件不存在,则进行删除操作 | |||
| func (t *HubCheckShardStore) checkPinnedObject(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { | |||
| log := logger.WithType[HubCheckShardStore]("Event") | |||
| objs, err := execCtx.Args.DB.PinnedObject().GetObjectsByStorageID(tx, t.StorageID) | |||
| if err != nil { | |||
| log.WithField("StorageID", t.StorageID).Warnf("getting pinned objects by storage id: %s", err.Error()) | |||
| return | |||
| } | |||
| var rms []cdssdk.ObjectID | |||
| for _, c := range objs { | |||
| if realFileHashes[c.FileHash] { | |||
| continue | |||
| } | |||
| rms = append(rms, c.ObjectID) | |||
| } | |||
| if len(rms) > 0 { | |||
| err = execCtx.Args.DB.PinnedObject().StorageBatchDelete(tx, t.StorageID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete storage pinned objects: %s", err.Error()) | |||
| } | |||
| } | |||
| } | |||
| // checkObjectBlock 对比ObjectBlock表,若实际文件不存在,则进行删除操作 | |||
| func (t *HubCheckShardStore) checkObjectBlock(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { | |||
| log := logger.WithType[HubCheckShardStore]("Event") | |||
| blocks, err := execCtx.Args.DB.ObjectBlock().GetByStorageID(tx, t.StorageID) | |||
| if err != nil { | |||
| log.WithField("StorageID", t.StorageID).Warnf("getting object blocks by storage id: %s", err.Error()) | |||
| return | |||
| } | |||
| var rms []cdssdk.FileHash | |||
| for _, b := range blocks { | |||
| if realFileHashes[b.FileHash] { | |||
| continue | |||
| } | |||
| rms = append(rms, b.FileHash) | |||
| } | |||
| if len(rms) > 0 { | |||
| err = execCtx.Args.DB.ObjectBlock().StorageBatchDelete(tx, t.StorageID, rms) | |||
| if err != nil { | |||
| log.Warnf("batch delete storage object blocks: %s", err.Error()) | |||
| } | |||
| } | |||
| } | |||
| // init 注册HubCheckCache消息转换器 | |||
| func init() { | |||
| RegisterMessageConvertor(NewHubCheckShardStore) | |||
| } | |||
| @@ -1,124 +0,0 @@ | |||
| package event | |||
| import ( | |||
| "fmt" | |||
| "time" | |||
| "gitlink.org.cn/cloudream/common/pkgs/logger" | |||
| "gitlink.org.cn/cloudream/common/pkgs/mq" | |||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | |||
| stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" | |||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/db2" | |||
| "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/distlock/reqbuilder" | |||
| agtmq "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/hub" | |||
| scevt "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/mq/scanner/event" | |||
| ) | |||
| // HubShardStoreGC 类封装了扫描器事件中的HubShardStoreGC结构。 | |||
| type HubShardStoreGC struct { | |||
| *scevt.HubShardStoreGC | |||
| } | |||
| // NewHubShardStoreGC 创建一个新的HubCacheGC实例。 | |||
| // evt: 传入的扫描器事件中的HubCacheGC实例。 | |||
| func NewHubShardStoreGC(evt *scevt.HubShardStoreGC) *HubShardStoreGC { | |||
| return &HubShardStoreGC{ | |||
| HubShardStoreGC: evt, | |||
| } | |||
| } | |||
| // TryMerge 尝试合并当前事件与另一个事件。 | |||
| // other: 待合并的另一个事件。 | |||
| // 返回值表示是否成功合并。 | |||
| func (t *HubShardStoreGC) TryMerge(other Event) bool { | |||
| event, ok := other.(*HubShardStoreGC) | |||
| if !ok { | |||
| return false | |||
| } | |||
| if event.StorageID != t.StorageID { | |||
| return false | |||
| } | |||
| return true | |||
| } | |||
| // Execute 执行垃圾回收操作。 | |||
| // execCtx: 执行上下文,包含执行所需的各种参数和环境。 | |||
| func (t *HubShardStoreGC) Execute(execCtx ExecuteContext) { | |||
| log := logger.WithType[HubShardStoreGC]("Event") | |||
| startTime := time.Now() | |||
| log.Debugf("begin with %v", logger.FormatStruct(t.HubShardStoreGC)) | |||
| defer func() { | |||
| log.Debugf("end, time: %v", time.Since(startTime)) | |||
| }() | |||
| // 使用分布式锁进行资源锁定 | |||
| mutex, err := reqbuilder.NewBuilder(). | |||
| // 执行IPFS垃圾回收 | |||
| Shard().GC(t.StorageID). | |||
| MutexLock(execCtx.Args.DistLock) | |||
| if err != nil { | |||
| log.Warnf("acquire locks failed, err: %s", err.Error()) | |||
| return | |||
| } | |||
| defer mutex.Unlock() | |||
| // 收集需要进行垃圾回收的文件哈希值 | |||
| var allFileHashes []cdssdk.FileHash | |||
| var masterHub cdssdk.Hub | |||
| err = execCtx.Args.DB.DoTx(func(tx db2.SQLContext) error { | |||
| stg, err := execCtx.Args.DB.Storage().GetByID(tx, t.StorageID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting storage by id: %w", err) | |||
| } | |||
| masterHub, err = execCtx.Args.DB.Hub().GetByID(tx, stg.MasterHub) | |||
| if err != nil { | |||
| return fmt.Errorf("getting master hub by id: %w", err) | |||
| } | |||
| blocks, err := execCtx.Args.DB.ObjectBlock().GetByStorageID(tx, t.StorageID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting object blocks by hub id: %w", err) | |||
| } | |||
| for _, c := range blocks { | |||
| allFileHashes = append(allFileHashes, c.FileHash) | |||
| } | |||
| objs, err := execCtx.Args.DB.PinnedObject().GetObjectsByStorageID(tx, t.StorageID) | |||
| if err != nil { | |||
| return fmt.Errorf("getting pinned objects by hub id: %w", err) | |||
| } | |||
| for _, o := range objs { | |||
| allFileHashes = append(allFileHashes, o.FileHash) | |||
| } | |||
| return nil | |||
| }) | |||
| if err != nil { | |||
| log.WithField("HubID", t.StorageID).Warn(err.Error()) | |||
| return | |||
| } | |||
| // 获取与节点通信的代理客户端 | |||
| agtCli, err := stgglb.HubMQPool.Acquire(masterHub.HubID) | |||
| if err != nil { | |||
| log.WithField("HubID", t.StorageID).Warnf("create hub client failed, err: %s", err.Error()) | |||
| return | |||
| } | |||
| defer stgglb.HubMQPool.Release(agtCli) | |||
| // 向代理发送垃圾回收请求 | |||
| _, err = agtCli.CacheGC(agtmq.ReqCacheGC(t.StorageID, allFileHashes), mq.RequestOption{Timeout: time.Minute}) | |||
| if err != nil { | |||
| log.WithField("HubID", t.StorageID).Warnf("ipfs gc: %s", err.Error()) | |||
| return | |||
| } | |||
| } | |||
| // 注册消息转换器,使系统能够处理HubCacheGC消息。 | |||
| func init() { | |||
| RegisterMessageConvertor(NewHubShardStoreGC) | |||
| } | |||