From bafc6fe38536a2003af544a6381ea8c3d036d4fb Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 24 Jan 2025 17:45:59 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E6=95=B0=E6=8D=AE=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/models/datamap.go | 356 +++++++++++++++++++++++++------ common/pkgs/sysevent/sysevent.go | 10 +- 2 files changed, 292 insertions(+), 74 deletions(-) diff --git a/common/models/datamap.go b/common/models/datamap.go index 41fae27..a363c44 100644 --- a/common/models/datamap.go +++ b/common/models/datamap.go @@ -1,78 +1,174 @@ package stgmod import ( + "time" + "gitlink.org.cn/cloudream/common/pkgs/types" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/serder" - "time" ) -type Source interface { +// 系统事件 +type SysEvent struct { + Timestamp time.Time `json:"timestamp"` + Source SysEventSource `json:"source"` + Category string `json:"category"` + Body SysEventBody `json:"body"` } -type Body interface { +// 事件源 +type SysEventSource interface { + GetSourceType() string } -var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Source]( +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEventSource]( (*SourceCoordinator)(nil), (*SourceScanner)(nil), + (*SourceHub)(nil), )), "type") type SourceCoordinator struct { - serder.Metadata `union:"SourceCoordinator"` + serder.Metadata `union:"Coordinator"` Type string `json:"type"` } +func (s *SourceCoordinator) GetSourceType() string { + return "Coordinator" +} + type SourceScanner struct { - serder.Metadata `union:"SourceScanner"` + serder.Metadata `union:"Scanner"` + Type string `json:"type"` +} + +func (s *SourceScanner) GetSourceType() string { + return "Scanner" +} + +type SourceHub struct { + serder.Metadata `union:"Hub"` Type string `json:"type"` HubID cdssdk.HubID `json:"hubID"` HubName string `json:"hubName"` } -var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Body]( - (*BodyHubInfo)(nil), - (*BodyStorageInfo)(nil), +func (s *SourceHub) GetSourceType() string { + return "Hub" +} + +// 事件体 +type SysEventBody interface { + GetBodyType() string +} + +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[SysEventBody]( + (*BodyNewHub)(nil), + (*BodyHubUpdated)(nil), + (*BodyHubDeleted)(nil), + + (*BodyNewStorage)(nil), + (*BodyStorageUpdated)(nil), + (*BodyStorageDeleted)(nil), + (*BodyStorageStats)(nil), (*BodyHubTransferStats)(nil), (*BodyHubStorageTransferStats)(nil), (*BodyBlockTransfer)(nil), (*BodyBlockDistribution)(nil), - (*BodyObjectChange)(nil), - (*BodyPackageChange)(nil), - (*BodyBucketChange)(nil), + + (*BodyNewObject)(nil), + (*BodyObjectUpdated)(nil), + (*BodyObjectDeleted)(nil), + + (*BodyNewPackage)(nil), + (*BodyPackageDeleted)(nil), + + (*BodyNewBucket)(nil), + (*BodyBucketDeleted)(nil), )), "type") -type BodyHubInfo struct { - serder.Metadata `union:"BodyHubInfo"` - HubID cdssdk.HubID `json:"hubID"` - HubInfo cdssdk.Hub `json:"hubInfo"` +// 新增Hub的事件 +type BodyNewHub struct { + serder.Metadata `union:"NewHub"` + Type string `json:"type"` + Info cdssdk.Hub `json:"info"` +} + +func (b *BodyNewHub) GetBodyType() string { + return "NewHub" +} + +// Hub信息更新的事件 +type BodyHubUpdated struct { + serder.Metadata `union:"HubUpdated"` + Type string `json:"type"` + Info cdssdk.Hub `json:"info"` +} + +func (b *BodyHubUpdated) GetBodyType() string { + return "HubUpdated" +} + +// Hub删除的事件 +type BodyHubDeleted struct { + serder.Metadata `union:"HubDeleted"` Type string `json:"type"` + HubID cdssdk.HubID `json:"hubID"` } -type BodyStorageInfo struct { - serder.Metadata `union:"BodyStorageInfo"` - StorageID cdssdk.StorageID `json:"storageID"` - StorageInfo cdssdk.Storage `json:"storageInfo"` +func (b *BodyHubDeleted) GetBodyType() string { + return "HubDeleted" +} + +// 新增Storage的事件 +type BodyNewStorage struct { + serder.Metadata `union:"NewStorage"` + Info cdssdk.Storage `json:"info"` + Type string `json:"type"` +} + +func (b *BodyNewStorage) GetBodyType() string { + return "NewStorage" +} + +// Storage信息更新的事件 +type BodyStorageUpdated struct { + serder.Metadata `union:"StorageUpdated"` + Type string `json:"type"` + Info cdssdk.Storage `json:"info"` +} + +func (b *BodyStorageUpdated) GetBodyType() string { + return "StorageUpdated" +} + +// Storage删除的事件 +type BodyStorageDeleted struct { + serder.Metadata `union:"StorageDeleted"` Type string `json:"type"` + StorageID cdssdk.StorageID `json:"storageID"` +} + +func (b *BodyStorageDeleted) GetBodyType() string { + return "StorageDeleted" } + +// Storage统计信息的事件 type BodyStorageStats struct { - serder.Metadata `union:"BodyStorageStats"` + serder.Metadata `union:"StorageStats"` + Type string `json:"type"` StorageID cdssdk.StorageID `json:"storageID"` DataCount int64 `json:"dataCount"` } -type DataTrans struct { - TotalTransfer int64 `json:"totalTransfer"` - RequestCount int64 `json:"requestCount"` - FailedRequestCount int64 `json:"failedRequestCount"` - AvgTransfer int64 `json:"avgTransfer"` - MaxTransfer int64 `json:"maxTransfer"` - MinTransfer int64 `json:"minTransfer"` +func (b *BodyStorageStats) GetBodyType() string { + return "StorageStats" } +// Hub数据传输统计信息的事件 type BodyHubTransferStats struct { - serder.Metadata `union:"BodyHubTransferStats"` + serder.Metadata `union:"HubTransferStats"` + Type string `json:"type"` SourceHubID cdssdk.HubID `json:"sourceHubID"` TargetHubID cdssdk.HubID `json:"targetHubID"` Send DataTrans `json:"send"` @@ -80,8 +176,23 @@ type BodyHubTransferStats struct { EndTimestamp time.Time `json:"endTimestamp"` } +func (b *BodyHubTransferStats) GetBodyType() string { + return "HubTransferStats" +} + +type DataTrans struct { + TotalTransfer int64 `json:"totalTransfer"` + RequestCount int64 `json:"requestCount"` + FailedRequestCount int64 `json:"failedRequestCount"` + AvgTransfer int64 `json:"avgTransfer"` + MaxTransfer int64 `json:"maxTransfer"` + MinTransfer int64 `json:"minTransfer"` +} + +// Hub和Storage数据传输统计信息的事件 type BodyHubStorageTransferStats struct { - serder.Metadata `union:"BodyHubStorageTransferStats"` + serder.Metadata `union:"HubStorageTransferStats"` + Type string `json:"type"` HubID cdssdk.HubID `json:"hubID"` StorageID cdssdk.StorageID `json:"storageID"` Send DataTrans `json:"send"` @@ -90,6 +201,35 @@ type BodyHubStorageTransferStats struct { EndTimestamp time.Time `json:"endTimestamp"` } +func (b *BodyHubStorageTransferStats) GetBodyType() string { + return "HubStorageTransferStats" +} + +// 块传输的事件 +type BodyBlockTransfer struct { + serder.Metadata `union:"BlockTransfer"` + Type string `json:"type"` + ObjectID cdssdk.ObjectID `json:"objectID"` + PackageID cdssdk.PackageID `json:"packageID"` + BlockChanges []BlockChange `json:"blockChanges"` +} + +func (b *BodyBlockTransfer) GetBodyType() string { + return "BlockTransfer" +} + +// 块变化类型 +type BlockChange interface { + GetBlockChangeType() string +} + +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[BlockChange]( + (*BlockChangeClone)(nil), + (*BlockChangeDeleted)(nil), + (*BlockChangeEnDecode)(nil), + // (*BlockChangeUpdated)(nil), +)), "type") + type Block struct { BlockType string `json:"blockType"` Index string `json:"index"` @@ -101,34 +241,58 @@ type DataTransfer struct { DataTransferCount string `json:"dataTransferCount"` } -type BlockChange struct { +type BlockChangeClone struct { + serder.Metadata `union:"BlockChangeClone"` Type string `json:"type"` BlockType string `json:"blockType"` Index string `json:"index"` SourceStorageID cdssdk.StorageID `json:"sourceStorageID"` TargetStorageID cdssdk.StorageID `json:"targetStorageID"` DataTransferCount string `json:"dataTransferCount"` - Timestamp time.Time `json:"timestamp"` - SourceBlocks []Block `json:"sourceBlocks,omitempty"` - TargetBlocks []Block `json:"targetBlocks,omitempty"` - DataTransfers []DataTransfer `json:"dataTransfers,omitempty"` - Blocks []Block `json:"blocks,omitempty"` } -type BodyBlockTransfer struct { - serder.Metadata `union:"BodyBlockTransfer"` - Type string `json:"type"` - ObjectID cdssdk.ObjectID `json:"objectID"` - PackageID cdssdk.PackageID `json:"packageID"` - BlockChanges []BlockChange `json:"blockChanges"` + +func (b *BlockChangeClone) GetBlockChangeType() string { + return "Clone" } -type BlockDistributionObjectInfo struct { - Type string `json:"type"` - Index string `json:"index"` - StorageID string `json:"storageID"` +type BlockChangeDeleted struct { + serder.Metadata `union:"BlockChangeDeleted"` + Type string `json:"type"` + Index string `json:"index"` + StorageID string `json:"storageID"` +} + +func (b *BlockChangeDeleted) GetBlockChangeType() string { + return "Deleted" +} + +type BlockChangeEnDecode struct { + serder.Metadata `union:"BlockChangeEnDecode"` + Type string `json:"type"` + SourceBlocks []Block `json:"sourceBlocks,omitempty"` + TargetBlocks []Block `json:"targetBlocks,omitempty"` + DataTransfers []DataTransfer `json:"dataTransfers,omitempty"` +} + +func (b *BlockChangeEnDecode) GetBlockChangeType() string { + return "EnDecode" } -type BlockDistributionObject struct { +// TODO 块更新应该是说对象被重新上传,此时事件内应该包含全部对象块的信息,因此应该使用ObjectUpdated事件 +// type BlockChangeUpdated struct { +// serder.Metadata `union:"BlockChangeUpdated"` +// Type string `json:"type"` +// Blocks []Block `json:"blocks"` +// } + +// func (b *BlockChangeUpdated) GetBlockChangeType() string { +// return "Updated" +// } + +// 块分布的事件 +type BodyBlockDistribution struct { + serder.Metadata `union:"BlockDistribution"` + Type string `json:"type"` ObjectID cdssdk.ObjectID `json:"objectID"` PackageID cdssdk.PackageID `json:"packageID"` Path string `json:"path"` @@ -141,33 +305,91 @@ type BlockDistributionObject struct { DataTransfers []DataTransfer `json:"dataTransfers"` } -type BodyBlockDistribution struct { - serder.Metadata `union:"BodyBlockDistribution"` - Timestamp time.Time `json:"timestamp"` - Object BlockDistributionObject `json:"object,omitempty"` +func (b *BodyBlockDistribution) GetBodyType() string { + return "BlockDistribution" } -type BodyObjectChange struct { - serder.Metadata `union:"BodyObjectChange"` + +type BlockDistributionObjectInfo struct { + Type string `json:"type"` + Index string `json:"index"` + StorageID string `json:"storageID"` +} + +// 新增Object的事件 +type BodyNewObject struct { + serder.Metadata `union:"NewObject"` Type string `json:"type"` - ObjectID cdssdk.ObjectID `json:"objectID"` - PackageID cdssdk.PackageID `json:"packageID"` - Path string `json:"path"` - Size int `json:"size"` + Info cdssdk.Object `json:"info"` BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"` - Timestamp time.Time `json:"timestamp"` } -type BodyPackageChange struct { - serder.Metadata `union:"BodyPackageChange"` + +func (b *BodyNewObject) GetBodyType() string { + return "NewObject" +} + +// Object更新的事件 +type BodyObjectUpdated struct { + serder.Metadata `union:"ObjectUpdated"` + Type string `json:"type"` + Info cdssdk.Object `json:"info"` + BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"` +} + +func (b *BodyObjectUpdated) GetBodyType() string { + return "ObjectUpdated" +} + +// Object删除的事件 +type BodyObjectDeleted struct { + serder.Metadata `union:"ObjectDeleted"` + Type string `json:"type"` + ObjectID cdssdk.ObjectID `json:"objectID"` +} + +func (b *BodyObjectDeleted) GetBodyType() string { + return "ObjectDeleted" +} + +// 新增Package的事件 +type BodyNewPackage struct { + serder.Metadata `union:"NewPackage"` + Type string `json:"type"` + Info cdssdk.Package `json:"info"` +} + +func (b *BodyNewPackage) GetBodyType() string { + return "NewPackage" +} + +// Package删除的事件 +type BodyPackageDeleted struct { + serder.Metadata `union:"PackageDeleted"` Type string `json:"type"` PackageID cdssdk.PackageID `json:"packageID"` - PackageName string `json:"packageName"` - BucketID cdssdk.BucketID `json:"bucketID"` - Timestamp time.Time `json:"timestamp"` } -type BodyBucketChange struct { - serder.Metadata `union:"BodyBucketChange"` + +func (b *BodyPackageDeleted) GetBodyType() string { + return "PackageDeleted" +} + +// 新增Bucket的事件 +type BodyNewBucket struct { + serder.Metadata `union:"NewBucket"` + Type string `json:"type"` + Info cdssdk.Bucket `json:"info"` +} + +func (b *BodyNewBucket) GetBodyType() string { + return "NewBucket" +} + +// Bucket删除的事件 +type BodyBucketDeleted struct { + serder.Metadata `union:"BucketDeleted"` Type string `json:"type"` BucketID cdssdk.BucketID `json:"bucketID"` - BucketName string `json:"bucketName"` - Timestamp time.Time `json:"timestamp"` +} + +func (b *BodyBucketDeleted) GetBodyType() string { + return "BucketDeleted" } diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go index 41a1f8f..93c2a62 100644 --- a/common/pkgs/sysevent/sysevent.go +++ b/common/pkgs/sysevent/sysevent.go @@ -2,16 +2,12 @@ package sysevent import ( stgmod "gitlink.org.cn/cloudream/storage/common/models" - "time" ) const ( SysEventQueueName = "SysEventQueue" ) -type SysEvent struct { - Timestamp time.Time `json:"timestamp"` - Source stgmod.Source `json:"source"` - Category string `json:"category"` - Body stgmod.Body `json:"body"` -} +type SysEvent = stgmod.SysEvent + +type Source = stgmod.SysEventSource