diff --git a/common/models/datamap.go b/common/models/datamap.go index 54c1f3a..41fae27 100644 --- a/common/models/datamap.go +++ b/common/models/datamap.go @@ -1,71 +1,67 @@ package stgmod import ( + "gitlink.org.cn/cloudream/common/pkgs/types" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/serder" "time" ) -// HubInfo 节点信息 - -type SourceHub struct { - Type string `json:"type"` -} - -type HubInfoBody struct { - HubID int `json:"hubID"` - HubInfo cdssdk.Hub `json:"hubInfo"` - Type string `json:"type"` +type Source interface { } -type HubInfo struct { - Timestamp time.Time `json:"timestamp"` - Source SourceHub `json:"source"` - Category string `json:"category"` - Body HubInfoBody `json:"body"` +type Body interface { } -//StorageInfo 节点信息 - -type SourceStorage struct { - Type string `json:"type"` -} +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Source]( + (*SourceCoordinator)(nil), + (*SourceScanner)(nil), +)), "type") -type StorageInfoBody struct { - StorageID int64 `json:"storageID"` - StorageInfo cdssdk.Storage `json:"storageInfo"` - Type string `json:"type"` +type SourceCoordinator struct { + serder.Metadata `union:"SourceCoordinator"` + Type string `json:"type"` } -type StorageInfo struct { - Timestamp time.Time `json:"timestamp"` - Source SourceStorage `json:"source"` - Category string `json:"category"` - Body StorageInfoBody `json:"body"` +type SourceScanner struct { + serder.Metadata `union:"SourceScanner"` + Type string `json:"type"` + HubID cdssdk.HubID `json:"hubID"` + HubName string `json:"hubName"` } -// StorageStats 节点信息数据 +var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[Body]( + (*BodyHubInfo)(nil), + (*BodyStorageInfo)(nil), + (*BodyStorageStats)(nil), + (*BodyHubTransferStats)(nil), + (*BodyHubStorageTransferStats)(nil), + (*BodyBlockTransfer)(nil), + (*BodyBlockDistribution)(nil), + (*BodyObjectChange)(nil), + (*BodyPackageChange)(nil), + (*BodyBucketChange)(nil), +)), "type") -type Source struct { - Type string `json:"type"` - HubID string `json:"hubID"` - HubName string `json:"hubName"` +type BodyHubInfo struct { + serder.Metadata `union:"BodyHubInfo"` + HubID cdssdk.HubID `json:"hubID"` + HubInfo cdssdk.Hub `json:"hubInfo"` + Type string `json:"type"` } -type StorageStatsBody struct { - StorageID int64 `json:"storageID"` - DataCount int64 `json:"dataCount"` +type BodyStorageInfo struct { + serder.Metadata `union:"BodyStorageInfo"` + StorageID cdssdk.StorageID `json:"storageID"` + StorageInfo cdssdk.Storage `json:"storageInfo"` + Type string `json:"type"` } - -type StorageStats struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body StorageStatsBody `json:"body"` +type BodyStorageStats struct { + serder.Metadata `union:"BodyStorageStats"` + StorageID cdssdk.StorageID `json:"storageID"` + DataCount int64 `json:"dataCount"` } -// HubTransferStats 节点传输信息 -// 每天一次,各节点统计自身当天向外部各个节点传输的总数据量 - type DataTrans struct { TotalTransfer int64 `json:"totalTransfer"` RequestCount int64 `json:"requestCount"` @@ -74,87 +70,58 @@ type DataTrans struct { MaxTransfer int64 `json:"maxTransfer"` MinTransfer int64 `json:"minTransfer"` } -type HubTransferStatsBody struct { - SourceHubID int64 `json:"sourceHubID"` - TargetHubID int64 `json:"targetHubID"` - Send DataTrans `json:"send"` - StartTimestamp time.Time `json:"startTimestamp"` - EndTimestamp time.Time `json:"endTimestamp"` -} -type HubTransferStats struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body HubTransferStatsBody `json:"body"` -} -//hubStorageTransferStats 节点-中心间传输信息 - -type HubStorageTransferStatsBody struct { - HubID int64 `json:"hubID"` - StorageID int64 `json:"storageID"` - Send DataTrans `json:"send"` - Receive DataTrans `json:"receive"` - StartTimestamp time.Time `json:"startTimestamp"` - EndTimestamp time.Time `json:"endTimestamp"` +type BodyHubTransferStats struct { + serder.Metadata `union:"BodyHubTransferStats"` + SourceHubID cdssdk.HubID `json:"sourceHubID"` + TargetHubID cdssdk.HubID `json:"targetHubID"` + Send DataTrans `json:"send"` + StartTimestamp time.Time `json:"startTimestamp"` + EndTimestamp time.Time `json:"endTimestamp"` } -type HubStorageTransferStats struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body HubStorageTransferStatsBody `json:"body"` +type BodyHubStorageTransferStats struct { + serder.Metadata `union:"BodyHubStorageTransferStats"` + HubID cdssdk.HubID `json:"hubID"` + StorageID cdssdk.StorageID `json:"storageID"` + Send DataTrans `json:"send"` + Receive DataTrans `json:"receive"` + StartTimestamp time.Time `json:"startTimestamp"` + EndTimestamp time.Time `json:"endTimestamp"` } -// blockTransfer 块传输信息 -/*实时日志,当对象的块信息发生变化时推送,共有4种变化类型: -拷贝 -编解码(一变多、多变一、多变多) -删除 -更新*/ - type Block struct { - BlockType string `json:"blockType"` - Index string `json:"index"` - StorageID string `json:"storageID"` + BlockType string `json:"blockType"` + Index string `json:"index"` + StorageID cdssdk.StorageID `json:"storageID"` } type DataTransfer struct { - SourceStorageID string `json:"sourceStorageID"` - TargetStorageID string `json:"targetStorageID"` - DataTransferCount string `json:"dataTransferCount"` + SourceStorageID cdssdk.StorageID `json:"sourceStorageID"` + TargetStorageID cdssdk.StorageID `json:"targetStorageID"` + DataTransferCount string `json:"dataTransferCount"` } type BlockChange struct { - Type string `json:"type"` - BlockType string `json:"blockType"` - Index string `json:"index"` - SourceStorageID string `json:"sourceStorageID"` - TargetStorageID string `json:"targetStorageID"` - DataTransferCount string `json:"dataTransferCount"` - Timestamp time.Time `json:"timestamp"` - SourceBlocks []Block `json:"sourceBlocks,omitempty"` - TargetBlocks []Block `json:"targetBlocks,omitempty"` - DataTransfers []DataTransfer `json:"dataTransfers,omitempty"` - Blocks []Block `json:"blocks,omitempty"` -} - -type BlockTransferBody struct { - Type string `json:"type"` - ObjectID string `json:"objectID"` - PackageID string `json:"packageID"` - BlockChanges []BlockChange `json:"blockChanges"` -} - -type BlockTransfer struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body BlockTransferBody `json:"body"` + Type string `json:"type"` + BlockType string `json:"blockType"` + Index string `json:"index"` + SourceStorageID cdssdk.StorageID `json:"sourceStorageID"` + TargetStorageID cdssdk.StorageID `json:"targetStorageID"` + DataTransferCount string `json:"dataTransferCount"` + Timestamp time.Time `json:"timestamp"` + SourceBlocks []Block `json:"sourceBlocks,omitempty"` + TargetBlocks []Block `json:"targetBlocks,omitempty"` + DataTransfers []DataTransfer `json:"dataTransfers,omitempty"` + Blocks []Block `json:"blocks,omitempty"` +} +type BodyBlockTransfer struct { + serder.Metadata `union:"BodyBlockTransfer"` + Type string `json:"type"` + ObjectID cdssdk.ObjectID `json:"objectID"` + PackageID cdssdk.PackageID `json:"packageID"` + BlockChanges []BlockChange `json:"blockChanges"` } -// BlockDistribution 块传输信息 -// 每天一次,在调整完成后,将当天调整前后的布局情况一起推送 - type BlockDistributionObjectInfo struct { Type string `json:"type"` Index string `json:"index"` @@ -162,8 +129,8 @@ type BlockDistributionObjectInfo struct { } type BlockDistributionObject struct { - ObjectID int64 `json:"objectID"` - PackageID int64 `json:"packageID"` + ObjectID cdssdk.ObjectID `json:"objectID"` + PackageID cdssdk.PackageID `json:"packageID"` Path string `json:"path"` Size int64 `json:"size"` FileHash string `json:"fileHash"` @@ -173,64 +140,34 @@ type BlockDistributionObject struct { BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"` DataTransfers []DataTransfer `json:"dataTransfers"` } -type BlockDistributionBody struct { - Timestamp time.Time `json:"timestamp"` - Object BlockDistributionObject `json:"object,omitempty"` -} - -type BlockDistribution struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body BlockDistributionBody `json:"body"` -} - -// ObjectChange Object变化信息 - -type ObjectChangeBody struct { - Type string `json:"type"` - ObjectID string `json:"objectID"` - PackageID string `json:"packageID"` - Path string `json:"path"` - Size int `json:"size"` - BlockDistribution []BlockDistribution `json:"blockDistribution"` - Timestamp time.Time `json:"timestamp"` -} -type ObjectChange struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body ObjectChangeBody `json:"body"` -} -// PackageChange package变化信息 - -type PackageChangeBody struct { - Type string `json:"type"` - PackageID string `json:"packageID"` - PackageName string `json:"packageName"` - BucketID string `json:"bucketID"` - Timestamp string `json:"timestamp"` -} -type PackageChange struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body PackageChangeBody `json:"body"` -} - -// BucketChange bucket变化信息 - -type BucketChangeBody struct { - Type string `json:"type"` - BucketID string `json:"bucketID"` - BucketName string `json:"bucketName"` - Timestamp time.Time `json:"timestamp"` +type BodyBlockDistribution struct { + serder.Metadata `union:"BodyBlockDistribution"` + Timestamp time.Time `json:"timestamp"` + Object BlockDistributionObject `json:"object,omitempty"` } - -type BucketChange struct { - Timestamp time.Time `json:"timestamp"` - Source Source `json:"source"` - Category string `json:"category"` - Body BucketChangeBody `json:"body"` +type BodyObjectChange struct { + serder.Metadata `union:"BodyObjectChange"` + Type string `json:"type"` + ObjectID cdssdk.ObjectID `json:"objectID"` + PackageID cdssdk.PackageID `json:"packageID"` + Path string `json:"path"` + Size int `json:"size"` + BlockDistribution []BlockDistributionObjectInfo `json:"blockDistribution"` + Timestamp time.Time `json:"timestamp"` +} +type BodyPackageChange struct { + serder.Metadata `union:"BodyPackageChange"` + Type string `json:"type"` + PackageID cdssdk.PackageID `json:"packageID"` + PackageName string `json:"packageName"` + BucketID cdssdk.BucketID `json:"bucketID"` + Timestamp time.Time `json:"timestamp"` +} +type BodyBucketChange struct { + serder.Metadata `union:"BodyBucketChange"` + Type string `json:"type"` + BucketID cdssdk.BucketID `json:"bucketID"` + BucketName string `json:"bucketName"` + Timestamp time.Time `json:"timestamp"` } diff --git a/common/pkgs/sysevent/sysevent.go b/common/pkgs/sysevent/sysevent.go index 082b93e..41a1f8f 100644 --- a/common/pkgs/sysevent/sysevent.go +++ b/common/pkgs/sysevent/sysevent.go @@ -1,9 +1,17 @@ package sysevent +import ( + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "time" +) + const ( SysEventQueueName = "SysEventQueue" ) -type SysEvent = any // TODO 换成具体的类型 - -type Source = any // TODO 换成具体的类型 +type SysEvent struct { + Timestamp time.Time `json:"timestamp"` + Source stgmod.Source `json:"source"` + Category string `json:"category"` + Body stgmod.Body `json:"body"` +} diff --git a/datamap/internal/models/blockdistribution.go b/datamap/internal/models/blockdistribution.go index a3e6e18..cdf4005 100644 --- a/datamap/internal/models/blockdistribution.go +++ b/datamap/internal/models/blockdistribution.go @@ -2,8 +2,9 @@ package models import ( "errors" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "fmt" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" "log" "strconv" @@ -89,90 +90,103 @@ func (r *BlockDistributionRepository) DeleteBlockDistribution(objectID int64, in return r.repo.db.Exec(query, objectID, index, storageID).Error } -// ProcessBlockDistribution mq推送各节点统计自身当前的总数据量时的处理逻辑 +type BlockDistributionWatcher struct { + Name string +} -func ProcessBlockDistribution(data stgmod.BlockDistribution) { +func (w *BlockDistributionWatcher) OnEvent(event sysevent.SysEvent) { repoObject := NewObjectRepository(DB) repoBlock := NewBlockDistributionRepository(DB) repoStorage := NewStorageTransferCountRepository(DB) - //更新object表中的状态 - object, err := repoObject.GetObjectByID(data.Body.Object.ObjectID) - faultTolerance, _ := strconv.ParseFloat(data.Body.Object.FaultTolerance, 64) - redundancy, _ := strconv.ParseFloat(data.Body.Object.Redundancy, 64) - avgAccessCost, _ := strconv.ParseFloat(data.Body.Object.AvgAccessCost, 64) - if errors.Is(err, gorm.ErrRecordNotFound) { - err := repoObject.CreateObject(&Object{ - ObjectID: cdssdk.ObjectID(data.Body.Object.ObjectID), - PackageID: cdssdk.PackageID(data.Body.Object.PackageID), - Path: data.Body.Object.Path, - Size: data.Body.Object.Size, - FileHash: data.Body.Object.FileHash, - Status: StatusYesterdayAfter, - FaultTolerance: faultTolerance, - Redundancy: redundancy, - AvgAccessCost: avgAccessCost, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create object: %v", err) - } - } else { - object.Status = StatusYesterdayAfter - err = repoObject.UpdateObject(object) - if err != nil { - log.Printf("Error update object: %v", err) - } - } - //更新block表中的状态 - for _, blockDistribution := range data.Body.Object.BlockDistribution { - blockIndex, _ := strconv.ParseInt(blockDistribution.Index, 10, 64) - blockStorageID, _ := strconv.ParseInt(blockDistribution.StorageID, 10, 64) - blockDist, err := repoBlock.GetBlockDistributionByIndex(data.Body.Object.ObjectID, blockIndex, blockStorageID) - if errors.Is(err, gorm.ErrRecordNotFound) { - err := repoBlock.CreateBlockDistribution(&BlockDistribution{ - BlockID: blockDist.BlockID, - ObjectID: blockDist.ObjectID, - Type: blockDistribution.Type, - Index: blockIndex, - StorageID: blockStorageID, - Status: StatusYesterdayAfter, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create BlockDistribution: %v", err) + if event.Category == "blockDistribution" { + if blockDistribution, ok := event.Body.(*stgmod.BodyBlockDistribution); ok { + + //更新object表中的状态 + + object, err := repoObject.GetObjectByID(int64(blockDistribution.Object.ObjectID)) + faultTolerance, _ := strconv.ParseFloat(blockDistribution.Object.FaultTolerance, 64) + redundancy, _ := strconv.ParseFloat(blockDistribution.Object.Redundancy, 64) + avgAccessCost, _ := strconv.ParseFloat(blockDistribution.Object.AvgAccessCost, 64) + if errors.Is(err, gorm.ErrRecordNotFound) { + err := repoObject.CreateObject(&Object{ + ObjectID: blockDistribution.Object.ObjectID, + PackageID: blockDistribution.Object.PackageID, + Path: blockDistribution.Object.Path, + Size: blockDistribution.Object.Size, + FileHash: blockDistribution.Object.FileHash, + Status: StatusYesterdayAfter, + FaultTolerance: faultTolerance, + Redundancy: redundancy, + AvgAccessCost: avgAccessCost, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create object: %v", err) + } + } else { + object.Status = StatusYesterdayAfter + err = repoObject.UpdateObject(object) + if err != nil { + log.Printf("Error update object: %v", err) + } } - } else { - err := repoBlock.UpdateBlockDistribution(&BlockDistribution{ - BlockID: blockDist.BlockID, - ObjectID: blockDist.ObjectID, - Type: blockDistribution.Type, - Index: blockIndex, - StorageID: blockStorageID, - Status: StatusYesterdayAfter, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error update BlockDistribution: %v", err) + + //更新block表中的状态 + for _, blockDist := range blockDistribution.Object.BlockDistribution { + blockIndex, _ := strconv.ParseInt(blockDist.Index, 10, 64) + blockStorageID, _ := strconv.ParseInt(blockDist.StorageID, 10, 64) + blockDist, err := repoBlock.GetBlockDistributionByIndex(int64(blockDistribution.Object.ObjectID), blockIndex, blockStorageID) + if errors.Is(err, gorm.ErrRecordNotFound) { + err := repoBlock.CreateBlockDistribution(&BlockDistribution{ + BlockID: blockDist.BlockID, + ObjectID: blockDist.ObjectID, + Type: blockDist.Type, + Index: blockIndex, + StorageID: blockStorageID, + Status: StatusYesterdayAfter, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create BlockDistribution: %v", err) + } + } else { + err := repoBlock.UpdateBlockDistribution(&BlockDistribution{ + BlockID: blockDist.BlockID, + ObjectID: blockDist.ObjectID, + Type: blockDist.Type, + Index: blockIndex, + StorageID: blockStorageID, + Status: StatusYesterdayAfter, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error update BlockDistribution: %v", err) + } + } } + //在storageTransferCount表中添加记录 + for _, dataTransfer := range blockDistribution.Object.DataTransfers { + sourceStorageID, _ := strconv.ParseInt(string(dataTransfer.SourceStorageID), 10, 64) + targetStorageID, _ := strconv.ParseInt(string(dataTransfer.TargetStorageID), 10, 64) + dataTransferCount, _ := strconv.ParseInt(dataTransfer.DataTransferCount, 10, 64) + + err := repoStorage.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: int64(blockDistribution.Object.ObjectID), + Status: StatusTodayBeforeYesterday, + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: dataTransferCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create StorageTransferCount : %v", err) + } + } + } else { + fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) } - } - //在storageTransferCount表中添加记录 - for _, dataTransfer := range data.Body.Object.DataTransfers { - sourceStorageID, _ := strconv.ParseInt(dataTransfer.SourceStorageID, 10, 64) - targetStorageID, _ := strconv.ParseInt(dataTransfer.TargetStorageID, 10, 64) - dataTransferCount, _ := strconv.ParseInt(dataTransfer.DataTransferCount, 10, 64) - - err := repoStorage.CreateStorageTransferCount(&StorageTransferCount{ - ObjectID: data.Body.Object.ObjectID, - Status: StatusTodayBeforeYesterday, - SourceStorageID: sourceStorageID, - TargetStorageID: targetStorageID, - DataTransferCount: dataTransferCount, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create StorageTransferCount : %v", err) - } + } else { + fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } } diff --git a/datamap/internal/models/blocktransfer.go b/datamap/internal/models/blocktransfer.go index 0bcd351..98e210d 100644 --- a/datamap/internal/models/blocktransfer.go +++ b/datamap/internal/models/blocktransfer.go @@ -2,8 +2,10 @@ package models import ( "errors" + "fmt" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" "log" "strconv" @@ -68,160 +70,170 @@ func (r *StorageTransferCountRepository) GetAllStorageTransferCounts() ([]Storag return storageTransferCounts, nil } -// ProcessBlockTransfer 处理块传输信息的实时日志,当对象的块信息发生变化时推送触发数据库刷新 +type BlockTransferWatcher struct { + Name string +} -func ProcessBlockTransfer(data stgmod.BlockTransfer) { +func (w *BlockTransferWatcher) OnEvent(event sysevent.SysEvent) { repoDist := NewBlockDistributionRepository(DB) repoStorage := NewStorageRepository(DB) repoStorageTrans := NewStorageTransferCountRepository(DB) repoObject := NewObjectRepository(DB) - for _, change := range data.Body.BlockChanges { - - objectID, _ := strconv.ParseInt(data.Body.ObjectID, 10, 64) - object, _ := repoObject.GetObjectByID(objectID) - index, _ := strconv.ParseInt(change.Index, 10, 64) - sourceStorageID, _ := strconv.ParseInt(change.SourceStorageID, 10, 64) - targetStorageID, _ := strconv.ParseInt(change.TargetStorageID, 10, 64) - newDataCount, _ := strconv.ParseInt(change.DataTransferCount, 10, 64) - - switch change.Type { - case "0": //拷贝 - //查询出存储在数据库中的BlockDistribution信息 - blockSource, errSource := repoDist.GetBlockDistributionByIndex(objectID, index, sourceStorageID) - //没有记录就将source和target的信息都保存到库中 - if errors.Is(errSource, gorm.ErrRecordNotFound) { - err := repoDist.CreateBlockDistribution(&BlockDistribution{ - ObjectID: objectID, - Type: change.BlockType, - Index: index, - StorageID: sourceStorageID, - Status: StatusNow, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create source blockdistribution: %v", err) - } - } else { - //有数据则新增一条storageID为targetStorageID的记录,同时更新状态 - err := repoDist.CreateBlockDistribution(&BlockDistribution{ - ObjectID: blockSource.ObjectID, - Type: change.BlockType, - Index: index, - StorageID: targetStorageID, - Status: StatusNow, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error update blockdistribution: %v", err) - } - //复制完成之后增加的dataCount要加到targetStorage的记录中 - storageOld, err := repoStorage.GetStorageByID(targetStorageID) - if errors.Is(err, gorm.ErrRecordNotFound) { - err = repoStorage.CreateStorage(&Storage{ - StorageID: cdssdk.StorageID(targetStorageID), - DataCount: newDataCount, - Timestamp: time.Now(), + if event.Category == "blockTransfer" { + if blockTransfer, ok := event.Body.(*stgmod.BodyBlockTransfer); ok { + + for _, change := range blockTransfer.BlockChanges { + + objectID, _ := strconv.ParseInt(string(blockTransfer.ObjectID), 10, 64) + object, _ := repoObject.GetObjectByID(objectID) + index, _ := strconv.ParseInt(change.Index, 10, 64) + sourceStorageID, _ := strconv.ParseInt(string(change.SourceStorageID), 10, 64) + targetStorageID, _ := strconv.ParseInt(string(change.TargetStorageID), 10, 64) + newDataCount, _ := strconv.ParseInt(change.DataTransferCount, 10, 64) + + switch change.Type { + case "0": //拷贝 + //查询出存储在数据库中的BlockDistribution信息 + blockSource, errSource := repoDist.GetBlockDistributionByIndex(objectID, index, sourceStorageID) + //没有记录就将source和target的信息都保存到库中 + if errors.Is(errSource, gorm.ErrRecordNotFound) { + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: objectID, + Type: change.BlockType, + Index: index, + StorageID: sourceStorageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create source blockdistribution: %v", err) + } + } else { + //有数据则新增一条storageID为targetStorageID的记录,同时更新状态 + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: blockSource.ObjectID, + Type: change.BlockType, + Index: index, + StorageID: targetStorageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error update blockdistribution: %v", err) + } + //复制完成之后增加的dataCount要加到targetStorage的记录中 + storageOld, err := repoStorage.GetStorageByID(targetStorageID) + if errors.Is(err, gorm.ErrRecordNotFound) { + err = repoStorage.CreateStorage(&Storage{ + StorageID: cdssdk.StorageID(targetStorageID), + DataCount: newDataCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error increase datacount in targetstorage: %v", err) + } + } else { + err = repoStorage.UpdateStorage(&Storage{ + StorageID: cdssdk.StorageID(targetStorageID), + DataCount: storageOld.DataCount + newDataCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error increase datacount in targetstorage: %v", err) + } + } + + } + //新增记录到storageTransferCount表中 + err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: objectID, + Status: int64(blockSource.Status), + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: newDataCount, + Timestamp: time.Now(), }) if err != nil { - log.Printf("Error increase datacount in targetstorage: %v", err) + log.Printf("Error create StorageTransferCount : %v", err) + } + case "1": //编解码 + //删除所有的sourceBlock + for _, sourceBlock := range change.SourceBlocks { + sourceBlockIndex, _ := strconv.ParseInt(sourceBlock.Index, 10, 64) + err := repoDist.DeleteBlockDistribution(objectID, sourceBlockIndex, sourceStorageID) + if err != nil { + log.Printf("Error delete blockdistribution: %v", err) + } } - } else { - err = repoStorage.UpdateStorage(&Storage{ - StorageID: cdssdk.StorageID(targetStorageID), - DataCount: storageOld.DataCount + newDataCount, - Timestamp: time.Now(), + //插入所有的targetBlock + for _, targetBlock := range change.TargetBlocks { + storageID, _ := strconv.ParseInt(string(targetBlock.StorageID), 10, 64) + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: objectID, + Type: targetBlock.BlockType, + Index: index, + //直接保存到目标中心 + StorageID: storageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create blockdistribution: %v", err) + } + } + //新增记录到storageTransferCount表中 + err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: objectID, + Status: int64(object.Status), + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: newDataCount, + Timestamp: time.Now(), }) if err != nil { - log.Printf("Error increase datacount in targetstorage: %v", err) + log.Printf("Error create StorageTransferCount : %v", err) } - } - } - //新增记录到storageTransferCount表中 - err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ - ObjectID: objectID, - Status: int64(blockSource.Status), - SourceStorageID: sourceStorageID, - TargetStorageID: targetStorageID, - DataTransferCount: newDataCount, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create StorageTransferCount : %v", err) - } - case "1": //编解码 - //删除所有的sourceBlock - for _, sourceBlock := range change.SourceBlocks { - sourceBlockIndex, _ := strconv.ParseInt(sourceBlock.Index, 10, 64) - err := repoDist.DeleteBlockDistribution(objectID, sourceBlockIndex, sourceStorageID) - if err != nil { - log.Printf("Error delete blockdistribution: %v", err) - } - } - //插入所有的targetBlock - for _, targetBlock := range change.TargetBlocks { - storageID, _ := strconv.ParseInt(targetBlock.StorageID, 10, 64) - err := repoDist.CreateBlockDistribution(&BlockDistribution{ - ObjectID: objectID, - Type: targetBlock.BlockType, - Index: index, - //直接保存到目标中心 - StorageID: storageID, - Status: StatusNow, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create blockdistribution: %v", err) - } - } - //新增记录到storageTransferCount表中 - err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ - ObjectID: objectID, - Status: int64(object.Status), - SourceStorageID: sourceStorageID, - TargetStorageID: targetStorageID, - DataTransferCount: newDataCount, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create StorageTransferCount : %v", err) - } + case "2": //删除 + for _, block := range change.Blocks { + storageID, _ := strconv.ParseInt(string(block.StorageID), 10, 64) + changeIndex, _ := strconv.ParseInt(block.Index, 10, 64) + err := repoDist.DeleteBlockDistribution(objectID, changeIndex, storageID) + if err != nil { + log.Printf("Error delete blockdistribution: %v", err) + } + } - case "2": //删除 - for _, block := range change.Blocks { - storageID, _ := strconv.ParseInt(block.StorageID, 10, 64) - changeIndex, _ := strconv.ParseInt(block.Index, 10, 64) - err := repoDist.DeleteBlockDistribution(objectID, changeIndex, storageID) - if err != nil { - log.Printf("Error delete blockdistribution: %v", err) - } - } + case "3": //更新 + for _, blockUpdate := range change.Blocks { + //查询出存储在数据库中的BlockDistribution信息 + blockIndex, _ := strconv.ParseInt(blockUpdate.Index, 10, 64) + blockOld, err := repoDist.GetBlockDistributionByIndex(objectID, blockIndex, sourceStorageID) + newStorageID, _ := strconv.ParseInt(string(blockUpdate.StorageID), 10, 64) + err = repoDist.UpdateBlockDistribution(&BlockDistribution{ + BlockID: blockOld.BlockID, + ObjectID: blockOld.ObjectID, + Type: blockUpdate.BlockType, + Index: blockIndex, + StorageID: newStorageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error delete blockdistribution: %v", err) + } + } - case "3": //更新 - for _, blockUpdate := range change.Blocks { - //查询出存储在数据库中的BlockDistribution信息 - blockIndex, _ := strconv.ParseInt(blockUpdate.Index, 10, 64) - blockOld, err := repoDist.GetBlockDistributionByIndex(objectID, blockIndex, sourceStorageID) - newStorageID, _ := strconv.ParseInt(blockUpdate.StorageID, 10, 64) - err = repoDist.UpdateBlockDistribution(&BlockDistribution{ - BlockID: blockOld.BlockID, - ObjectID: blockOld.ObjectID, - Type: blockUpdate.BlockType, - Index: blockIndex, - StorageID: newStorageID, - Status: StatusNow, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error delete blockdistribution: %v", err) + default: + break } } - - default: - break + } else { + fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) } + } else { + fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } - } diff --git a/datamap/internal/models/hub.go b/datamap/internal/models/hub.go index fdb7795..327c22b 100644 --- a/datamap/internal/models/hub.go +++ b/datamap/internal/models/hub.go @@ -2,14 +2,13 @@ package models import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" - "gorm.io/datatypes" "gorm.io/gorm" ) type Hub struct { - HubID cdssdk.HubID `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"` - Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` - Address datatypes.JSON `gorm:"column:Address; type:json; " json:"address"` + HubID cdssdk.HubID `gorm:"column:HubID; primaryKey; type:bigint; autoIncrement" json:"hubID"` + Name string `gorm:"column:Name; type:varchar(255); not null" json:"name"` + Address cdssdk.HubAddressInfo `gorm:"column:Address; type:json; " json:"address"` } func (Hub) TableName() string { return "hub" } diff --git a/datamap/internal/models/hubinfo.go b/datamap/internal/models/hubinfo.go index e515846..bd133bb 100644 --- a/datamap/internal/models/hubinfo.go +++ b/datamap/internal/models/hubinfo.go @@ -2,9 +2,10 @@ package models import ( "encoding/json" + "fmt" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gorm.io/datatypes" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" ) // LocalHub 本地结构体,嵌入cdssdk.Hub @@ -42,33 +43,49 @@ func (s *LocalHub) UnmarshalJSON(data []byte) error { return nil } -func ProcessHubInfo(data stgmod.HubInfo) { +// 实现 Watcher 接口的结构体 +type HubInfoWatcher struct { + Name string +} + +// 实现 OnEvent 方法 +func (w *HubInfoWatcher) OnEvent(event sysevent.SysEvent) { repo := NewHubRepository(DB) - jsonData, _ := json.Marshal(data.Body.HubInfo.Address) - HubInfo := &Hub{ - HubID: cdssdk.HubID(data.Body.HubID), - Name: data.Body.HubInfo.Name, - Address: datatypes.JSON(jsonData), - } - //先判断传输数据的类型 - switch data.Body.Type { - case "add": - err := repo.CreateHub(HubInfo) - if err != nil { - return - } - case "update": - err := repo.UpdateHub(HubInfo) - if err != nil { - return - } - case "delete": - err := repo.DeleteHub(HubInfo) - if err != nil { - return + if event.Category == "hubInfo" { + if hubInfo, ok := event.Body.(*stgmod.BodyHubInfo); ok { + + hub := &Hub{ + HubID: hubInfo.HubID, + Name: hubInfo.HubInfo.Name, + Address: hubInfo.HubInfo.Address, + } + //先判断传输数据的类型 + switch hubInfo.Type { + case "add": + err := repo.CreateHub(hub) + if err != nil { + return + } + case "update": + err := repo.UpdateHub(hub) + if err != nil { + return + } + case "delete": + err := repo.DeleteHub(hub) + if err != nil { + return + } + default: + return + } + } else { + // 如果 Body 不是我们期望的类型,打印错误信息 + fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyHubInfo, got %T\n", w.Name, event.Body) } - default: - return + } else { + // 如果事件的 Category 不是 hubInfo,打印默认信息 + fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } } diff --git a/datamap/internal/models/hubrequest.go b/datamap/internal/models/hubrequest.go index cc9abda..766c085 100644 --- a/datamap/internal/models/hubrequest.go +++ b/datamap/internal/models/hubrequest.go @@ -1,14 +1,17 @@ package models import ( + "fmt" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" "log" "time" ) type HubRequest struct { + //todo source和target类型的区分 RequestID int64 `gorm:"column:RequestID; primaryKey; type:bigint; autoIncrement" json:"RequestID"` SourceType string `gorm:"column:SourceType; type:varchar(255); not null" json:"sourceType"` SourceID cdssdk.HubID `gorm:"column:SourceID; type:bigint; not null" json:"sourceID"` @@ -57,75 +60,98 @@ func (r *HubRequestRepository) GetAllHubRequests() ([]HubRequest, error) { return hubRequests, nil } -// ProcessHubTransfer mq推送各节点统计自身当天向外部各个节点传输的总数据量时的处理逻辑 -func ProcessHubTransfer(data stgmod.HubTransferStats) { - repo := NewHubRequestRepository(DB) - - hubRequest := &HubRequest{ - SourceType: "hub", - SourceID: cdssdk.HubID(data.Body.SourceHubID), - TargetType: "hub", - TargetID: cdssdk.HubID(data.Body.TargetHubID), - DataTransferCount: data.Body.Send.TotalTransfer, - RequestCount: data.Body.Send.RequestCount, - FailedRequestCount: data.Body.Send.FailedRequestCount, - AvgTransferCount: data.Body.Send.AvgTransfer, - MaxTransferCount: data.Body.Send.MaxTransfer, - MinTransferCount: data.Body.Send.MinTransfer, - StartTimestamp: data.Body.StartTimestamp, - EndTimestamp: data.Body.EndTimestamp, - } - - err := repo.CreateHubRequest(hubRequest) - if err != nil { - log.Printf("Error update hubrequest: %v", err) - - } +type HubTransferStatsWatcher struct { + Name string } -// ProcessHubStorageTransfer 节点中心之间数据传输处理 -func ProcessHubStorageTransfer(data stgmod.HubStorageTransferStats) { +func (w *HubTransferStatsWatcher) OnEvent(event sysevent.SysEvent) { repo := NewHubRequestRepository(DB) - hubRequestSend := &HubRequest{ - SourceType: "hub", - SourceID: cdssdk.HubID(data.Body.HubID), - TargetType: "storage", - TargetID: cdssdk.HubID(data.Body.StorageID), - DataTransferCount: data.Body.Send.TotalTransfer, - RequestCount: data.Body.Send.RequestCount, - FailedRequestCount: data.Body.Send.FailedRequestCount, - AvgTransferCount: data.Body.Send.AvgTransfer, - MaxTransferCount: data.Body.Send.MaxTransfer, - MinTransferCount: data.Body.Send.MinTransfer, - StartTimestamp: data.Body.StartTimestamp, - EndTimestamp: data.Body.EndTimestamp, - } - - err := repo.CreateHubRequest(hubRequestSend) - if err != nil { - log.Printf("Error update hubrequest: %v", err) - + if event.Category == "hubTransferStats" { + if hubTransferStats, ok := event.Body.(*stgmod.BodyHubTransferStats); ok { + hubRequest := &HubRequest{ + SourceType: "hub", + SourceID: hubTransferStats.SourceHubID, + TargetType: "hub", + TargetID: hubTransferStats.TargetHubID, + DataTransferCount: hubTransferStats.Send.TotalTransfer, + RequestCount: hubTransferStats.Send.RequestCount, + FailedRequestCount: hubTransferStats.Send.FailedRequestCount, + AvgTransferCount: hubTransferStats.Send.AvgTransfer, + MaxTransferCount: hubTransferStats.Send.MaxTransfer, + MinTransferCount: hubTransferStats.Send.MinTransfer, + StartTimestamp: hubTransferStats.StartTimestamp, + EndTimestamp: hubTransferStats.EndTimestamp, + } + + err := repo.CreateHubRequest(hubRequest) + if err != nil { + log.Printf("Error update hubrequest: %v", err) + + } + } else { + fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) + } + } else { + fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } +} - hubRequestReceive := &HubRequest{ - SourceType: "storage", - SourceID: cdssdk.HubID(data.Body.StorageID), - TargetType: "hub", - TargetID: cdssdk.HubID(data.Body.HubID), - DataTransferCount: data.Body.Receive.TotalTransfer, - RequestCount: data.Body.Receive.RequestCount, - FailedRequestCount: data.Body.Receive.FailedRequestCount, - AvgTransferCount: data.Body.Receive.AvgTransfer, - MaxTransferCount: data.Body.Receive.MaxTransfer, - MinTransferCount: data.Body.Receive.MinTransfer, - StartTimestamp: data.Body.StartTimestamp, - EndTimestamp: data.Body.EndTimestamp, - } +type HubStorageTransferStatsWatcher struct { + Name string +} - err = repo.CreateHubRequest(hubRequestReceive) - if err != nil { - log.Printf("Error update hubrequest: %v", err) +func (w *HubStorageTransferStatsWatcher) OnEvent(event sysevent.SysEvent) { + repo := NewHubRequestRepository(DB) + if event.Category == "hubStorageTransferStats" { + if hubStorageTransferStats, ok := event.Body.(*stgmod.BodyHubStorageTransferStats); ok { + + hubRequestSend := &HubRequest{ + SourceType: "hub", + SourceID: hubStorageTransferStats.HubID, + TargetType: "storage", + TargetID: cdssdk.HubID(hubStorageTransferStats.StorageID), + DataTransferCount: hubStorageTransferStats.Send.TotalTransfer, + RequestCount: hubStorageTransferStats.Send.RequestCount, + FailedRequestCount: hubStorageTransferStats.Send.FailedRequestCount, + AvgTransferCount: hubStorageTransferStats.Send.AvgTransfer, + MaxTransferCount: hubStorageTransferStats.Send.MaxTransfer, + MinTransferCount: hubStorageTransferStats.Send.MinTransfer, + StartTimestamp: hubStorageTransferStats.StartTimestamp, + EndTimestamp: hubStorageTransferStats.EndTimestamp, + } + + err := repo.CreateHubRequest(hubRequestSend) + if err != nil { + log.Printf("Error update hubrequest: %v", err) + + } + + hubRequestReceive := &HubRequest{ + SourceType: "storage", + SourceID: cdssdk.HubID(hubStorageTransferStats.StorageID), + TargetType: "hub", + TargetID: hubStorageTransferStats.HubID, + DataTransferCount: hubStorageTransferStats.Receive.TotalTransfer, + RequestCount: hubStorageTransferStats.Receive.RequestCount, + FailedRequestCount: hubStorageTransferStats.Receive.FailedRequestCount, + AvgTransferCount: hubStorageTransferStats.Receive.AvgTransfer, + MaxTransferCount: hubStorageTransferStats.Receive.MaxTransfer, + MinTransferCount: hubStorageTransferStats.Receive.MinTransfer, + StartTimestamp: hubStorageTransferStats.StartTimestamp, + EndTimestamp: hubStorageTransferStats.EndTimestamp, + } + + err = repo.CreateHubRequest(hubRequestReceive) + if err != nil { + log.Printf("Error update hubrequest: %v", err) + + } + } else { + fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) + } + } else { + fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } } diff --git a/datamap/internal/models/object.go b/datamap/internal/models/object.go index b2c8855..f83375d 100644 --- a/datamap/internal/models/object.go +++ b/datamap/internal/models/object.go @@ -1,8 +1,10 @@ package models import ( + "fmt" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" "time" ) @@ -65,7 +67,19 @@ func (r *ObjectRepository) GetAllObjects() ([]Object, error) { return objects, nil } -// ProcessObject 处理 Object 数据 -func ProcessObject(data stgmod.ObjectChange) { +type ObjectWatcher struct { + Name string +} + +func (w *ObjectWatcher) OnEvent(event sysevent.SysEvent) { + if event.Category == "objectChange" { + if _, ok := event.Body.(*stgmod.BodyObjectChange); ok { + + } else { + fmt.Printf("Watcher %s: Unexpected Body type, expected *ObjectInfo, got %T\n", w.Name, event.Body) + } + } else { + fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) + } } diff --git a/datamap/internal/models/storageStats.go b/datamap/internal/models/storageStats.go index f26c553..2020946 100644 --- a/datamap/internal/models/storageStats.go +++ b/datamap/internal/models/storageStats.go @@ -2,36 +2,50 @@ package models import ( "errors" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "fmt" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" "log" ) -func ProcessStorageStats(data stgmod.StorageStats) { +type StorageStatsWatcher struct { + Name string +} + +func (w *StorageStatsWatcher) OnEvent(event sysevent.SysEvent) { repo := NewStorageRepository(DB) - storage, err := repo.GetStorageByID(data.Body.StorageID) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - // 插入新记录 - newStorage := &Storage{ - StorageID: cdssdk.StorageID(data.Body.StorageID), - DataCount: data.Body.DataCount, - NewDataCount: 0, + if event.Category == "storageStats" { + if storageStats, ok := event.Body.(*stgmod.BodyStorageStats); ok { + + storage, err := repo.GetStorageByID(int64(storageStats.StorageID)) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 插入新记录 + newStorage := &Storage{ + StorageID: storageStats.StorageID, + DataCount: storageStats.DataCount, + NewDataCount: 0, + } + repo.CreateStorage(newStorage) + } else { + log.Printf("Error querying storage: %v", err) + } + } else { + // 更新记录 + newDataCount := storageStats.DataCount - storage.DataCount + storage.DataCount = storageStats.DataCount + storage.NewDataCount = newDataCount + err := repo.UpdateStorage(storage) + if err != nil { + log.Printf("Error update storage: %v", err) + } } - repo.CreateStorage(newStorage) } else { - log.Printf("Error querying storage: %v", err) + fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) } } else { - // 更新记录 - newDataCount := data.Body.DataCount - storage.DataCount - storage.DataCount = data.Body.DataCount - storage.NewDataCount = newDataCount - err := repo.UpdateStorage(storage) - if err != nil { - log.Printf("Error update storage: %v", err) - } + fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } } diff --git a/datamap/internal/models/storageinfo.go b/datamap/internal/models/storageinfo.go index 74be392..64f47f6 100644 --- a/datamap/internal/models/storageinfo.go +++ b/datamap/internal/models/storageinfo.go @@ -1,8 +1,10 @@ package models import ( + "fmt" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" "time" ) @@ -65,32 +67,45 @@ func (r *StorageRepository) GetAllStorages() ([]Storage, error) { return storages, nil } -func ProcessStorageInfo(data stgmod.StorageInfo) { +type StorageInfoWatcher struct { + Name string +} + +func (w *StorageInfoWatcher) OnEvent(event sysevent.SysEvent) { repo := NewStorageRepository(DB) - storage := &Storage{ - StorageID: cdssdk.StorageID(data.Body.StorageID), - StorageName: data.Body.StorageInfo.Name, - HubID: data.Body.StorageInfo.MasterHub, - Timestamp: data.Timestamp, - } - switch data.Body.Type { - case "add": - err := repo.CreateStorage(storage) - if err != nil { - return - } - case "update": - err := repo.UpdateStorage(storage) - if err != nil { - return - } - case "delete": - err := repo.DeleteStorage(storage) - if err != nil { - return + if event.Category == "storageInfo" { + if storageInfo, ok := event.Body.(*stgmod.BodyStorageInfo); ok { + storage := &Storage{ + StorageID: storageInfo.StorageID, + StorageName: storageInfo.StorageInfo.Name, + HubID: storageInfo.StorageInfo.MasterHub, + Timestamp: time.Now(), + } + + switch storageInfo.Type { + case "add": + err := repo.CreateStorage(storage) + if err != nil { + return + } + case "update": + err := repo.UpdateStorage(storage) + if err != nil { + return + } + case "delete": + err := repo.DeleteStorage(storage) + if err != nil { + return + } + default: + return + } + } else { + fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) } - default: - return + } else { + fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } }