diff --git a/client/internal/cmdline/scanner.go b/client/internal/cmdline/scanner.go index ff3f55f..3a13b7f 100644 --- a/client/internal/cmdline/scanner.go +++ b/client/internal/cmdline/scanner.go @@ -25,9 +25,9 @@ func ScannerPostEvent(ctx CommandContext, args []string) error { } func init() { - parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCacheGC, reflect2.TypeNameOf[scevt.AgentCacheGC]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewAgentShardStoreGC, reflect2.TypeNameOf[scevt.AgentShardStoreGC]()) - parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckCache, reflect2.TypeNameOf[scevt.AgentCheckCache]()) + parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckShardStore, reflect2.TypeNameOf[scevt.AgentCheckShardStore]()) parseScannerEventCmdTrie.MustAdd(scevt.NewAgentCheckState, reflect2.TypeNameOf[scevt.AgentCheckState]()) diff --git a/client/internal/http/node.go b/client/internal/http/hub.go similarity index 100% rename from client/internal/http/node.go rename to client/internal/http/hub.go diff --git a/client/internal/services/node.go b/client/internal/services/hub.go similarity index 100% rename from client/internal/services/node.go rename to client/internal/services/hub.go diff --git a/common/pkgs/db2/node.go b/common/pkgs/db2/hub.go similarity index 100% rename from common/pkgs/db2/node.go rename to common/pkgs/db2/hub.go diff --git a/common/pkgs/db2/node_connectivity.go b/common/pkgs/db2/hub_connectivity.go similarity index 100% rename from common/pkgs/db2/node_connectivity.go rename to common/pkgs/db2/hub_connectivity.go diff --git a/common/pkgs/distlock/lockprovider/ipfs_lock.go b/common/pkgs/distlock/lockprovider/shard_store.go similarity index 50% rename from common/pkgs/distlock/lockprovider/ipfs_lock.go rename to common/pkgs/distlock/lockprovider/shard_store.go index aada660..f5aaaa1 100644 --- a/common/pkgs/distlock/lockprovider/ipfs_lock.go +++ b/common/pkgs/distlock/lockprovider/shard_store.go @@ -8,27 +8,27 @@ import ( ) const ( - IPFSLockPathPrefix = "IPFS" - IPFSHubIDPathIndex = 1 - IPFSBuzyLock = "Buzy" - IPFSGCLock = "GC" + ShardStoreLockPathPrefix = "ShardStore" + ShardStoreStorageIDPathIndex = 1 + ShardStoreBuzyLock = "Buzy" + ShardStoreGCLock = "GC" ) -type IPFSLock struct { - nodeLocks map[string]*IPFSNodeLock - dummyLock *IPFSNodeLock +type ShardStoreLock struct { + stgLocks map[string]*ShardStoreStorageLock + dummyLock *ShardStoreStorageLock } -func NewIPFSLock() *IPFSLock { - return &IPFSLock{ - nodeLocks: make(map[string]*IPFSNodeLock), - dummyLock: NewIPFSNodeLock(), +func NewShardStoreLock() *ShardStoreLock { + return &ShardStoreLock{ + stgLocks: make(map[string]*ShardStoreStorageLock), + dummyLock: NewShardStoreStorageLock(), } } // CanLock 判断这个锁能否锁定成功 -func (l *IPFSLock) CanLock(lock distlock.Lock) error { - nodeLock, ok := l.nodeLocks[lock.Path[IPFSHubIDPathIndex]] +func (l *ShardStoreLock) CanLock(lock distlock.Lock) error { + nodeLock, ok := l.stgLocks[lock.Path[ShardStoreStorageIDPathIndex]] if !ok { // 不能直接返回nil,因为如果锁数据的格式不对,也不能获取锁。 // 这里使用一个空Provider来进行检查。 @@ -39,23 +39,23 @@ func (l *IPFSLock) CanLock(lock distlock.Lock) error { } // 锁定。在内部可以不用判断能否加锁,外部需要保证调用此函数前调用了CanLock进行检查 -func (l *IPFSLock) Lock(reqID string, lock distlock.Lock) error { - hubID := lock.Path[IPFSHubIDPathIndex] +func (l *ShardStoreLock) Lock(reqID string, lock distlock.Lock) error { + stgID := lock.Path[ShardStoreStorageIDPathIndex] - nodeLock, ok := l.nodeLocks[hubID] + nodeLock, ok := l.stgLocks[stgID] if !ok { - nodeLock = NewIPFSNodeLock() - l.nodeLocks[hubID] = nodeLock + nodeLock = NewShardStoreStorageLock() + l.stgLocks[stgID] = nodeLock } return nodeLock.Lock(reqID, lock) } // 解锁 -func (l *IPFSLock) Unlock(reqID string, lock distlock.Lock) error { - hubID := lock.Path[IPFSHubIDPathIndex] +func (l *ShardStoreLock) Unlock(reqID string, lock distlock.Lock) error { + stgID := lock.Path[ShardStoreStorageIDPathIndex] - nodeLock, ok := l.nodeLocks[hubID] + nodeLock, ok := l.stgLocks[stgID] if !ok { return nil } @@ -64,38 +64,38 @@ func (l *IPFSLock) Unlock(reqID string, lock distlock.Lock) error { } // GetTargetString 将锁对象序列化为字符串,方便存储到ETCD -func (l *IPFSLock) GetTargetString(target any) (string, error) { +func (l *ShardStoreLock) GetTargetString(target any) (string, error) { tar := target.(StringLockTarget) return StringLockTargetToString(&tar) } // ParseTargetString 解析字符串格式的锁对象数据 -func (l *IPFSLock) ParseTargetString(targetStr string) (any, error) { +func (l *ShardStoreLock) ParseTargetString(targetStr string) (any, error) { return StringLockTargetFromString(targetStr) } // Clear 清除内部所有状态 -func (l *IPFSLock) Clear() { - l.nodeLocks = make(map[string]*IPFSNodeLock) +func (l *ShardStoreLock) Clear() { + l.stgLocks = make(map[string]*ShardStoreStorageLock) } -type IPFSNodeLock struct { +type ShardStoreStorageLock struct { buzyReqIDs []string gcReqIDs []string lockCompatibilityTable *LockCompatibilityTable } -func NewIPFSNodeLock() *IPFSNodeLock { +func NewShardStoreStorageLock() *ShardStoreStorageLock { compTable := &LockCompatibilityTable{} - ipfsLock := IPFSNodeLock{ + sdLock := ShardStoreStorageLock{ lockCompatibilityTable: compTable, } compTable. - Column(IPFSBuzyLock, func() bool { return len(ipfsLock.buzyReqIDs) > 0 }). - Column(IPFSGCLock, func() bool { return len(ipfsLock.gcReqIDs) > 0 }) + Column(ShardStoreBuzyLock, func() bool { return len(sdLock.buzyReqIDs) > 0 }). + Column(ShardStoreGCLock, func() bool { return len(sdLock.gcReqIDs) > 0 }) comp := LockCompatible() uncp := LockUncompatible() @@ -103,20 +103,20 @@ func NewIPFSNodeLock() *IPFSNodeLock { compTable.MustRow(comp, uncp) compTable.MustRow(uncp, comp) - return &ipfsLock + return &sdLock } // CanLock 判断这个锁能否锁定成功 -func (l *IPFSNodeLock) CanLock(lock distlock.Lock) error { +func (l *ShardStoreStorageLock) CanLock(lock distlock.Lock) error { return l.lockCompatibilityTable.Test(lock) } // 锁定 -func (l *IPFSNodeLock) Lock(reqID string, lock distlock.Lock) error { +func (l *ShardStoreStorageLock) Lock(reqID string, lock distlock.Lock) error { switch lock.Name { - case IPFSBuzyLock: + case ShardStoreBuzyLock: l.buzyReqIDs = append(l.buzyReqIDs, reqID) - case IPFSGCLock: + case ShardStoreGCLock: l.gcReqIDs = append(l.gcReqIDs, reqID) default: return fmt.Errorf("unknow lock name: %s", lock.Name) @@ -126,11 +126,11 @@ func (l *IPFSNodeLock) Lock(reqID string, lock distlock.Lock) error { } // 解锁 -func (l *IPFSNodeLock) Unlock(reqID string, lock distlock.Lock) error { +func (l *ShardStoreStorageLock) Unlock(reqID string, lock distlock.Lock) error { switch lock.Name { - case IPFSBuzyLock: + case ShardStoreBuzyLock: l.buzyReqIDs = lo2.Remove(l.buzyReqIDs, reqID) - case IPFSGCLock: + case ShardStoreGCLock: l.gcReqIDs = lo2.Remove(l.gcReqIDs, reqID) default: return fmt.Errorf("unknow lock name: %s", lock.Name) diff --git a/common/pkgs/distlock/lockprovider/ipfs_lock_test.go b/common/pkgs/distlock/lockprovider/shard_store_test.go similarity index 62% rename from common/pkgs/distlock/lockprovider/ipfs_lock_test.go rename to common/pkgs/distlock/lockprovider/shard_store_test.go index 0ec9f8a..c38854d 100644 --- a/common/pkgs/distlock/lockprovider/ipfs_lock_test.go +++ b/common/pkgs/distlock/lockprovider/shard_store_test.go @@ -7,7 +7,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/distlock" ) -func Test_IPFSLock(t *testing.T) { +func Test_ShardStoreLock(t *testing.T) { cases := []struct { title string initLocks []distlock.Lock @@ -18,13 +18,13 @@ func Test_IPFSLock(t *testing.T) { title: "同节点,同一个Buzy锁", initLocks: []distlock.Lock{ { - Path: []string{IPFSLockPathPrefix, "hub1"}, - Name: IPFSBuzyLock, + Path: []string{ShardStoreLockPathPrefix, "hub1"}, + Name: ShardStoreBuzyLock, }, }, doLock: distlock.Lock{ - Path: []string{IPFSLockPathPrefix, "hub1"}, - Name: IPFSBuzyLock, + Path: []string{ShardStoreLockPathPrefix, "hub1"}, + Name: ShardStoreBuzyLock, }, wantOK: true, }, @@ -32,13 +32,13 @@ func Test_IPFSLock(t *testing.T) { title: "同节点,同一个GC锁", initLocks: []distlock.Lock{ { - Path: []string{IPFSLockPathPrefix, "hub1"}, - Name: IPFSGCLock, + Path: []string{ShardStoreLockPathPrefix, "hub1"}, + Name: ShardStoreGCLock, }, }, doLock: distlock.Lock{ - Path: []string{IPFSLockPathPrefix, "hub1"}, - Name: IPFSGCLock, + Path: []string{ShardStoreLockPathPrefix, "hub1"}, + Name: ShardStoreGCLock, }, wantOK: true, }, @@ -46,14 +46,14 @@ func Test_IPFSLock(t *testing.T) { title: "同时设置Buzy和GC", initLocks: []distlock.Lock{ { - Path: []string{IPFSLockPathPrefix, "hub1"}, - Name: IPFSBuzyLock, + Path: []string{ShardStoreLockPathPrefix, "hub1"}, + Name: ShardStoreBuzyLock, Target: *NewStringLockTarget(), }, }, doLock: distlock.Lock{ - Path: []string{IPFSLockPathPrefix, "hub1"}, - Name: IPFSGCLock, + Path: []string{ShardStoreLockPathPrefix, "hub1"}, + Name: ShardStoreGCLock, Target: *NewStringLockTarget(), }, wantOK: false, @@ -62,7 +62,7 @@ func Test_IPFSLock(t *testing.T) { for _, ca := range cases { Convey(ca.title, t, func() { - ipfsLock := NewIPFSLock() + ipfsLock := NewShardStoreLock() for _, l := range ca.initLocks { ipfsLock.Lock("req1", l) @@ -78,11 +78,11 @@ func Test_IPFSLock(t *testing.T) { } Convey("解锁", t, func() { - ipfsLock := NewIPFSLock() + ipfsLock := NewShardStoreLock() lock := distlock.Lock{ - Path: []string{IPFSLockPathPrefix, "hub1"}, - Name: IPFSBuzyLock, + Path: []string{ShardStoreLockPathPrefix, "hub1"}, + Name: ShardStoreBuzyLock, } ipfsLock.Lock("req1", lock) @@ -93,8 +93,8 @@ func Test_IPFSLock(t *testing.T) { ipfsLock.Unlock("req1", lock) lock = distlock.Lock{ - Path: []string{IPFSLockPathPrefix, "hub1"}, - Name: IPFSGCLock, + Path: []string{ShardStoreLockPathPrefix, "hub1"}, + Name: ShardStoreGCLock, } err = ipfsLock.CanLock(lock) So(err, ShouldBeNil) diff --git a/common/pkgs/distlock/reqbuilder/ipfs.go b/common/pkgs/distlock/reqbuilder/shard_store.go similarity index 84% rename from common/pkgs/distlock/reqbuilder/ipfs.go rename to common/pkgs/distlock/reqbuilder/shard_store.go index 2942830..23c631a 100644 --- a/common/pkgs/distlock/reqbuilder/ipfs.go +++ b/common/pkgs/distlock/reqbuilder/shard_store.go @@ -18,7 +18,7 @@ func (b *LockRequestBuilder) Shard() *ShardStoreLockReqBuilder { func (b *ShardStoreLockReqBuilder) Buzy(stgID cdssdk.StorageID) *ShardStoreLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(stgID), - Name: lockprovider.IPFSBuzyLock, + Name: lockprovider.ShardStoreBuzyLock, Target: *lockprovider.NewStringLockTarget(), }) return b @@ -27,12 +27,12 @@ func (b *ShardStoreLockReqBuilder) Buzy(stgID cdssdk.StorageID) *ShardStoreLockR func (b *ShardStoreLockReqBuilder) GC(stgID cdssdk.StorageID) *ShardStoreLockReqBuilder { b.locks = append(b.locks, distlock.Lock{ Path: b.makePath(stgID), - Name: lockprovider.IPFSGCLock, + Name: lockprovider.ShardStoreGCLock, Target: *lockprovider.NewStringLockTarget(), }) return b } func (b *ShardStoreLockReqBuilder) makePath(hubID cdssdk.StorageID) []string { - return []string{lockprovider.IPFSLockPathPrefix, strconv.FormatInt(int64(hubID), 10)} + return []string{lockprovider.ShardStoreLockPathPrefix, strconv.FormatInt(int64(hubID), 10)} } diff --git a/common/pkgs/distlock/service.go b/common/pkgs/distlock/service.go index 2e4029c..0c1333c 100644 --- a/common/pkgs/distlock/service.go +++ b/common/pkgs/distlock/service.go @@ -52,7 +52,7 @@ func initMetadataLockProviders() []distlock.PathProvider { func initIPFSLockProviders() []distlock.PathProvider { return []distlock.PathProvider{ - distlock.NewPathProvider(lockprovider.NewIPFSLock(), lockprovider.IPFSLockPathPrefix, trie.WORD_ANY), + distlock.NewPathProvider(lockprovider.NewShardStoreLock(), lockprovider.ShardStoreLockPathPrefix, trie.WORD_ANY), } } diff --git a/common/pkgs/mq/coordinator/node.go b/common/pkgs/mq/coordinator/hub.go similarity index 100% rename from common/pkgs/mq/coordinator/node.go rename to common/pkgs/mq/coordinator/hub.go diff --git a/common/pkgs/mq/scanner/event/agent_check_cache.go b/common/pkgs/mq/scanner/event/agent_check_shardstore.go similarity index 50% rename from common/pkgs/mq/scanner/event/agent_check_cache.go rename to common/pkgs/mq/scanner/event/agent_check_shardstore.go index 508a466..6494b4e 100644 --- a/common/pkgs/mq/scanner/event/agent_check_cache.go +++ b/common/pkgs/mq/scanner/event/agent_check_shardstore.go @@ -2,17 +2,17 @@ package event import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -type AgentCheckCache struct { +type AgentCheckShardStore struct { EventBase StorageID cdssdk.StorageID `json:"storageID"` } -func NewAgentCheckCache(stgID cdssdk.StorageID) *AgentCheckCache { - return &AgentCheckCache{ +func NewAgentCheckShardStore(stgID cdssdk.StorageID) *AgentCheckShardStore { + return &AgentCheckShardStore{ StorageID: stgID, } } func init() { - Register[*AgentCheckCache]() + Register[*AgentCheckShardStore]() } diff --git a/common/pkgs/mq/scanner/event/agent_cache_gc.go b/common/pkgs/mq/scanner/event/agent_shardstore_gc.go similarity index 52% rename from common/pkgs/mq/scanner/event/agent_cache_gc.go rename to common/pkgs/mq/scanner/event/agent_shardstore_gc.go index 68112b6..868b8d0 100644 --- a/common/pkgs/mq/scanner/event/agent_cache_gc.go +++ b/common/pkgs/mq/scanner/event/agent_shardstore_gc.go @@ -2,17 +2,17 @@ package event import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" -type AgentCacheGC struct { +type AgentShardStoreGC struct { EventBase StorageID cdssdk.StorageID `json:"storageID"` } -func NewAgentCacheGC(stgID cdssdk.StorageID) *AgentCacheGC { - return &AgentCacheGC{ +func NewAgentShardStoreGC(stgID cdssdk.StorageID) *AgentShardStoreGC { + return &AgentShardStoreGC{ StorageID: stgID, } } func init() { - Register[*AgentCacheGC]() + Register[*AgentShardStoreGC]() } diff --git a/coordinator/internal/mq/node.go b/coordinator/internal/mq/hub.go similarity index 100% rename from coordinator/internal/mq/node.go rename to coordinator/internal/mq/hub.go diff --git a/scanner/internal/event/agent_check_cache.go b/scanner/internal/event/agent_check_shardstore.go similarity index 77% rename from scanner/internal/event/agent_check_cache.go rename to scanner/internal/event/agent_check_shardstore.go index 868d570..e3447ff 100644 --- a/scanner/internal/event/agent_check_cache.go +++ b/scanner/internal/event/agent_check_shardstore.go @@ -14,22 +14,22 @@ import ( scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) -// AgentCheckCache 代表一个用于处理代理缓存检查事件的结构体 -type AgentCheckCache struct { - *scevt.AgentCheckCache +// AgentCheckShardStore 代表一个用于处理代理缓存检查事件的结构体 +type AgentCheckShardStore struct { + *scevt.AgentCheckShardStore } -// NewAgentCheckCache 创建一个新的 AgentCheckCache 实例 -func NewAgentCheckCache(evt *scevt.AgentCheckCache) *AgentCheckCache { - return &AgentCheckCache{ - AgentCheckCache: evt, +// NewAgentCheckShardStore 创建一个新的 AgentCheckCache 实例 +func NewAgentCheckShardStore(evt *scevt.AgentCheckShardStore) *AgentCheckShardStore { + return &AgentCheckShardStore{ + AgentCheckShardStore: evt, } } // TryMerge 尝试合并当前事件与另一个事件 // 如果另一个事件类型不匹配或节点ID不同,则不进行合并 -func (t *AgentCheckCache) TryMerge(other Event) bool { - event, ok := other.(*AgentCheckCache) +func (t *AgentCheckShardStore) TryMerge(other Event) bool { + event, ok := other.(*AgentCheckShardStore) if !ok { return false } @@ -42,10 +42,10 @@ func (t *AgentCheckCache) TryMerge(other Event) bool { } // Execute 执行缓存检查操作,对比本地缓存与代理返回的缓存信息,更新数据库中的缓存记录 -func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { - log := logger.WithType[AgentCheckCache]("Event") +func (t *AgentCheckShardStore) Execute(execCtx ExecuteContext) { + log := logger.WithType[AgentCheckShardStore]("Event") startTime := time.Now() - log.Debugf("begin with %v", logger.FormatStruct(t.AgentCheckCache)) + log.Debugf("begin with %v", logger.FormatStruct(t.AgentCheckShardStore)) defer func() { log.Debugf("end, time: %v", time.Since(startTime)) }() @@ -83,8 +83,8 @@ func (t *AgentCheckCache) Execute(execCtx ExecuteContext) { } // checkCache 对比Cache表中的记录,根据实际存在的文件哈希值,进行增加或删除操作 -func (t *AgentCheckCache) checkCache(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { - log := logger.WithType[AgentCheckCache]("Event") +func (t *AgentCheckShardStore) checkCache(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { + log := logger.WithType[AgentCheckShardStore]("Event") caches, err := execCtx.Args.DB.Cache().GetByStorageID(tx, t.StorageID) if err != nil { @@ -123,8 +123,8 @@ func (t *AgentCheckCache) checkCache(execCtx ExecuteContext, tx db2.SQLContext, } // checkPinnedObject 对比PinnedObject表,若实际文件不存在,则进行删除操作 -func (t *AgentCheckCache) checkPinnedObject(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { - log := logger.WithType[AgentCheckCache]("Event") +func (t *AgentCheckShardStore) checkPinnedObject(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { + log := logger.WithType[AgentCheckShardStore]("Event") objs, err := execCtx.Args.DB.PinnedObject().GetObjectsByStorageID(tx, t.StorageID) if err != nil { @@ -149,8 +149,8 @@ func (t *AgentCheckCache) checkPinnedObject(execCtx ExecuteContext, tx db2.SQLCo } // checkObjectBlock 对比ObjectBlock表,若实际文件不存在,则进行删除操作 -func (t *AgentCheckCache) checkObjectBlock(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { - log := logger.WithType[AgentCheckCache]("Event") +func (t *AgentCheckShardStore) checkObjectBlock(execCtx ExecuteContext, tx db2.SQLContext, realFileHashes map[cdssdk.FileHash]bool) { + log := logger.WithType[AgentCheckShardStore]("Event") blocks, err := execCtx.Args.DB.ObjectBlock().GetByStorageID(tx, t.StorageID) if err != nil { @@ -176,5 +176,5 @@ func (t *AgentCheckCache) checkObjectBlock(execCtx ExecuteContext, tx db2.SQLCon // init 注册AgentCheckCache消息转换器 func init() { - RegisterMessageConvertor(NewAgentCheckCache) + RegisterMessageConvertor(NewAgentCheckShardStore) } diff --git a/scanner/internal/event/agent_cache_gc.go b/scanner/internal/event/agent_shardstore_gc.go similarity index 81% rename from scanner/internal/event/agent_cache_gc.go rename to scanner/internal/event/agent_shardstore_gc.go index bf1b301..6093265 100644 --- a/scanner/internal/event/agent_cache_gc.go +++ b/scanner/internal/event/agent_shardstore_gc.go @@ -15,24 +15,24 @@ import ( scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" ) -// AgentCacheGC 类封装了扫描器事件中的AgentCacheGC结构。 -type AgentCacheGC struct { - *scevt.AgentCacheGC +// AgentShardStoreGC 类封装了扫描器事件中的AgentShardStoreGC结构。 +type AgentShardStoreGC struct { + *scevt.AgentShardStoreGC } -// NewAgentCacheGC 创建一个新的AgentCacheGC实例。 +// NewAgentShardStoreGC 创建一个新的AgentCacheGC实例。 // evt: 传入的扫描器事件中的AgentCacheGC实例。 -func NewAgentCacheGC(evt *scevt.AgentCacheGC) *AgentCacheGC { - return &AgentCacheGC{ - AgentCacheGC: evt, +func NewAgentShardStoreGC(evt *scevt.AgentShardStoreGC) *AgentShardStoreGC { + return &AgentShardStoreGC{ + AgentShardStoreGC: evt, } } // TryMerge 尝试合并当前事件与另一个事件。 // other: 待合并的另一个事件。 // 返回值表示是否成功合并。 -func (t *AgentCacheGC) TryMerge(other Event) bool { - event, ok := other.(*AgentCacheGC) +func (t *AgentShardStoreGC) TryMerge(other Event) bool { + event, ok := other.(*AgentShardStoreGC) if !ok { return false } @@ -46,10 +46,10 @@ func (t *AgentCacheGC) TryMerge(other Event) bool { // Execute 执行垃圾回收操作。 // execCtx: 执行上下文,包含执行所需的各种参数和环境。 -func (t *AgentCacheGC) Execute(execCtx ExecuteContext) { - log := logger.WithType[AgentCacheGC]("Event") +func (t *AgentShardStoreGC) Execute(execCtx ExecuteContext) { + log := logger.WithType[AgentShardStoreGC]("Event") startTime := time.Now() - log.Debugf("begin with %v", logger.FormatStruct(t.AgentCacheGC)) + log.Debugf("begin with %v", logger.FormatStruct(t.AgentShardStoreGC)) defer func() { log.Debugf("end, time: %v", time.Since(startTime)) }() @@ -120,5 +120,5 @@ func (t *AgentCacheGC) Execute(execCtx ExecuteContext) { // 注册消息转换器,使系统能够处理AgentCacheGC消息。 func init() { - RegisterMessageConvertor(NewAgentCacheGC) + RegisterMessageConvertor(NewAgentShardStoreGC) } diff --git a/scanner/internal/tickevent/batch_all_agent_check_cache.go b/scanner/internal/tickevent/batch_all_agent_check_shardstore.go similarity index 65% rename from scanner/internal/tickevent/batch_all_agent_check_cache.go rename to scanner/internal/tickevent/batch_all_agent_check_shardstore.go index 252e5d6..73cac98 100644 --- a/scanner/internal/tickevent/batch_all_agent_check_cache.go +++ b/scanner/internal/tickevent/batch_all_agent_check_shardstore.go @@ -9,20 +9,20 @@ import ( const AGENT_CHECK_CACHE_BATCH_SIZE = 2 -type BatchAllAgentCheckCache struct { +type BatchAllAgentCheckShardStore struct { stgIDs []cdssdk.StorageID } -func NewBatchAllAgentCheckCache() *BatchAllAgentCheckCache { - return &BatchAllAgentCheckCache{} +func NewBatchAllAgentCheckShardStore() *BatchAllAgentCheckShardStore { + return &BatchAllAgentCheckShardStore{} } -func (e *BatchAllAgentCheckCache) Execute(ctx ExecuteContext) { - log := logger.WithType[BatchAllAgentCheckCache]("TickEvent") +func (e *BatchAllAgentCheckShardStore) Execute(ctx ExecuteContext) { + log := logger.WithType[BatchAllAgentCheckShardStore]("TickEvent") log.Debugf("begin") defer log.Debugf("end") - if e.stgIDs == nil || len(e.stgIDs) == 0 { + if len(e.stgIDs) == 0 { ids, err := ctx.Args.DB.Storage().GetAllIDs(ctx.Args.DB.DefCtx()) if err != nil { log.Warnf("get all storages failed, err: %s", err.Error()) @@ -36,7 +36,7 @@ func (e *BatchAllAgentCheckCache) Execute(ctx ExecuteContext) { checkedCnt := 0 for ; checkedCnt < len(e.stgIDs) && checkedCnt < AGENT_CHECK_CACHE_BATCH_SIZE; checkedCnt++ { // nil代表进行全量检查 - ctx.Args.EventExecutor.Post(event.NewAgentCheckCache(scevt.NewAgentCheckCache(e.stgIDs[checkedCnt]))) + ctx.Args.EventExecutor.Post(event.NewAgentCheckShardStore(scevt.NewAgentCheckShardStore(e.stgIDs[checkedCnt]))) } e.stgIDs = e.stgIDs[checkedCnt:] } diff --git a/scanner/main.go b/scanner/main.go index 8e71558..c35b1be 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -139,7 +139,7 @@ func startTickEvent(tickExecutor *tickevent.Executor) { interval := 5 * 60 * 1000 - tickExecutor.Start(tickevent.NewBatchAllAgentCheckCache(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) + tickExecutor.Start(tickevent.NewBatchAllAgentCheckShardStore(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000}) tickExecutor.Start(tickevent.NewBatchCheckAllPackage(), interval, tickevent.StartOption{RandomStartDelayMs: 60 * 1000})