From b954e4e38128da1d03247d097cd3c6c08a9272dd Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Thu, 13 Feb 2025 14:56:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E4=BA=8B=E4=BB=B6=E7=9A=84=E4=BA=A7=E7=94=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent/internal/cmd/serve.go | 9 + agent/internal/tickevent/report_hub_stats.go | 7 +- client/internal/cmdline/sysevent.go | 60 ++++ client/internal/services/object.go | 2 + common/models/datamap.go | 221 ++++++++++--- common/pkgs/db2/object.go | 82 +---- common/pkgs/ioswitch2/agent_worker.go | 2 +- common/pkgs/ioswitch2/http_hub_worker.go | 2 +- common/pkgs/mq/coordinator/object.go | 7 +- common/pkgs/mq/coordinator/package.go | 8 +- common/pkgs/sysevent/watcher.go | 14 + common/pkgs/uploader/create_load.go | 2 +- common/pkgs/uploader/update.go | 2 +- coordinator/internal/cmd/serve.go | 48 ++- coordinator/internal/mq/bucket.go | 9 + coordinator/internal/mq/object.go | 146 ++++++++- coordinator/internal/mq/package.go | 51 ++- coordinator/internal/mq/service.go | 9 +- coordinator/internal/mq/utils.go | 23 ++ datamap/internal/models/blockdistribution.go | 2 +- datamap/internal/models/object.go | 2 +- .../event/check_package_redundancy.go | 294 ++++++++++++++++-- scanner/internal/event/clean_pinned.go | 192 +++++++++++- scanner/internal/event/event.go | 5 +- scanner/main.go | 48 ++- 25 files changed, 1033 insertions(+), 214 deletions(-) create mode 100644 client/internal/cmdline/sysevent.go create mode 100644 coordinator/internal/mq/utils.go diff --git a/agent/internal/cmd/serve.go b/agent/internal/cmd/serve.go index c2f21de..654db1a 100644 --- a/agent/internal/cmd/serve.go +++ b/agent/internal/cmd/serve.go @@ -55,6 +55,8 @@ func serve(configPath string) { stgglb.InitLocal(&config.Cfg().Local) stgglb.InitMQPool(config.Cfg().RabbitMQ) stgglb.InitAgentRPCPool(&agtrpc.PoolConfig{}) + stgglb.Stats.SetupHubStorageTransfer(*config.Cfg().Local.HubID) + stgglb.Stats.SetupHubTransfer(*config.Cfg().Local.HubID) // 获取Hub配置 hubCfg := downloadHubConfig() @@ -159,6 +161,7 @@ func serve(configPath string) { // 初始化定时任务执行器 sch := setupTickTask(stgAgts, evtPub) + sch.Start() defer sch.Shutdown() // 启动命令服务器 @@ -259,6 +262,12 @@ func setupTickTask(agtPool *agtpool.AgentPool, evtPub *sysevent.Publisher) gocro gocron.NewAtTime(0, 0, 2), )), gocron.NewTask(tickevent.ReportHubStorageTransferStats, evtPub)) + // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportStorageStats, agtPool, evtPub)) + + // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportHubTransferStats, evtPub)) + + // sch.NewJob(gocron.DurationJob(time.Minute), gocron.NewTask(tickevent.ReportHubStorageTransferStats, agtPool, evtPub)) + return sch } diff --git a/agent/internal/tickevent/report_hub_stats.go b/agent/internal/tickevent/report_hub_stats.go index 7c3c6d1..4c744f3 100644 --- a/agent/internal/tickevent/report_hub_stats.go +++ b/agent/internal/tickevent/report_hub_stats.go @@ -1,6 +1,7 @@ package tickevent import ( + "gitlink.org.cn/cloudream/common/utils/math2" stgglb "gitlink.org.cn/cloudream/storage/common/globals" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" @@ -23,7 +24,7 @@ func ReportHubTransferStats(evtPub *sysevent.Publisher) { TotalTransfer: entry.OutputBytes, RequestCount: entry.TotalOutput, FailedRequestCount: entry.TotalInput - entry.SuccessInput, - AvgTransfer: entry.OutputBytes / entry.TotalOutput, + AvgTransfer: math2.DivOrDefault(entry.OutputBytes, entry.TotalOutput, 0), MinTransfer: entry.MinOutputBytes, MaxTransfer: entry.MaxOutputBytes, }, @@ -49,7 +50,9 @@ func ReportHubStorageTransferStats(stgAgts *agtpool.AgentPool, evtPub *sysevent. TotalTransfer: stg.OutputBytes, RequestCount: stg.TotalOutput, FailedRequestCount: stg.TotalInput - stg.SuccessInput, - AvgTransfer: stg.OutputBytes / stg.TotalOutput, + AvgTransfer: math2.DivOrDefault(stg.OutputBytes, stg.TotalOutput, 0), + MinTransfer: stg.MinOutputBytes, + MaxTransfer: stg.MaxOutputBytes, }, StartTimestamp: data.StartTime, EndTimestamp: endTime, diff --git a/client/internal/cmdline/sysevent.go b/client/internal/cmdline/sysevent.go new file mode 100644 index 0000000..a0bd0bb --- /dev/null +++ b/client/internal/cmdline/sysevent.go @@ -0,0 +1,60 @@ +package cmdline + +import ( + "context" + "fmt" + + "github.com/spf13/cobra" + "gitlink.org.cn/cloudream/storage/client/internal/config" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" +) + +func init() { + cmd := &cobra.Command{ + Use: "sysevent", + } + + rootCmd.AddCommand(cmd) + + cmd.AddCommand(&cobra.Command{ + Use: "watch", + Short: "Watch system events", + Run: func(cmd *cobra.Command, args []string) { + watchSysEvent(GetCmdCtx(cmd)) + }, + }) +} + +func watchSysEvent(cmdCtx *CommandContext) { + host, err := sysevent.NewWatcherHost(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ)) + if err != nil { + fmt.Println(err) + return + } + + ch := host.Start() + host.AddWatcherFn(func(event sysevent.SysEvent) { + fmt.Println(event.String()) + }) + for { + e, err := ch.Receive().Wait(context.Background()) + if err != nil { + fmt.Println(err) + return + } + + switch e := e.(type) { + case sysevent.PublishError: + fmt.Printf("Publish error: %v\n", e.Err) + + case sysevent.PublisherExited: + if e.Err != nil { + fmt.Printf("Publisher exited with error: %v\n", e.Err) + } + return + + case sysevent.OtherError: + fmt.Printf("Other error: %v\n", e.Err) + } + } +} diff --git a/client/internal/services/object.go b/client/internal/services/object.go index b155ece..4c2478d 100644 --- a/client/internal/services/object.go +++ b/client/internal/services/object.go @@ -64,6 +64,8 @@ func (svc *ObjectService) UpdateInfo(userID cdssdk.UserID, updatings []cdsapi.Up return nil, fmt.Errorf("requsting to coodinator: %w", err) } + // TODO 考虑产生Update事件 + return resp.Successes, nil } diff --git a/common/models/datamap.go b/common/models/datamap.go index c61b4ac..5b3b47a 100644 --- a/common/models/datamap.go +++ b/common/models/datamap.go @@ -1,6 +1,7 @@ package stgmod import ( + "fmt" "time" "gitlink.org.cn/cloudream/common/pkgs/types" @@ -15,6 +16,10 @@ type SysEvent struct { Body SysEventBody `json:"body"` } +func (e *SysEvent) String() string { + return fmt.Sprintf("%v [%v] %+v", e.Timestamp.Format("2006-01-02 15:04:05"), e.Source, e.Body) +} + // 事件源 type SysEventSource interface { GetSourceType() string @@ -35,6 +40,14 @@ func (s *SourceCoordinator) GetSourceType() string { return "Coordinator" } +func (s *SourceCoordinator) OnUnionSerializing() { + s.Type = s.GetSourceType() +} + +func (s *SourceCoordinator) String() string { + return "Coordinator" +} + type SourceScanner struct { serder.Metadata `union:"Scanner"` Type string `json:"type"` @@ -44,6 +57,14 @@ func (s *SourceScanner) GetSourceType() string { return "Scanner" } +func (s *SourceScanner) OnUnionSerializing() { + s.Type = s.GetSourceType() +} + +func (s *SourceScanner) String() string { + return "Scanner" +} + type SourceHub struct { serder.Metadata `union:"Hub"` Type string `json:"type"` @@ -55,6 +76,14 @@ func (s *SourceHub) GetSourceType() string { return "Hub" } +func (s *SourceHub) OnUnionSerializing() { + s.Type = s.GetSourceType() +} + +func (s *SourceHub) String() string { + return fmt.Sprintf("Hub(%d, %s)", s.HubID, s.HubName) +} + // 事件体 type SysEventBody interface { GetBodyType() string @@ -75,11 +104,12 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEven (*BodyBlockTransfer)(nil), (*BodyBlockDistribution)(nil), - (*BodyNewObject)(nil), - (*BodyObjectUpdated)(nil), + (*BodyNewOrUpdateObject)(nil), + (*BodyObjectInfoUpdated)(nil), (*BodyObjectDeleted)(nil), (*BodyNewPackage)(nil), + (*BodyPackageCloned)(nil), (*BodyPackageDeleted)(nil), (*BodyNewBucket)(nil), @@ -97,6 +127,10 @@ func (b *BodyNewHub) GetBodyType() string { return "NewHub" } +func (b *BodyNewHub) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // Hub信息更新的事件 type BodyHubUpdated struct { serder.Metadata `union:"HubUpdated"` @@ -108,6 +142,10 @@ func (b *BodyHubUpdated) GetBodyType() string { return "HubUpdated" } +func (b *BodyHubUpdated) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // Hub删除的事件 type BodyHubDeleted struct { serder.Metadata `union:"HubDeleted"` @@ -119,6 +157,10 @@ func (b *BodyHubDeleted) GetBodyType() string { return "HubDeleted" } +func (b *BodyHubDeleted) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // 新增Storage的事件 type BodyNewStorage struct { serder.Metadata `union:"NewStorage"` @@ -130,6 +172,10 @@ func (b *BodyNewStorage) GetBodyType() string { return "NewStorage" } +func (b *BodyNewStorage) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // Storage信息更新的事件 type BodyStorageUpdated struct { serder.Metadata `union:"StorageUpdated"` @@ -141,6 +187,10 @@ func (b *BodyStorageUpdated) GetBodyType() string { return "StorageUpdated" } +func (b *BodyStorageUpdated) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // Storage删除的事件 type BodyStorageDeleted struct { serder.Metadata `union:"StorageDeleted"` @@ -152,6 +202,10 @@ func (b *BodyStorageDeleted) GetBodyType() string { return "StorageDeleted" } +func (b *BodyStorageDeleted) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // Storage统计信息的事件 type BodyStorageStats struct { serder.Metadata `union:"StorageStats"` @@ -164,6 +218,10 @@ func (b *BodyStorageStats) GetBodyType() string { return "StorageStats" } +func (b *BodyStorageStats) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // Hub数据传输统计信息的事件 type BodyHubTransferStats struct { serder.Metadata `union:"HubTransferStats"` @@ -179,6 +237,10 @@ func (b *BodyHubTransferStats) GetBodyType() string { return "HubTransferStats" } +func (b *BodyHubTransferStats) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + type DataTrans struct { TotalTransfer int64 `json:"totalTransfer"` RequestCount int64 `json:"requestCount"` @@ -204,6 +266,10 @@ func (b *BodyHubStorageTransferStats) GetBodyType() string { return "HubStorageTransferStats" } +func (b *BodyHubStorageTransferStats) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // 块传输的事件 type BodyBlockTransfer struct { serder.Metadata `union:"BlockTransfer"` @@ -217,6 +283,10 @@ func (b *BodyBlockTransfer) GetBodyType() string { return "BlockTransfer" } +func (b *BodyBlockTransfer) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // 块变化类型 type BlockChange interface { GetBlockChangeType() string @@ -226,47 +296,60 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[BlockCh (*BlockChangeClone)(nil), (*BlockChangeDeleted)(nil), (*BlockChangeEnDecode)(nil), - // (*BlockChangeUpdated)(nil), )), "type") +const ( + BlockTypeRaw = "Raw" + BlockTypeEC = "EC" + BlockTypeSegment = "Segment" +) + type Block struct { BlockType string `json:"blockType"` - Index string `json:"index"` + Index int `json:"index"` StorageID cdssdk.StorageID `json:"storageID"` } type DataTransfer struct { - SourceStorageID cdssdk.StorageID `json:"sourceStorageID"` - TargetStorageID cdssdk.StorageID `json:"targetStorageID"` - DataTransferCount string `json:"dataTransferCount"` + SourceStorageID cdssdk.StorageID `json:"sourceStorageID"` + TargetStorageID cdssdk.StorageID `json:"targetStorageID"` + TransferBytes int64 `json:"transferBytes"` } type BlockChangeClone struct { - serder.Metadata `union:"BlockChangeClone"` - Type string `json:"type"` - BlockType string `json:"blockType"` - Index string `json:"index"` - SourceStorageID cdssdk.StorageID `json:"sourceStorageID"` - TargetStorageID cdssdk.StorageID `json:"targetStorageID"` - DataTransferCount string `json:"dataTransferCount"` + serder.Metadata `union:"Clone"` + Type string `json:"type"` + BlockType string `json:"blockType"` + Index int `json:"index"` + SourceStorageID cdssdk.StorageID `json:"sourceStorageID"` + TargetStorageID cdssdk.StorageID `json:"targetStorageID"` + TransferBytes int64 `json:"transferBytes"` } func (b *BlockChangeClone) GetBlockChangeType() string { return "Clone" } +func (b *BlockChangeClone) OnUnionSerializing() { + b.Type = b.GetBlockChangeType() +} + type BlockChangeDeleted struct { - serder.Metadata `union:"BlockChangeDeleted"` - Type string `json:"type"` - Index string `json:"index"` - StorageID string `json:"storageID"` + serder.Metadata `union:"Deleted"` + Type string `json:"type"` + Index int `json:"index"` + StorageID cdssdk.StorageID `json:"storageID"` } func (b *BlockChangeDeleted) GetBlockChangeType() string { return "Deleted" } +func (b *BlockChangeDeleted) OnUnionSerializing() { + b.Type = b.GetBlockChangeType() +} + type BlockChangeEnDecode struct { - serder.Metadata `union:"BlockChangeEnDecode"` + serder.Metadata `union:"EnDecode"` Type string `json:"type"` SourceBlocks []Block `json:"sourceBlocks,omitempty"` TargetBlocks []Block `json:"targetBlocks,omitempty"` @@ -277,16 +360,9 @@ func (b *BlockChangeEnDecode) GetBlockChangeType() string { return "EnDecode" } -// TODO 块更新应该是说对象被重新上传,此时事件内应该包含全部对象块的信息,因此应该使用ObjectUpdated事件 -// type BlockChangeUpdated struct { -// serder.Metadata `union:"BlockChangeUpdated"` -// Type string `json:"type"` -// Blocks []Block `json:"blocks"` -// } - -// func (b *BlockChangeUpdated) GetBlockChangeType() string { -// return "Updated" -// } +func (b *BlockChangeEnDecode) OnUnionSerializing() { + b.Type = b.GetBlockChangeType() +} // 块分布的事件 type BodyBlockDistribution struct { @@ -296,10 +372,10 @@ type BodyBlockDistribution struct { PackageID cdssdk.PackageID `json:"packageID"` Path string `json:"path"` Size int64 `json:"size"` - FileHash string `json:"fileHash"` - FaultTolerance string `json:"faultTolerance"` - Redundancy string `json:"redundancy"` - AvgAccessCost string `json:"avgAccessCost"` + FileHash cdssdk.FileHash `json:"fileHash"` + FaultTolerance float64 `json:"faultTolerance"` + Redundancy float64 `json:"redundancy"` + AvgAccessCost float64 `json:"avgAccessCost"` BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"` DataTransfers []DataTransfer `json:"dataTransfers"` } @@ -308,34 +384,45 @@ func (b *BodyBlockDistribution) GetBodyType() string { return "BlockDistribution" } +func (b *BodyBlockDistribution) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + type BlockDistributionObjectInfo struct { - Type string `json:"type"` - Index string `json:"index"` - StorageID string `json:"storageID"` + BlockType string `json:"type"` + Index int `json:"index"` + StorageID cdssdk.StorageID `json:"storageID"` } -// 新增Object的事件 -type BodyNewObject struct { - serder.Metadata `union:"NewObject"` +// 新增或者重新上传Object的事件 +type BodyNewOrUpdateObject struct { + serder.Metadata `union:"NewOrUpdateObject"` Type string `json:"type"` Info cdssdk.Object `json:"info"` BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"` } -func (b *BodyNewObject) GetBodyType() string { - return "NewObject" +func (b *BodyNewOrUpdateObject) GetBodyType() string { + return "NewOrUpdateObject" } -// Object更新的事件 -type BodyObjectUpdated struct { - serder.Metadata `union:"ObjectUpdated"` - Type string `json:"type"` - Info cdssdk.Object `json:"info"` - BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"` +func (b *BodyNewOrUpdateObject) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + +// Object的基本信息更新的事件 +type BodyObjectInfoUpdated struct { + serder.Metadata `union:"ObjectInfoUpdated"` + Type string `json:"type"` + Object cdssdk.Object `json:"object"` } -func (b *BodyObjectUpdated) GetBodyType() string { - return "ObjectUpdated" +func (b *BodyObjectInfoUpdated) GetBodyType() string { + return "ObjectInfoUpdated" +} + +func (b *BodyObjectInfoUpdated) OnUnionSerializing() { + b.Type = b.GetBodyType() } // Object删除的事件 @@ -349,6 +436,10 @@ func (b *BodyObjectDeleted) GetBodyType() string { return "ObjectDeleted" } +func (b *BodyObjectDeleted) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // 新增Package的事件 type BodyNewPackage struct { serder.Metadata `union:"NewPackage"` @@ -360,6 +451,26 @@ func (b *BodyNewPackage) GetBodyType() string { return "NewPackage" } +func (b *BodyNewPackage) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + +// Package克隆的事件 +type BodyPackageCloned struct { + serder.Metadata `union:"PackageCloned"` + Type string `json:"type"` + SourcePackageID cdssdk.PackageID `json:"sourcePackageID"` + NewPackage cdssdk.Package `json:"newPackage"` +} + +func (b *BodyPackageCloned) GetBodyType() string { + return "PackageCloned" +} + +func (b *BodyPackageCloned) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // Package删除的事件 type BodyPackageDeleted struct { serder.Metadata `union:"PackageDeleted"` @@ -371,6 +482,10 @@ func (b *BodyPackageDeleted) GetBodyType() string { return "PackageDeleted" } +func (b *BodyPackageDeleted) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // 新增Bucket的事件 type BodyNewBucket struct { serder.Metadata `union:"NewBucket"` @@ -382,6 +497,10 @@ func (b *BodyNewBucket) GetBodyType() string { return "NewBucket" } +func (b *BodyNewBucket) OnUnionSerializing() { + b.Type = b.GetBodyType() +} + // Bucket删除的事件 type BodyBucketDeleted struct { serder.Metadata `union:"BucketDeleted"` @@ -392,3 +511,7 @@ type BodyBucketDeleted struct { func (b *BodyBucketDeleted) GetBodyType() string { return "BucketDeleted" } + +func (b *BodyBucketDeleted) OnUnionSerializing() { + b.Type = b.GetBodyType() +} diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 00a7069..6d86e30 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -298,12 +298,12 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] if len(affectedObjIDs) > 0 { // 批量删除 ObjectBlock - if err := ctx.Table("ObjectBlock").Where("ObjectID IN ?", affectedObjIDs).Delete(&stgmod.ObjectBlock{}).Error; err != nil { + if err := db.ObjectBlock().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil { return nil, fmt.Errorf("batch delete object blocks: %w", err) } // 批量删除 PinnedObject - if err := ctx.Table("PinnedObject").Where("ObjectID IN ?", affectedObjIDs).Delete(&cdssdk.PinnedObject{}).Error; err != nil { + if err := db.PinnedObject().BatchDeleteByObjectID(ctx, affectedObjIDs); err != nil { return nil, fmt.Errorf("batch delete pinned objects: %w", err) } } @@ -343,84 +343,6 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] return affectedObjs, nil } -func (db *ObjectDB) BatchUpdateRedundancy(ctx SQLContext, objs []coormq.UpdatingObjectRedundancy) error { - if len(objs) == 0 { - return nil - } - - nowTime := time.Now() - objIDs := make([]cdssdk.ObjectID, 0, len(objs)) - dummyObjs := make([]cdssdk.Object, 0, len(objs)) - for _, obj := range objs { - objIDs = append(objIDs, obj.ObjectID) - dummyObjs = append(dummyObjs, cdssdk.Object{ - ObjectID: obj.ObjectID, - Redundancy: obj.Redundancy, - CreateTime: nowTime, // 实际不会更新,只因为不能是0值 - UpdateTime: nowTime, - }) - } - - err := db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"Redundancy", "UpdateTime"}) - if err != nil { - return fmt.Errorf("batch update object redundancy: %w", err) - } - - // 删除原本所有的编码块记录,重新添加 - err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) - if err != nil { - return fmt.Errorf("batch delete object blocks: %w", err) - } - - // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 - err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) - if err != nil { - return fmt.Errorf("batch delete pinned object: %w", err) - } - - blocks := make([]stgmod.ObjectBlock, 0, len(objs)) - for _, obj := range objs { - blocks = append(blocks, obj.Blocks...) - } - err = db.ObjectBlock().BatchCreate(ctx, blocks) - if err != nil { - return fmt.Errorf("batch create object blocks: %w", err) - } - - caches := make([]model.Cache, 0, len(objs)) - for _, obj := range objs { - for _, blk := range obj.Blocks { - caches = append(caches, model.Cache{ - FileHash: blk.FileHash, - StorageID: blk.StorageID, - CreateTime: nowTime, - Priority: 0, - }) - } - } - err = db.Cache().BatchCreate(ctx, caches) - if err != nil { - return fmt.Errorf("batch create object caches: %w", err) - } - - pinneds := make([]cdssdk.PinnedObject, 0, len(objs)) - for _, obj := range objs { - for _, p := range obj.PinnedAt { - pinneds = append(pinneds, cdssdk.PinnedObject{ - ObjectID: obj.ObjectID, - StorageID: p, - CreateTime: nowTime, - }) - } - } - err = db.PinnedObject().BatchTryCreate(ctx, pinneds) - if err != nil { - return fmt.Errorf("batch create pinned objects: %w", err) - } - - return nil -} - func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error { if len(ids) == 0 { return nil diff --git a/common/pkgs/ioswitch2/agent_worker.go b/common/pkgs/ioswitch2/agent_worker.go index 757cb86..f2bcdc4 100644 --- a/common/pkgs/ioswitch2/agent_worker.go +++ b/common/pkgs/ioswitch2/agent_worker.go @@ -29,7 +29,7 @@ func (w *AgentWorker) NewClient() (exec.WorkerClient, error) { return nil, err } - return &AgentWorkerClient{cli: cli}, nil + return &AgentWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil } func (w *AgentWorker) String() string { diff --git a/common/pkgs/ioswitch2/http_hub_worker.go b/common/pkgs/ioswitch2/http_hub_worker.go index f145f25..c1db4f9 100644 --- a/common/pkgs/ioswitch2/http_hub_worker.go +++ b/common/pkgs/ioswitch2/http_hub_worker.go @@ -29,7 +29,7 @@ func (w *HttpHubWorker) NewClient() (exec.WorkerClient, error) { return nil, err } - return &HttpHubWorkerClient{cli: cli}, nil + return &HttpHubWorkerClient{hubID: w.Hub.HubID, cli: cli}, nil } func (w *HttpHubWorker) String() string { diff --git a/common/pkgs/mq/coordinator/object.go b/common/pkgs/mq/coordinator/object.go index c90712d..ad63e6e 100644 --- a/common/pkgs/mq/coordinator/object.go +++ b/common/pkgs/mq/coordinator/object.go @@ -272,6 +272,7 @@ type DeleteObjects struct { type DeleteObjectsResp struct { mq.MessageBodyBase + Successes []cdssdk.ObjectID `json:"successes"` } func ReqDeleteObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *DeleteObjects { @@ -280,8 +281,10 @@ func ReqDeleteObjects(userID cdssdk.UserID, objectIDs []cdssdk.ObjectID) *Delete ObjectIDs: objectIDs, } } -func RespDeleteObjects() *DeleteObjectsResp { - return &DeleteObjectsResp{} +func RespDeleteObjects(sucs []cdssdk.ObjectID) *DeleteObjectsResp { + return &DeleteObjectsResp{ + Successes: sucs, + } } func (client *Client) DeleteObjects(msg *DeleteObjects) (*DeleteObjectsResp, error) { return mq.Request(Service.DeleteObjects, client.rabbitCli, msg) diff --git a/common/pkgs/mq/coordinator/package.go b/common/pkgs/mq/coordinator/package.go index e077a7b..1bbc0b3 100644 --- a/common/pkgs/mq/coordinator/package.go +++ b/common/pkgs/mq/coordinator/package.go @@ -120,9 +120,8 @@ var _ = Register(Service.UpdatePackage) type UpdatePackage struct { mq.MessageBodyBase - PackageID cdssdk.PackageID `json:"packageID"` - Adds []AddObjectEntry `json:"adds"` - Deletes []cdssdk.ObjectID `json:"deletes"` + PackageID cdssdk.PackageID `json:"packageID"` + Adds []AddObjectEntry `json:"adds"` } type UpdatePackageResp struct { mq.MessageBodyBase @@ -136,11 +135,10 @@ type AddObjectEntry struct { StorageIDs []cdssdk.StorageID `json:"storageIDs"` } -func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry, deletes []cdssdk.ObjectID) *UpdatePackage { +func NewUpdatePackage(packageID cdssdk.PackageID, adds []AddObjectEntry) *UpdatePackage { return &UpdatePackage{ PackageID: packageID, Adds: adds, - Deletes: deletes, } } func NewUpdatePackageResp(added []cdssdk.Object) *UpdatePackageResp { diff --git a/common/pkgs/sysevent/watcher.go b/common/pkgs/sysevent/watcher.go index 6ffbd72..c93cca5 100644 --- a/common/pkgs/sysevent/watcher.go +++ b/common/pkgs/sysevent/watcher.go @@ -113,9 +113,23 @@ func (w *WatcherHost) AddWatcher(watcher Watcher) { w.watchers = append(w.watchers, watcher) } +func (w *WatcherHost) AddWatcherFn(fn func(event SysEvent)) Watcher { + watcher := &fnWatcher{fn: fn} + w.AddWatcher(watcher) + return watcher +} + func (w *WatcherHost) RemoveWatcher(watcher Watcher) { w.lock.Lock() defer w.lock.Unlock() w.watchers = lo2.Remove(w.watchers, watcher) } + +type fnWatcher struct { + fn func(event SysEvent) +} + +func (w *fnWatcher) OnEvent(event SysEvent) { + w.fn(event) +} diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index 359ea27..3827c16 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -96,7 +96,7 @@ func (u *CreateLoadUploader) Commit() (CreateLoadResult, error) { } defer stgglb.CoordinatorMQPool.Release(coorCli) - updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(u.pkg.PackageID, u.successes, nil)) + updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(u.pkg.PackageID, u.successes)) if err != nil { return CreateLoadResult{}, fmt.Errorf("updating package: %w", err) } diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index 596db35..127b156 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -100,7 +100,7 @@ func (w *UpdateUploader) Commit() (UpdateResult, error) { } defer stgglb.CoordinatorMQPool.Release(coorCli) - updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(w.pkgID, w.successes, nil)) + updateResp, err := coorCli.UpdatePackage(coormq.NewUpdatePackage(w.pkgID, w.successes)) if err != nil { return UpdateResult{}, fmt.Errorf("updating package: %w", err) } diff --git a/coordinator/internal/cmd/serve.go b/coordinator/internal/cmd/serve.go index 59057d5..664dfdf 100644 --- a/coordinator/internal/cmd/serve.go +++ b/coordinator/internal/cmd/serve.go @@ -1,13 +1,16 @@ package cmd import ( + "context" "fmt" "os" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gitlink.org.cn/cloudream/storage/coordinator/internal/config" mymq "gitlink.org.cn/cloudream/storage/coordinator/internal/mq" ) @@ -30,7 +33,15 @@ func serve(configPath string) { logger.Fatalf("new db2 failed, err: %s", err.Error()) } - coorSvr, err := coormq.NewServer(mymq.NewService(db2), config.Cfg().RabbitMQ) + // 初始化系统事件发布器 + evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &stgmod.SourceCoordinator{}) + if err != nil { + logger.Errorf("new sysevent publisher: %v", err) + os.Exit(1) + } + go servePublisher(evtPub) + + coorSvr, err := coormq.NewServer(mymq.NewService(db2, evtPub), config.Cfg().RabbitMQ) if err != nil { logger.Fatalf("new coordinator server failed, err: %s", err.Error()) } @@ -46,6 +57,41 @@ func serve(configPath string) { <-forever } +func servePublisher(evtPub *sysevent.Publisher) { + logger.Info("start serving sysevent publisher") + + ch := evtPub.Start() + +loop: + for { + val, err := ch.Receive().Wait(context.Background()) + if err != nil { + logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) + break + } + + switch val := val.(type) { + case sysevent.PublishError: + logger.Errorf("publishing event: %v", val) + + case sysevent.PublisherExited: + if val.Err != nil { + logger.Errorf("publisher exited with error: %v", val.Err) + } else { + logger.Info("publisher exited") + } + break loop + + case sysevent.OtherError: + logger.Errorf("sysevent: %v", val) + } + } + logger.Info("sysevent publisher stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) +} + func serveCoorServer(server *coormq.Server, cfg mq.Config) { logger.Info("start serving command server") diff --git a/coordinator/internal/mq/bucket.go b/coordinator/internal/mq/bucket.go index ccaf031..3d79d0c 100644 --- a/coordinator/internal/mq/bucket.go +++ b/coordinator/internal/mq/bucket.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gorm.io/gorm" @@ -93,6 +94,10 @@ func (svc *Service) CreateBucket(msg *coormq.CreateBucket) (*coormq.CreateBucket return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } + svc.evtPub.Publish(&stgmod.BodyNewBucket{ + Info: bucket, + }) + return mq.ReplyOK(coormq.NewCreateBucketResp(bucket)) } @@ -133,5 +138,9 @@ func (svc *Service) DeleteBucket(msg *coormq.DeleteBucket) (*coormq.DeleteBucket return nil, mq.Failed(errorcode.OperationFailed, "delete bucket failed") } + svc.evtPub.Publish(&stgmod.BodyBucketDeleted{ + BucketID: msg.BucketID, + }) + return mq.ReplyOK(coormq.NewDeleteBucketResp()) } diff --git a/coordinator/internal/mq/object.go b/coordinator/internal/mq/object.go index 4431e16..1c2e256 100644 --- a/coordinator/internal/mq/object.go +++ b/coordinator/internal/mq/object.go @@ -3,8 +3,10 @@ package mq import ( "errors" "fmt" + "time" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gitlink.org.cn/cloudream/storage/common/pkgs/db2/model" "gorm.io/gorm" "github.com/samber/lo" @@ -192,8 +194,95 @@ func (svc *Service) GetObjectDetails(msg *coormq.GetObjectDetails) (*coormq.GetO } func (svc *Service) UpdateObjectRedundancy(msg *coormq.UpdateObjectRedundancy) (*coormq.UpdateObjectRedundancyResp, *mq.CodeMessage) { - err := svc.db2.DoTx(func(tx db2.SQLContext) error { - return svc.db2.Object().BatchUpdateRedundancy(tx, msg.Updatings) + err := svc.db2.DoTx(func(ctx db2.SQLContext) error { + db := svc.db2 + objs := msg.Updatings + + nowTime := time.Now() + objIDs := make([]cdssdk.ObjectID, 0, len(objs)) + for _, obj := range objs { + objIDs = append(objIDs, obj.ObjectID) + } + + avaiIDs, err := db.Object().BatchTestObjectID(ctx, objIDs) + if err != nil { + return fmt.Errorf("batch test object id: %w", err) + } + + // 过滤掉已经不存在的对象。 + // 注意,objIDs没有被过滤,因为后续逻辑不过滤也不会出错 + objs = lo.Filter(objs, func(obj coormq.UpdatingObjectRedundancy, _ int) bool { + return avaiIDs[obj.ObjectID] + }) + + dummyObjs := make([]cdssdk.Object, 0, len(objs)) + for _, obj := range objs { + dummyObjs = append(dummyObjs, cdssdk.Object{ + ObjectID: obj.ObjectID, + Redundancy: obj.Redundancy, + CreateTime: nowTime, // 实际不会更新,只因为不能是0值 + UpdateTime: nowTime, + }) + } + + err = db.Object().BatchUpdateColumns(ctx, dummyObjs, []string{"Redundancy", "UpdateTime"}) + if err != nil { + return fmt.Errorf("batch update object redundancy: %w", err) + } + + // 删除原本所有的编码块记录,重新添加 + err = db.ObjectBlock().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return fmt.Errorf("batch delete object blocks: %w", err) + } + + // 删除原本Pin住的Object。暂不考虑FileHash没有变化的情况 + err = db.PinnedObject().BatchDeleteByObjectID(ctx, objIDs) + if err != nil { + return fmt.Errorf("batch delete pinned object: %w", err) + } + + blocks := make([]stgmod.ObjectBlock, 0, len(objs)) + for _, obj := range objs { + blocks = append(blocks, obj.Blocks...) + } + err = db.ObjectBlock().BatchCreate(ctx, blocks) + if err != nil { + return fmt.Errorf("batch create object blocks: %w", err) + } + + caches := make([]model.Cache, 0, len(objs)) + for _, obj := range objs { + for _, blk := range obj.Blocks { + caches = append(caches, model.Cache{ + FileHash: blk.FileHash, + StorageID: blk.StorageID, + CreateTime: nowTime, + Priority: 0, + }) + } + } + err = db.Cache().BatchCreate(ctx, caches) + if err != nil { + return fmt.Errorf("batch create object caches: %w", err) + } + + pinneds := make([]cdssdk.PinnedObject, 0, len(objs)) + for _, obj := range objs { + for _, p := range obj.PinnedAt { + pinneds = append(pinneds, cdssdk.PinnedObject{ + ObjectID: obj.ObjectID, + StorageID: p, + CreateTime: nowTime, + }) + } + } + err = db.PinnedObject().BatchTryCreate(ctx, pinneds) + if err != nil { + return fmt.Errorf("batch create pinned objects: %w", err) + } + + return nil }) if err != nil { logger.Warnf("batch updating redundancy: %s", err.Error()) @@ -275,6 +364,8 @@ func pickByObjectIDs[T any](objs []T, objIDs []cdssdk.ObjectID, getID func(T) cd func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsResp, *mq.CodeMessage) { var sucs []cdssdk.ObjectID + var evt []*stgmod.BodyObjectInfoUpdated + err := svc.db2.DoTx(func(tx db2.SQLContext) error { msg.Movings = sort2.Sort(msg.Movings, func(o1, o2 cdsapi.MovingObject) int { return sort2.Cmp(o1.ObjectID, o2.ObjectID) @@ -336,6 +427,11 @@ func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsRes } sucs = lo.Map(newObjs, func(obj cdssdk.Object, _ int) cdssdk.ObjectID { return obj.ObjectID }) + evt = lo.Map(newObjs, func(obj cdssdk.Object, _ int) *stgmod.BodyObjectInfoUpdated { + return &stgmod.BodyObjectInfoUpdated{ + Object: obj, + } + }) return nil }) if err != nil { @@ -343,6 +439,10 @@ func (svc *Service) MoveObjects(msg *coormq.MoveObjects) (*coormq.MoveObjectsRes return nil, mq.Failed(errorcode.OperationFailed, "move objects failed") } + for _, e := range evt { + svc.evtPub.Publish(e) + } + return mq.ReplyOK(coormq.RespMoveObjects(sucs)) } @@ -453,8 +553,15 @@ func (svc *Service) checkPathChangedObjects(tx db2.SQLContext, userID cdssdk.Use } func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObjectsResp, *mq.CodeMessage) { + var sucs []cdssdk.ObjectID err := svc.db2.DoTx(func(tx db2.SQLContext) error { - err := svc.db2.Object().BatchDelete(tx, msg.ObjectIDs) + avaiIDs, err := svc.db2.Object().BatchTestObjectID(tx, msg.ObjectIDs) + if err != nil { + return fmt.Errorf("batch testing object id: %w", err) + } + sucs = lo.Keys(avaiIDs) + + err = svc.db2.Object().BatchDelete(tx, msg.ObjectIDs) if err != nil { return fmt.Errorf("batch deleting objects: %w", err) } @@ -481,7 +588,13 @@ func (svc *Service) DeleteObjects(msg *coormq.DeleteObjects) (*coormq.DeleteObje return nil, mq.Failed(errorcode.OperationFailed, "batch delete objects failed") } - return mq.ReplyOK(coormq.RespDeleteObjects()) + for _, objID := range sucs { + svc.evtPub.Publish(&stgmod.BodyObjectDeleted{ + ObjectID: objID, + }) + } + + return mq.ReplyOK(coormq.RespDeleteObjects(sucs)) } func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjectsResp, *mq.CodeMessage) { @@ -494,6 +607,8 @@ func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjects Clonings map[string]CloningObject } + var evt []*stgmod.BodyNewOrUpdateObject + // TODO 要检查用户是否有Object、Package的权限 clonings := make(map[cdssdk.PackageID]*PackageClonings) for i, cloning := range msg.Clonings { @@ -600,6 +715,25 @@ func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjects for i, cloning := range avaiClonings { ret[cloning.OrgIndex] = &newObjs[i] } + + for i, cloning := range avaiClonings { + var evtBlks []stgmod.BlockDistributionObjectInfo + blkType := getBlockTypeFromRed(newObjs[i].Redundancy) + + oldBlks := avaiDetailsMap[cloning.Cloning.ObjectID].Blocks + for _, blk := range oldBlks { + evtBlks = append(evtBlks, stgmod.BlockDistributionObjectInfo{ + BlockType: blkType, + Index: blk.Index, + StorageID: blk.StorageID, + }) + } + + evt = append(evt, &stgmod.BodyNewOrUpdateObject{ + Info: newObjs[i], + BlockDistribution: evtBlks, + }) + } return nil }) @@ -608,5 +742,9 @@ func (svc *Service) CloneObjects(msg *coormq.CloneObjects) (*coormq.CloneObjects return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } + for _, e := range evt { + svc.evtPub.Publish(e) + } + return mq.ReplyOK(coormq.RespCloneObjects(ret)) } diff --git a/coordinator/internal/mq/package.go b/coordinator/internal/mq/package.go index 307a0cb..999234b 100644 --- a/coordinator/internal/mq/package.go +++ b/coordinator/internal/mq/package.go @@ -79,6 +79,10 @@ func (svc *Service) CreatePackage(msg *coormq.CreatePackage) (*coormq.CreatePack return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } + svc.evtPub.Publish(&stgmod.BodyNewPackage{ + Info: pkg, + }) + return mq.ReplyOK(coormq.NewCreatePackageResp(pkg)) } @@ -90,21 +94,11 @@ func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePack return fmt.Errorf("getting package by id: %w", err) } - // 先执行删除操作 - if len(msg.Deletes) > 0 { - if err := svc.db2.Object().BatchDelete(tx, msg.Deletes); err != nil { - return fmt.Errorf("deleting objects: %w", err) - } - } - - // 再执行添加操作 - if len(msg.Adds) > 0 { - ad, err := svc.db2.Object().BatchAdd(tx, msg.PackageID, msg.Adds) - if err != nil { - return fmt.Errorf("adding objects: %w", err) - } - added = ad + ad, err := svc.db2.Object().BatchAdd(tx, msg.PackageID, msg.Adds) + if err != nil { + return fmt.Errorf("adding objects: %w", err) } + added = ad return nil }) @@ -113,6 +107,26 @@ func (svc *Service) UpdatePackage(msg *coormq.UpdatePackage) (*coormq.UpdatePack return nil, mq.Failed(errorcode.OperationFailed, "update package failed") } + addedMp := make(map[string]cdssdk.Object) + for _, obj := range added { + addedMp[obj.Path] = obj + } + + for _, add := range msg.Adds { + var blks []stgmod.BlockDistributionObjectInfo + for _, stgID := range add.StorageIDs { + blks = append(blks, stgmod.BlockDistributionObjectInfo{ + BlockType: stgmod.BlockTypeRaw, + StorageID: stgID, + }) + } + + svc.evtPub.Publish(&stgmod.BodyNewOrUpdateObject{ + Info: addedMp[add.Path], + BlockDistribution: blks, + }) + } + return mq.ReplyOK(coormq.NewUpdatePackageResp(added)) } @@ -137,6 +151,10 @@ func (svc *Service) DeletePackage(msg *coormq.DeletePackage) (*coormq.DeletePack return nil, mq.Failed(errorcode.OperationFailed, "delete package failed") } + svc.evtPub.Publish(&stgmod.BodyPackageDeleted{ + PackageID: msg.PackageID, + }) + return mq.ReplyOK(coormq.NewDeletePackageResp()) } @@ -203,6 +221,11 @@ func (svc *Service) ClonePackage(msg *coormq.ClonePackage) (*coormq.ClonePackage return nil, mq.Failed(errorcode.OperationFailed, err.Error()) } + svc.evtPub.Publish(&stgmod.BodyPackageCloned{ + SourcePackageID: msg.PackageID, + NewPackage: pkg, + }) + return mq.ReplyOK(coormq.RespClonePackage(pkg)) } diff --git a/coordinator/internal/mq/service.go b/coordinator/internal/mq/service.go index c0fe4e5..d6f2481 100644 --- a/coordinator/internal/mq/service.go +++ b/coordinator/internal/mq/service.go @@ -2,14 +2,17 @@ package mq import ( "gitlink.org.cn/cloudream/storage/common/pkgs/db2" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" ) type Service struct { - db2 *db2.DB + db2 *db2.DB + evtPub *sysevent.Publisher } -func NewService(db2 *db2.DB) *Service { +func NewService(db2 *db2.DB, evtPub *sysevent.Publisher) *Service { return &Service{ - db2: db2, + db2: db2, + evtPub: evtPub, } } diff --git a/coordinator/internal/mq/utils.go b/coordinator/internal/mq/utils.go new file mode 100644 index 0000000..6b68c28 --- /dev/null +++ b/coordinator/internal/mq/utils.go @@ -0,0 +1,23 @@ +package mq + +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" +) + +func getBlockTypeFromRed(red cdssdk.Redundancy) string { + switch red.(type) { + case *cdssdk.NoneRedundancy: + return stgmod.BlockTypeRaw + + case *cdssdk.ECRedundancy: + return stgmod.BlockTypeEC + + case *cdssdk.LRCRedundancy: + return stgmod.BlockTypeEC + + case *cdssdk.SegmentRedundancy: + return stgmod.BlockTypeSegment + } + return "" +} diff --git a/datamap/internal/models/blockdistribution.go b/datamap/internal/models/blockdistribution.go index 12e1dc7..10e0d6e 100644 --- a/datamap/internal/models/blockdistribution.go +++ b/datamap/internal/models/blockdistribution.go @@ -170,7 +170,7 @@ func (w *BlockDistributionWatcher) OnEvent(event sysevent.SysEvent) { for _, dataTransfer := range body.DataTransfers { sourceStorageID, _ := strconv.ParseInt(string(dataTransfer.SourceStorageID), 10, 64) targetStorageID, _ := strconv.ParseInt(string(dataTransfer.TargetStorageID), 10, 64) - dataTransferCount, _ := strconv.ParseInt(dataTransfer.DataTransferCount, 10, 64) + dataTransferCount, _ := strconv.ParseInt(dataTransfer.TransferBytes, 10, 64) err := repoStorage.CreateStorageTransferCount(&StorageTransferCount{ ObjectID: int64(body.ObjectID), diff --git a/datamap/internal/models/object.go b/datamap/internal/models/object.go index 3cb2373..77d5ac2 100644 --- a/datamap/internal/models/object.go +++ b/datamap/internal/models/object.go @@ -75,7 +75,7 @@ type ObjectWatcher struct { func (w *ObjectWatcher) OnEvent(event sysevent.SysEvent) { if event.Category == "objectChange" { - if _, ok := event.Body.(*stgmod.BodyNewObject); ok { + if _, ok := event.Body.(*stgmod.BodyNewOrUpdateObject); ok { } else { fmt.Printf("Watcher %s: Unexpected Body type, expected *ObjectInfo, got %T\n", w.Name, event.Body) diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 47e149a..9ceee86 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -483,6 +483,7 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object } var blocks []stgmod.ObjectBlock + var blockChgs []stgmod.BlockChange for i, stg := range uploadStgs { blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, @@ -490,8 +491,26 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object StorageID: stg.Storage.Storage.StorageID, FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, }) + blockChgs = append(blockChgs, &stgmod.BlockChangeClone{ + BlockType: stgmod.BlockTypeRaw, + SourceStorageID: obj.Blocks[0].StorageID, + TargetStorageID: stg.Storage.Storage.StorageID, + TransferBytes: 1, + }) } + // 删除原本的文件块 + blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ + Index: 0, + StorageID: obj.Blocks[0].StorageID, + }) + + ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: blockChgs, + }) + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, @@ -532,6 +551,8 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD } var blocks []stgmod.ObjectBlock + var evtTargetBlocks []stgmod.Block + var evtBlockTrans []stgmod.DataTransfer for i := 0; i < red.N; i++ { blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, @@ -539,8 +560,39 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD StorageID: uploadStgs[i].Storage.Storage.StorageID, FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, }) + evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ + BlockType: stgmod.BlockTypeEC, + Index: i, + StorageID: uploadStgs[i].Storage.Storage.StorageID, + }) + evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ + SourceStorageID: obj.Blocks[0].StorageID, + TargetStorageID: uploadStgs[i].Storage.Storage.StorageID, + TransferBytes: 1, + }) } + ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: []stgmod.BlockChange{ + &stgmod.BlockChangeEnDecode{ + SourceBlocks: []stgmod.Block{{ + BlockType: stgmod.BlockTypeRaw, + StorageID: obj.Blocks[0].StorageID, + }}, + TargetBlocks: evtTargetBlocks, + DataTransfers: evtBlockTrans, + }, + + // 删除原本的文件块 + &stgmod.BlockChangeDeleted{ + Index: 0, + StorageID: obj.Blocks[0].StorageID, + }, + }, + }) + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, @@ -548,7 +600,7 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD }, nil } -func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadStorages []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { +func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.ObjectDetail, red *cdssdk.LRCRedundancy, uploadStgs []*StorageLoadInfo, allStgs map[cdssdk.StorageID]*StorageLoadInfo) (*coormq.UpdatingObjectRedundancy, error) { if len(obj.Blocks) == 0 { return nil, fmt.Errorf("object is not cached on any storages, cannot change its redundancy to ec") } @@ -563,7 +615,7 @@ func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.Object var toes []ioswitchlrc.To for i := 0; i < red.N; i++ { - toes = append(toes, ioswitchlrc.NewToStorage(*uploadStorages[i].Storage.MasterHub, uploadStorages[i].Storage.Storage, i, fmt.Sprintf("%d", i))) + toes = append(toes, ioswitchlrc.NewToStorage(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage.Storage, i, fmt.Sprintf("%d", i))) } plans := exec.NewPlanBuilder() @@ -580,15 +632,48 @@ func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.Object } var blocks []stgmod.ObjectBlock + var evtTargetBlocks []stgmod.Block + var evtBlockTrans []stgmod.DataTransfer for i := 0; i < red.N; i++ { blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, Index: i, - StorageID: uploadStorages[i].Storage.Storage.StorageID, + StorageID: uploadStgs[i].Storage.Storage.StorageID, FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, }) + evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ + BlockType: stgmod.BlockTypeEC, + Index: i, + StorageID: uploadStgs[i].Storage.Storage.StorageID, + }) + evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ + SourceStorageID: obj.Blocks[0].StorageID, + TargetStorageID: uploadStgs[i].Storage.Storage.StorageID, + TransferBytes: 1, + }) } + ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: []stgmod.BlockChange{ + &stgmod.BlockChangeEnDecode{ + SourceBlocks: []stgmod.Block{{ + BlockType: stgmod.BlockTypeRaw, + StorageID: obj.Blocks[0].StorageID, + }}, + TargetBlocks: evtTargetBlocks, + DataTransfers: evtBlockTrans, + }, + + // 删除原本的文件块 + &stgmod.BlockChangeDeleted{ + Index: 0, + StorageID: obj.Blocks[0].StorageID, + }, + }, + }) + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, @@ -634,6 +719,8 @@ func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.Object } var blocks []stgmod.ObjectBlock + var evtTargetBlocks []stgmod.Block + var evtBlockTrans []stgmod.DataTransfer for i, stg := range uploadStgs { blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, @@ -641,8 +728,39 @@ func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.Object StorageID: stg.Storage.Storage.StorageID, FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, }) + evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ + BlockType: stgmod.BlockTypeSegment, + Index: i, + StorageID: uploadStgs[i].Storage.Storage.StorageID, + }) + evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ + SourceStorageID: obj.Blocks[0].StorageID, + TargetStorageID: uploadStgs[i].Storage.Storage.StorageID, + TransferBytes: 1, + }) } + ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: []stgmod.BlockChange{ + &stgmod.BlockChangeEnDecode{ + SourceBlocks: []stgmod.Block{{ + BlockType: stgmod.BlockTypeRaw, + StorageID: obj.Blocks[0].StorageID, + }}, + TargetBlocks: evtTargetBlocks, + DataTransfers: evtBlockTrans, + }, + + // 删除原本的文件块 + &stgmod.BlockChangeDeleted{ + Index: 0, + StorageID: obj.Blocks[0].StorageID, + }, + }, + }) + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, @@ -687,6 +805,7 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD } var blocks []stgmod.ObjectBlock + var blockChgs []stgmod.BlockChange for i, stg := range uploadStgs { blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, @@ -694,8 +813,26 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD StorageID: stg.Storage.Storage.StorageID, FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, }) + blockChgs = append(blockChgs, &stgmod.BlockChangeClone{ + BlockType: stgmod.BlockTypeRaw, + SourceStorageID: obj.Blocks[0].StorageID, + TargetStorageID: stg.Storage.Storage.StorageID, + TransferBytes: 1, + }) } + // 删除原本的文件块 + blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ + Index: 0, + StorageID: obj.Blocks[0].StorageID, + }) + + ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: blockChgs, + }) + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: red, @@ -739,26 +876,21 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe // 如果选择的备份节点都是同一个,那么就只要上传一次 uploadStgs = lo.UniqBy(uploadStgs, func(item *StorageLoadInfo) cdssdk.StorageID { return item.Storage.Storage.StorageID }) - // 每个被选节点都在自己节点上重建原始数据 planBlder := exec.NewPlanBuilder() - for i := range uploadStgs { - ft := ioswitch2.NewFromTo() - ft.ECParam = srcRed + ft := ioswitch2.NewFromTo() + ft.ECParam = srcRed - for i2, block := range chosenBlocks { - ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2], ioswitch2.ECStream(block.Index))) - } + for i, block := range chosenBlocks { + ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i].MasterHub, chosenBlockStg[i], ioswitch2.ECStream(block.Index))) + } - len := obj.Object.Size - ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), math2.Range{ - Offset: 0, - Length: &len, - })) + for i := range uploadStgs { + ft.AddTo(ioswitch2.NewToShardStoreWithRange(*uploadStgs[i].Storage.MasterHub, uploadStgs[i].Storage, ioswitch2.RawStream(), fmt.Sprintf("%d", i), math2.NewRange(0, obj.Object.Size))) + } - err := parser.Parse(ft, planBlder) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } + err := parser.Parse(ft, planBlder) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) } // TODO 添加依赖 @@ -770,6 +902,7 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe } var blocks []stgmod.ObjectBlock + for i := range uploadStgs { blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, @@ -779,6 +912,55 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe }) } + var evtSrcBlocks []stgmod.Block + var evtTargetBlocks []stgmod.Block + for i2, block := range chosenBlocks { + evtSrcBlocks = append(evtSrcBlocks, stgmod.Block{ + BlockType: stgmod.BlockTypeEC, + Index: block.Index, + StorageID: chosenBlockStg[i2].Storage.StorageID, + }) + } + + for _, stg := range uploadStgs { + evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ + BlockType: stgmod.BlockTypeRaw, + Index: 0, + StorageID: stg.Storage.Storage.StorageID, + }) + } + + var evtBlockTrans []stgmod.DataTransfer + for _, stg := range uploadStgs { + for i2 := range chosenBlocks { + evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ + SourceStorageID: chosenBlockStg[i2].Storage.StorageID, + TargetStorageID: stg.Storage.Storage.StorageID, + TransferBytes: 1, + }) + } + } + + var blockChgs []stgmod.BlockChange + blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{ + SourceBlocks: evtSrcBlocks, + TargetBlocks: evtTargetBlocks, + DataTransfers: evtBlockTrans, + }) + + for _, block := range obj.Blocks { + blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ + Index: block.Index, + StorageID: block.StorageID, + }) + } + + ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: blockChgs, + }) + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: tarRed, @@ -817,6 +999,22 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet // 目前EC的参数都相同,所以可以不用重建出完整数据然后再分块,可以直接构建出目的节点需要的块 planBlder := exec.NewPlanBuilder() + var evtSrcBlocks []stgmod.Block + var evtTargetBlocks []stgmod.Block + + ft := ioswitch2.NewFromTo() + ft.ECParam = srcRed + + for i, block := range chosenBlocks { + ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i].MasterHub, chosenBlockStg[i], ioswitch2.ECStream(block.Index))) + + evtSrcBlocks = append(evtSrcBlocks, stgmod.Block{ + BlockType: stgmod.BlockTypeEC, + Index: block.Index, + StorageID: chosenBlockStg[i].Storage.StorageID, + }) + } + var newBlocks []stgmod.ObjectBlock shouldUpdateBlocks := false for i, stg := range uploadStorages { @@ -838,24 +1036,23 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet shouldUpdateBlocks = true // 否则就要重建出这个节点需要的块 - - ft := ioswitch2.NewFromTo() - ft.ECParam = srcRed - for i2, block := range chosenBlocks { - ft.AddFrom(ioswitch2.NewFromShardstore(block.FileHash, *chosenBlockStg[i2].MasterHub, chosenBlockStg[i2], ioswitch2.ECStream(block.Index))) - } - // 输出只需要自己要保存的那一块 ft.AddTo(ioswitch2.NewToShardStore(*stg.Storage.MasterHub, stg.Storage, ioswitch2.ECStream(i), fmt.Sprintf("%d", i))) - err := parser.Parse(ft, planBlder) - if err != nil { - return nil, fmt.Errorf("parsing plan: %w", err) - } + evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ + BlockType: stgmod.BlockTypeEC, + Index: i, + StorageID: stg.Storage.Storage.StorageID, + }) newBlocks = append(newBlocks, newBlock) } + err := parser.Parse(ft, planBlder) + if err != nil { + return nil, fmt.Errorf("parsing plan: %w", err) + } + // 如果没有任何Plan,Wait会直接返回成功 execCtx := exec.NewExecContext() exec.SetValueByType(execCtx, ctx.Args.StgMgr) @@ -877,6 +1074,41 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash } + var evtBlockTrans []stgmod.DataTransfer + for _, src := range evtSrcBlocks { + for _, tar := range evtTargetBlocks { + evtBlockTrans = append(evtBlockTrans, stgmod.DataTransfer{ + SourceStorageID: src.StorageID, + TargetStorageID: tar.StorageID, + TransferBytes: 1, + }) + } + } + + var blockChgs []stgmod.BlockChange + for _, block := range obj.Blocks { + keep := lo.ContainsBy(newBlocks, func(newBlock stgmod.ObjectBlock) bool { + return newBlock.Index == block.Index && newBlock.StorageID == block.StorageID + }) + if !keep { + blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ + Index: block.Index, + StorageID: block.StorageID, + }) + } + } + blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{ + SourceBlocks: evtSrcBlocks, + TargetBlocks: evtTargetBlocks, + DataTransfers: evtBlockTrans, + }) + + ctx.Args.EvtPub.Publish(&stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: blockChgs, + }) + return &coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, Redundancy: tarRed, @@ -915,6 +1147,8 @@ func (t *CheckPackageRedundancy) lrcToLRC(ctx ExecuteContext, obj stgmod.ObjectD } } + // TODO 产生BlockTransfer事件 + if canGroupReconstruct { // return t.groupReconstructLRC(obj, lostBlocks, lostBlockGrps, blocksGrpByIndex, srcRed, uploadStorages) } diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 4af4c90..277222d 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -117,6 +117,8 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { planBld := exec.NewPlanBuilder() planningStgIDs := make(map[cdssdk.StorageID]bool) + var sysEvents []stgmod.SysEventBody + // 对于rep对象,统计出所有对象块分布最多的两个节点,用这两个节点代表所有rep对象块的分布,去进行退火算法 var repObjectsUpdating []coormq.UpdatingObjectRedundancy repMostHubIDs := t.summaryRepObjectBlockNodes(repObjects) @@ -128,6 +130,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { }) for _, obj := range repObjects { repObjectsUpdating = append(repObjectsUpdating, t.makePlansForRepObject(allStgInfos, solu, obj, planBld, planningStgIDs)) + sysEvents = append(sysEvents, t.generateSysEventForRepObject(solu, obj)...) } // 对于ec对象,则每个对象单独进行退火算法 @@ -141,6 +144,7 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { blocks: obj.Blocks, }) ecObjectsUpdating = append(ecObjectsUpdating, t.makePlansForECObject(allStgInfos, solu, obj, planBld, planningStgIDs)) + sysEvents = append(sysEvents, t.generateSysEventForECObject(solu, obj)...) } ioSwRets, err := t.executePlans(execCtx, planBld, planningStgIDs) @@ -161,6 +165,10 @@ func (t *CleanPinned) Execute(execCtx ExecuteContext) { log.Warnf("changing object redundancy: %s", err.Error()) return } + + for _, e := range sysEvents { + execCtx.Args.EvtPub.Publish(e) + } } } @@ -227,9 +235,12 @@ type annealingState struct { maxScore float64 // 搜索过程中得到过的最大分数 maxScoreRmBlocks []bool // 最大分数对应的删除方案 - rmBlocks []bool // 当前删除方案 - inversedIndex int // 当前删除方案是从上一次的方案改动哪个flag而来的 - lastScore float64 // 上一次方案的分数 + rmBlocks []bool // 当前删除方案 + inversedIndex int // 当前删除方案是从上一次的方案改动哪个flag而来的 + lastDisasterTolerance float64 // 上一次方案的容灾度 + lastSpaceCost float64 // 上一次方案的冗余度 + lastMinAccessCost float64 // 上一次方案的最小访问费用 + lastScore float64 // 上一次方案的分数 } type objectBlock struct { @@ -464,8 +475,11 @@ type combinatorialTreeNode struct { } type annealingSolution struct { - blockList []objectBlock // 所有节点的块分布情况 - rmBlocks []bool // 要删除哪些块 + blockList []objectBlock // 所有节点的块分布情况 + rmBlocks []bool // 要删除哪些块 + disasterTolerance float64 // 本方案的容灾度 + spaceCost float64 // 本方案的冗余度 + minAccessCost float64 // 本方案的最小访问费用 } func (t *CleanPinned) startAnnealing(allStgInfos map[cdssdk.StorageID]*stgmod.StorageDetail, readerStgIDs []cdssdk.StorageID, object annealingObject) annealingSolution { @@ -529,8 +543,11 @@ func (t *CleanPinned) startAnnealing(allStgInfos map[cdssdk.StorageID]*stgmod.St } // fmt.Printf("final: %v\n", state.maxScoreRmBlocks) return annealingSolution{ - blockList: state.blockList, - rmBlocks: state.maxScoreRmBlocks, + blockList: state.blockList, + rmBlocks: state.maxScoreRmBlocks, + disasterTolerance: state.lastDisasterTolerance, + spaceCost: state.lastSpaceCost, + minAccessCost: state.lastMinAccessCost, } } @@ -640,6 +657,10 @@ func (t *CleanPinned) calcScore(state *annealingState) float64 { ac := t.calcMinAccessCost(state) sc := t.calcSpaceCost(state) + state.lastDisasterTolerance = dt + state.lastMinAccessCost = ac + state.lastSpaceCost = sc + dtSc := 1.0 if dt < 1 { dtSc = 0 @@ -730,6 +751,11 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st Redundancy: obj.Object.Redundancy, } + ft := ioswitch2.NewFromTo() + + fromStg := allStgInfos[obj.Blocks[0].StorageID] + ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *fromStg.MasterHub, *fromStg, ioswitch2.RawStream())) + for i, f := range solu.rmBlocks { hasCache := lo.ContainsBy(obj.Blocks, func(b stgmod.ObjectBlock) bool { return b.StorageID == solu.blockList[i].StorageID }) || lo.ContainsBy(obj.PinnedAt, func(n cdssdk.StorageID) bool { return n == solu.blockList[i].StorageID }) @@ -738,18 +764,9 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st if !willRm { // 如果对象在退火后要保留副本的节点没有副本,则需要在这个节点创建副本 if !hasCache { - ft := ioswitch2.NewFromTo() - - fromStg := allStgInfos[obj.Blocks[0].StorageID] - ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *fromStg.MasterHub, *fromStg, ioswitch2.RawStream())) toStg := allStgInfos[solu.blockList[i].StorageID] ft.AddTo(ioswitch2.NewToShardStore(*toStg.MasterHub, *toStg, ioswitch2.RawStream(), fmt.Sprintf("%d.0", obj.Object.ObjectID))) - err := parser.Parse(ft, planBld) - if err != nil { - // TODO 错误处理 - continue - } planningHubIDs[solu.blockList[i].StorageID] = true } entry.Blocks = append(entry.Blocks, stgmod.ObjectBlock{ @@ -761,9 +778,72 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st } } + err := parser.Parse(ft, planBld) + if err != nil { + // TODO 错误处理 + } + return entry } +func (t *CleanPinned) generateSysEventForRepObject(solu annealingSolution, obj stgmod.ObjectDetail) []stgmod.SysEventBody { + var blockChgs []stgmod.BlockChange + + for i, f := range solu.rmBlocks { + hasCache := lo.ContainsBy(obj.Blocks, func(b stgmod.ObjectBlock) bool { return b.StorageID == solu.blockList[i].StorageID }) || + lo.ContainsBy(obj.PinnedAt, func(n cdssdk.StorageID) bool { return n == solu.blockList[i].StorageID }) + willRm := f + + if !willRm { + // 如果对象在退火后要保留副本的节点没有副本,则需要在这个节点创建副本 + if !hasCache { + blockChgs = append(blockChgs, &stgmod.BlockChangeClone{ + BlockType: stgmod.BlockTypeRaw, + SourceStorageID: obj.Blocks[0].StorageID, + TargetStorageID: solu.blockList[i].StorageID, + }) + } + } else { + blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ + Index: 0, + StorageID: solu.blockList[i].StorageID, + }) + } + } + + transEvt := &stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: blockChgs, + } + + var blockDist []stgmod.BlockDistributionObjectInfo + for i, f := range solu.rmBlocks { + if !f { + blockDist = append(blockDist, stgmod.BlockDistributionObjectInfo{ + BlockType: stgmod.BlockTypeRaw, + Index: 0, + StorageID: solu.blockList[i].StorageID, + }) + } + } + + distEvt := &stgmod.BodyBlockDistribution{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + Path: obj.Object.Path, + Size: obj.Object.Size, + FileHash: obj.Object.FileHash, + FaultTolerance: solu.disasterTolerance, + Redundancy: solu.spaceCost, + AvgAccessCost: 0, // TODO 计算平均访问代价,从日常访问数据中统计 + BlockDistribution: blockDist, + // TODO 不好计算传输量 + } + + return []stgmod.SysEventBody{transEvt, distEvt} +} + func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stgmod.StorageDetail, solu annealingSolution, obj stgmod.ObjectDetail, planBld *exec.PlanBuilder, planningHubIDs map[cdssdk.StorageID]bool) coormq.UpdatingObjectRedundancy { entry := coormq.UpdatingObjectRedundancy{ ObjectID: obj.Object.ObjectID, @@ -797,6 +877,7 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg ecRed := obj.Object.Redundancy.(*cdssdk.ECRedundancy) for id, idxs := range reconstrct { + // 依次生成每个节点上的执行计划,因为如果放到一个计划里一起生成,不能保证每个节点上的块用的都是本节点上的副本 ft := ioswitch2.NewFromTo() ft.ECParam = ecRed ft.AddFrom(ioswitch2.NewFromShardstore(obj.Object.FileHash, *allStgInfos[id].MasterHub, *allStgInfos[id], ioswitch2.RawStream())) @@ -816,6 +897,85 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg return entry } +func (t *CleanPinned) generateSysEventForECObject(solu annealingSolution, obj stgmod.ObjectDetail) []stgmod.SysEventBody { + var blockChgs []stgmod.BlockChange + + reconstrct := make(map[cdssdk.StorageID]*[]int) + for i, f := range solu.rmBlocks { + block := solu.blockList[i] + if !f { + // 如果这个块是影子块,那么就要从完整对象里重建这个块 + if !block.HasEntity { + re, ok := reconstrct[block.StorageID] + if !ok { + re = &[]int{} + reconstrct[block.StorageID] = re + } + + *re = append(*re, block.Index) + } + } else { + blockChgs = append(blockChgs, &stgmod.BlockChangeDeleted{ + Index: block.Index, + StorageID: block.StorageID, + }) + } + } + + // 由于每一个需要被重建的块都是从同中心的副本里构建出来的,所以对于每一个中心都要产生一个BlockChangeEnDecode + for id, idxs := range reconstrct { + var tarBlocks []stgmod.Block + for _, idx := range *idxs { + tarBlocks = append(tarBlocks, stgmod.Block{ + BlockType: stgmod.BlockTypeEC, + Index: idx, + StorageID: id, + }) + } + blockChgs = append(blockChgs, &stgmod.BlockChangeEnDecode{ + SourceBlocks: []stgmod.Block{{ + BlockType: stgmod.BlockTypeRaw, + Index: 0, + StorageID: id, // 影子块的原始对象就在同一个节点上 + }}, + TargetBlocks: tarBlocks, + // 传输量为0 + }) + } + + transEvt := &stgmod.BodyBlockTransfer{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + BlockChanges: blockChgs, + } + + var blockDist []stgmod.BlockDistributionObjectInfo + for i, f := range solu.rmBlocks { + if !f { + blockDist = append(blockDist, stgmod.BlockDistributionObjectInfo{ + BlockType: stgmod.BlockTypeEC, + Index: solu.blockList[i].Index, + StorageID: solu.blockList[i].StorageID, + }) + } + } + + distEvt := &stgmod.BodyBlockDistribution{ + ObjectID: obj.Object.ObjectID, + PackageID: obj.Object.PackageID, + Path: obj.Object.Path, + Size: obj.Object.Size, + FileHash: obj.Object.FileHash, + FaultTolerance: solu.disasterTolerance, + Redundancy: solu.spaceCost, + AvgAccessCost: 0, // TODO 计算平均访问代价,从日常访问数据中统计 + BlockDistribution: blockDist, + // TODO 不好计算传输量 + } + + return []stgmod.SysEventBody{transEvt, distEvt} +} + func (t *CleanPinned) executePlans(ctx ExecuteContext, planBld *exec.PlanBuilder, planningStgIDs map[cdssdk.StorageID]bool) (map[string]exec.VarValue, error) { // 统一加锁,有重复也没关系 lockBld := reqbuilder.NewBuilder() diff --git a/scanner/internal/event/event.go b/scanner/internal/event/event.go index 900d404..c20efb5 100644 --- a/scanner/internal/event/event.go +++ b/scanner/internal/event/event.go @@ -10,12 +10,14 @@ import ( "gitlink.org.cn/cloudream/storage/common/pkgs/db2" scevt "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner/event" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" ) type ExecuteArgs struct { DB *db2.DB DistLock *distlock.Service StgMgr *agtpool.AgentPool + EvtPub *sysevent.Publisher } type Executor = event.Executor[ExecuteArgs] @@ -26,11 +28,12 @@ type Event = event.Event[ExecuteArgs] type ExecuteOption = event.ExecuteOption -func NewExecutor(db *db2.DB, distLock *distlock.Service, stgAgts *agtpool.AgentPool) Executor { +func NewExecutor(db *db2.DB, distLock *distlock.Service, stgAgts *agtpool.AgentPool, evtPub *sysevent.Publisher) Executor { return event.NewExecutor(ExecuteArgs{ DB: db, DistLock: distLock, StgMgr: stgAgts, + EvtPub: evtPub, }) } diff --git a/scanner/main.go b/scanner/main.go index c9de497..80510ef 100644 --- a/scanner/main.go +++ b/scanner/main.go @@ -1,16 +1,19 @@ package main import ( + "context" "fmt" "os" "gitlink.org.cn/cloudream/common/pkgs/logger" stgglb "gitlink.org.cn/cloudream/storage/common/globals" + stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/db2" "gitlink.org.cn/cloudream/storage/common/pkgs/distlock" agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent" scmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/scanner" "gitlink.org.cn/cloudream/storage/common/pkgs/storage/agtpool" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gitlink.org.cn/cloudream/storage/scanner/internal/config" "gitlink.org.cn/cloudream/storage/scanner/internal/event" "gitlink.org.cn/cloudream/storage/scanner/internal/mq" @@ -50,8 +53,16 @@ func main() { // 启动存储服务管理器 stgAgts := agtpool.NewPool() + // 初始化系统事件发布器 + evtPub, err := sysevent.NewPublisher(sysevent.ConfigFromMQConfig(config.Cfg().RabbitMQ), &stgmod.SourceScanner{}) + if err != nil { + logger.Errorf("new sysevent publisher: %v", err) + os.Exit(1) + } + go servePublisher(evtPub) + // 启动事件执行器 - eventExecutor := event.NewExecutor(db, distlockSvc, stgAgts) + eventExecutor := event.NewExecutor(db, distlockSvc, stgAgts, evtPub) go serveEventExecutor(&eventExecutor) agtSvr, err := scmq.NewServer(mq.NewService(&eventExecutor), config.Cfg().RabbitMQ) @@ -88,6 +99,41 @@ func serveEventExecutor(executor *event.Executor) { os.Exit(1) } +func servePublisher(evtPub *sysevent.Publisher) { + logger.Info("start serving sysevent publisher") + + ch := evtPub.Start() + +loop: + for { + val, err := ch.Receive().Wait(context.Background()) + if err != nil { + logger.Errorf("sysevent publisher stopped with error: %s", err.Error()) + break + } + + switch val := val.(type) { + case sysevent.PublishError: + logger.Errorf("publishing event: %v", val) + + case sysevent.PublisherExited: + if val.Err != nil { + logger.Errorf("publisher exited with error: %v", val.Err) + } else { + logger.Info("publisher exited") + } + break loop + + case sysevent.OtherError: + logger.Errorf("sysevent: %v", val) + } + } + logger.Info("sysevent publisher stopped") + + // TODO 仅简单结束了程序 + os.Exit(1) +} + func serveScannerServer(server *scmq.Server) { logger.Info("start serving scanner server")