From e898591a00d079fb9f27c2b2c7b0532f4e46a5f5 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 24 Jan 2025 17:46:24 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E5=A4=84=E7=90=86=E4=BA=8B?= =?UTF-8?q?=E4=BB=B6=E7=9A=84=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- datamap/internal/models/blockdistribution.go | 173 ++++++----- datamap/internal/models/blocktransfer.go | 291 +++++++++---------- datamap/internal/models/hubinfo.go | 62 ++-- datamap/internal/models/object.go | 5 +- datamap/internal/models/storageinfo.go | 65 ++--- 5 files changed, 290 insertions(+), 306 deletions(-) diff --git a/datamap/internal/models/blockdistribution.go b/datamap/internal/models/blockdistribution.go index cdf4005..12e1dc7 100644 --- a/datamap/internal/models/blockdistribution.go +++ b/datamap/internal/models/blockdistribution.go @@ -2,13 +2,13 @@ package models import ( "errors" - "fmt" - stgmod "gitlink.org.cn/cloudream/storage/common/models" - "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" - "gorm.io/gorm" "log" "strconv" "time" + + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" + "gorm.io/gorm" ) type BlockDistribution struct { @@ -95,98 +95,93 @@ type BlockDistributionWatcher struct { } func (w *BlockDistributionWatcher) OnEvent(event sysevent.SysEvent) { + body, ok := event.Body.(*stgmod.BodyBlockDistribution) + if !ok { + return + } + repoObject := NewObjectRepository(DB) repoBlock := NewBlockDistributionRepository(DB) repoStorage := NewStorageTransferCountRepository(DB) - if event.Category == "blockDistribution" { - if blockDistribution, ok := event.Body.(*stgmod.BodyBlockDistribution); ok { - - //更新object表中的状态 - - object, err := repoObject.GetObjectByID(int64(blockDistribution.Object.ObjectID)) - faultTolerance, _ := strconv.ParseFloat(blockDistribution.Object.FaultTolerance, 64) - redundancy, _ := strconv.ParseFloat(blockDistribution.Object.Redundancy, 64) - avgAccessCost, _ := strconv.ParseFloat(blockDistribution.Object.AvgAccessCost, 64) - if errors.Is(err, gorm.ErrRecordNotFound) { - err := repoObject.CreateObject(&Object{ - ObjectID: blockDistribution.Object.ObjectID, - PackageID: blockDistribution.Object.PackageID, - Path: blockDistribution.Object.Path, - Size: blockDistribution.Object.Size, - FileHash: blockDistribution.Object.FileHash, - Status: StatusYesterdayAfter, - FaultTolerance: faultTolerance, - Redundancy: redundancy, - AvgAccessCost: avgAccessCost, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create object: %v", err) - } - } else { - object.Status = StatusYesterdayAfter - err = repoObject.UpdateObject(object) - if err != nil { - log.Printf("Error update object: %v", err) - } - } + //更新object表中的状态 + object, err := repoObject.GetObjectByID(int64(body.ObjectID)) + faultTolerance, _ := strconv.ParseFloat(body.FaultTolerance, 64) + redundancy, _ := strconv.ParseFloat(body.Redundancy, 64) + avgAccessCost, _ := strconv.ParseFloat(body.AvgAccessCost, 64) + if errors.Is(err, gorm.ErrRecordNotFound) { + err := repoObject.CreateObject(&Object{ + ObjectID: body.ObjectID, + PackageID: body.PackageID, + Path: body.Path, + Size: body.Size, + FileHash: body.FileHash, + Status: StatusYesterdayAfter, + FaultTolerance: faultTolerance, + Redundancy: redundancy, + AvgAccessCost: avgAccessCost, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create object: %v", err) + } + } else { + object.Status = StatusYesterdayAfter + err = repoObject.UpdateObject(object) + if err != nil { + log.Printf("Error update object: %v", err) + } + } - //更新block表中的状态 - for _, blockDist := range blockDistribution.Object.BlockDistribution { - blockIndex, _ := strconv.ParseInt(blockDist.Index, 10, 64) - blockStorageID, _ := strconv.ParseInt(blockDist.StorageID, 10, 64) - blockDist, err := repoBlock.GetBlockDistributionByIndex(int64(blockDistribution.Object.ObjectID), blockIndex, blockStorageID) - if errors.Is(err, gorm.ErrRecordNotFound) { - err := repoBlock.CreateBlockDistribution(&BlockDistribution{ - BlockID: blockDist.BlockID, - ObjectID: blockDist.ObjectID, - Type: blockDist.Type, - Index: blockIndex, - StorageID: blockStorageID, - Status: StatusYesterdayAfter, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create BlockDistribution: %v", err) - } - } else { - err := repoBlock.UpdateBlockDistribution(&BlockDistribution{ - BlockID: blockDist.BlockID, - ObjectID: blockDist.ObjectID, - Type: blockDist.Type, - Index: blockIndex, - StorageID: blockStorageID, - Status: StatusYesterdayAfter, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error update BlockDistribution: %v", err) - } - } - } - //在storageTransferCount表中添加记录 - for _, dataTransfer := range blockDistribution.Object.DataTransfers { - sourceStorageID, _ := strconv.ParseInt(string(dataTransfer.SourceStorageID), 10, 64) - targetStorageID, _ := strconv.ParseInt(string(dataTransfer.TargetStorageID), 10, 64) - dataTransferCount, _ := strconv.ParseInt(dataTransfer.DataTransferCount, 10, 64) - - err := repoStorage.CreateStorageTransferCount(&StorageTransferCount{ - ObjectID: int64(blockDistribution.Object.ObjectID), - Status: StatusTodayBeforeYesterday, - SourceStorageID: sourceStorageID, - TargetStorageID: targetStorageID, - DataTransferCount: dataTransferCount, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create StorageTransferCount : %v", err) - } + //更新block表中的状态 + for _, blockDist := range body.BlockDistribution { + blockIndex, _ := strconv.ParseInt(blockDist.Index, 10, 64) + blockStorageID, _ := strconv.ParseInt(blockDist.StorageID, 10, 64) + blockDist, err := repoBlock.GetBlockDistributionByIndex(int64(body.ObjectID), blockIndex, blockStorageID) + if errors.Is(err, gorm.ErrRecordNotFound) { + err := repoBlock.CreateBlockDistribution(&BlockDistribution{ + BlockID: blockDist.BlockID, + ObjectID: blockDist.ObjectID, + Type: blockDist.Type, + Index: blockIndex, + StorageID: blockStorageID, + Status: StatusYesterdayAfter, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create BlockDistribution: %v", err) } } else { - fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) + err := repoBlock.UpdateBlockDistribution(&BlockDistribution{ + BlockID: blockDist.BlockID, + ObjectID: blockDist.ObjectID, + Type: blockDist.Type, + Index: blockIndex, + StorageID: blockStorageID, + Status: StatusYesterdayAfter, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error update BlockDistribution: %v", err) + } + } + } + //在storageTransferCount表中添加记录 + for _, dataTransfer := range body.DataTransfers { + sourceStorageID, _ := strconv.ParseInt(string(dataTransfer.SourceStorageID), 10, 64) + targetStorageID, _ := strconv.ParseInt(string(dataTransfer.TargetStorageID), 10, 64) + dataTransferCount, _ := strconv.ParseInt(dataTransfer.DataTransferCount, 10, 64) + + err := repoStorage.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: int64(body.ObjectID), + Status: StatusTodayBeforeYesterday, + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: dataTransferCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create StorageTransferCount : %v", err) } - } else { - fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } } diff --git a/datamap/internal/models/blocktransfer.go b/datamap/internal/models/blocktransfer.go index 98e210d..6b40071 100644 --- a/datamap/internal/models/blocktransfer.go +++ b/datamap/internal/models/blocktransfer.go @@ -2,14 +2,14 @@ package models import ( "errors" - "fmt" + "log" + "strconv" + "time" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" - "log" - "strconv" - "time" ) type StorageTransferCount struct { @@ -75,165 +75,160 @@ type BlockTransferWatcher struct { } func (w *BlockTransferWatcher) OnEvent(event sysevent.SysEvent) { + body, ok := event.Body.(*stgmod.BodyBlockTransfer) + if !ok { + return + } repoDist := NewBlockDistributionRepository(DB) repoStorage := NewStorageRepository(DB) repoStorageTrans := NewStorageTransferCountRepository(DB) repoObject := NewObjectRepository(DB) - if event.Category == "blockTransfer" { - if blockTransfer, ok := event.Body.(*stgmod.BodyBlockTransfer); ok { - - for _, change := range blockTransfer.BlockChanges { - - objectID, _ := strconv.ParseInt(string(blockTransfer.ObjectID), 10, 64) - object, _ := repoObject.GetObjectByID(objectID) - index, _ := strconv.ParseInt(change.Index, 10, 64) - sourceStorageID, _ := strconv.ParseInt(string(change.SourceStorageID), 10, 64) - targetStorageID, _ := strconv.ParseInt(string(change.TargetStorageID), 10, 64) - newDataCount, _ := strconv.ParseInt(change.DataTransferCount, 10, 64) - - switch change.Type { - case "0": //拷贝 - //查询出存储在数据库中的BlockDistribution信息 - blockSource, errSource := repoDist.GetBlockDistributionByIndex(objectID, index, sourceStorageID) - //没有记录就将source和target的信息都保存到库中 - if errors.Is(errSource, gorm.ErrRecordNotFound) { - err := repoDist.CreateBlockDistribution(&BlockDistribution{ - ObjectID: objectID, - Type: change.BlockType, - Index: index, - StorageID: sourceStorageID, - Status: StatusNow, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create source blockdistribution: %v", err) - } - } else { - //有数据则新增一条storageID为targetStorageID的记录,同时更新状态 - err := repoDist.CreateBlockDistribution(&BlockDistribution{ - ObjectID: blockSource.ObjectID, - Type: change.BlockType, - Index: index, - StorageID: targetStorageID, - Status: StatusNow, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error update blockdistribution: %v", err) - } - //复制完成之后增加的dataCount要加到targetStorage的记录中 - storageOld, err := repoStorage.GetStorageByID(targetStorageID) - if errors.Is(err, gorm.ErrRecordNotFound) { - err = repoStorage.CreateStorage(&Storage{ - StorageID: cdssdk.StorageID(targetStorageID), - DataCount: newDataCount, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error increase datacount in targetstorage: %v", err) - } - } else { - err = repoStorage.UpdateStorage(&Storage{ - StorageID: cdssdk.StorageID(targetStorageID), - DataCount: storageOld.DataCount + newDataCount, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error increase datacount in targetstorage: %v", err) - } - } - - } - //新增记录到storageTransferCount表中 - err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ - ObjectID: objectID, - Status: int64(blockSource.Status), - SourceStorageID: sourceStorageID, - TargetStorageID: targetStorageID, - DataTransferCount: newDataCount, - Timestamp: time.Now(), + for _, change := range body.BlockChanges { + + objectID, _ := strconv.ParseInt(string(body.ObjectID), 10, 64) + object, _ := repoObject.GetObjectByID(objectID) + // index, _ := strconv.ParseInt(change.Index, 10, 64) + // sourceStorageID, _ := strconv.ParseInt(string(change.SourceStorageID), 10, 64) + // targetStorageID, _ := strconv.ParseInt(string(change.TargetStorageID), 10, 64) + // newDataCount, _ := strconv.ParseInt(change.DataTransferCount, 10, 64) + + switch change := change.(type) { + case *stgmod.BlockChangeClone: //拷贝 + // TODO 从change中获取index, sourceStorageID, targetStorageID, newDataCount,下同 + + //查询出存储在数据库中的BlockDistribution信息 + blockSource, errSource := repoDist.GetBlockDistributionByIndex(objectID, index, sourceStorageID) + //没有记录就将source和target的信息都保存到库中 + if errors.Is(errSource, gorm.ErrRecordNotFound) { + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: objectID, + Type: change.BlockType, + Index: index, + StorageID: sourceStorageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create source blockdistribution: %v", err) + } + } else { + //有数据则新增一条storageID为targetStorageID的记录,同时更新状态 + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: blockSource.ObjectID, + Type: change.BlockType, + Index: index, + StorageID: targetStorageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error update blockdistribution: %v", err) + } + //复制完成之后增加的dataCount要加到targetStorage的记录中 + storageOld, err := repoStorage.GetStorageByID(targetStorageID) + if errors.Is(err, gorm.ErrRecordNotFound) { + err = repoStorage.CreateStorage(&Storage{ + StorageID: cdssdk.StorageID(targetStorageID), + DataCount: newDataCount, + Timestamp: time.Now(), }) if err != nil { - log.Printf("Error create StorageTransferCount : %v", err) - } - case "1": //编解码 - //删除所有的sourceBlock - for _, sourceBlock := range change.SourceBlocks { - sourceBlockIndex, _ := strconv.ParseInt(sourceBlock.Index, 10, 64) - err := repoDist.DeleteBlockDistribution(objectID, sourceBlockIndex, sourceStorageID) - if err != nil { - log.Printf("Error delete blockdistribution: %v", err) - } - } - //插入所有的targetBlock - for _, targetBlock := range change.TargetBlocks { - storageID, _ := strconv.ParseInt(string(targetBlock.StorageID), 10, 64) - err := repoDist.CreateBlockDistribution(&BlockDistribution{ - ObjectID: objectID, - Type: targetBlock.BlockType, - Index: index, - //直接保存到目标中心 - StorageID: storageID, - Status: StatusNow, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error create blockdistribution: %v", err) - } + log.Printf("Error increase datacount in targetstorage: %v", err) } - //新增记录到storageTransferCount表中 - err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ - ObjectID: objectID, - Status: int64(object.Status), - SourceStorageID: sourceStorageID, - TargetStorageID: targetStorageID, - DataTransferCount: newDataCount, - Timestamp: time.Now(), + } else { + err = repoStorage.UpdateStorage(&Storage{ + StorageID: cdssdk.StorageID(targetStorageID), + DataCount: storageOld.DataCount + newDataCount, + Timestamp: time.Now(), }) if err != nil { - log.Printf("Error create StorageTransferCount : %v", err) - } - - case "2": //删除 - for _, block := range change.Blocks { - storageID, _ := strconv.ParseInt(string(block.StorageID), 10, 64) - changeIndex, _ := strconv.ParseInt(block.Index, 10, 64) - err := repoDist.DeleteBlockDistribution(objectID, changeIndex, storageID) - if err != nil { - log.Printf("Error delete blockdistribution: %v", err) - } - } - - case "3": //更新 - for _, blockUpdate := range change.Blocks { - //查询出存储在数据库中的BlockDistribution信息 - blockIndex, _ := strconv.ParseInt(blockUpdate.Index, 10, 64) - blockOld, err := repoDist.GetBlockDistributionByIndex(objectID, blockIndex, sourceStorageID) - newStorageID, _ := strconv.ParseInt(string(blockUpdate.StorageID), 10, 64) - err = repoDist.UpdateBlockDistribution(&BlockDistribution{ - BlockID: blockOld.BlockID, - ObjectID: blockOld.ObjectID, - Type: blockUpdate.BlockType, - Index: blockIndex, - StorageID: newStorageID, - Status: StatusNow, - Timestamp: time.Now(), - }) - if err != nil { - log.Printf("Error delete blockdistribution: %v", err) - } + log.Printf("Error increase datacount in targetstorage: %v", err) } + } - default: - break + } + //新增记录到storageTransferCount表中 + err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: objectID, + Status: int64(blockSource.Status), + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: newDataCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create StorageTransferCount : %v", err) + } + case *stgmod.BlockChangeEnDecode: //编解码 + //删除所有的sourceBlock + for _, sourceBlock := range change.SourceBlocks { + sourceBlockIndex, _ := strconv.ParseInt(sourceBlock.Index, 10, 64) + err := repoDist.DeleteBlockDistribution(objectID, sourceBlockIndex, sourceStorageID) + if err != nil { + log.Printf("Error delete blockdistribution: %v", err) } } - } else { - fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) + //插入所有的targetBlock + for _, targetBlock := range change.TargetBlocks { + storageID, _ := strconv.ParseInt(string(targetBlock.StorageID), 10, 64) + err := repoDist.CreateBlockDistribution(&BlockDistribution{ + ObjectID: objectID, + Type: targetBlock.BlockType, + Index: index, + //直接保存到目标中心 + StorageID: storageID, + Status: StatusNow, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create blockdistribution: %v", err) + } + } + //新增记录到storageTransferCount表中 + err := repoStorageTrans.CreateStorageTransferCount(&StorageTransferCount{ + ObjectID: objectID, + Status: int64(object.Status), + SourceStorageID: sourceStorageID, + TargetStorageID: targetStorageID, + DataTransferCount: newDataCount, + Timestamp: time.Now(), + }) + if err != nil { + log.Printf("Error create StorageTransferCount : %v", err) + } + + case *stgmod.BlockChangeDeleted: //删除 + storageID, _ := strconv.ParseInt(string(change.StorageID), 10, 64) + changeIndex, _ := strconv.ParseInt(change.Index, 10, 64) + err := repoDist.DeleteBlockDistribution(objectID, changeIndex, storageID) + if err != nil { + log.Printf("Error delete blockdistribution: %v", err) + } + + // case *stgmod.BlockChangeUpdated: //更新 + // for _, blockUpdate := range change.Blocks { + // //查询出存储在数据库中的BlockDistribution信息 + // blockIndex, _ := strconv.ParseInt(blockUpdate.Index, 10, 64) + // blockOld, err := repoDist.GetBlockDistributionByIndex(objectID, blockIndex, sourceStorageID) + // newStorageID, _ := strconv.ParseInt(string(blockUpdate.StorageID), 10, 64) + // err = repoDist.UpdateBlockDistribution(&BlockDistribution{ + // BlockID: blockOld.BlockID, + // ObjectID: blockOld.ObjectID, + // Type: blockUpdate.BlockType, + // Index: blockIndex, + // StorageID: newStorageID, + // Status: StatusNow, + // Timestamp: time.Now(), + // }) + // if err != nil { + // log.Printf("Error delete blockdistribution: %v", err) + // } + // } + + default: + break } - } else { - fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } } diff --git a/datamap/internal/models/hubinfo.go b/datamap/internal/models/hubinfo.go index bd133bb..423c1e6 100644 --- a/datamap/internal/models/hubinfo.go +++ b/datamap/internal/models/hubinfo.go @@ -2,7 +2,7 @@ package models import ( "encoding/json" - "fmt" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" @@ -50,42 +50,36 @@ type HubInfoWatcher struct { // 实现 OnEvent 方法 func (w *HubInfoWatcher) OnEvent(event sysevent.SysEvent) { + repo := NewHubRepository(DB) - if event.Category == "hubInfo" { - if hubInfo, ok := event.Body.(*stgmod.BodyHubInfo); ok { + switch body := event.Body.(type) { + case *stgmod.BodyNewHub: + err := repo.CreateHub(&Hub{ + HubID: body.Info.HubID, + Name: body.Info.Name, + Address: body.Info.Address, + }) + if err != nil { + return + } + + case *stgmod.BodyHubUpdated: + err := repo.UpdateHub(&Hub{ + HubID: body.Info.HubID, + Name: body.Info.Name, + Address: body.Info.Address, + }) + if err != nil { + return + } - hub := &Hub{ - HubID: hubInfo.HubID, - Name: hubInfo.HubInfo.Name, - Address: hubInfo.HubInfo.Address, - } - //先判断传输数据的类型 - switch hubInfo.Type { - case "add": - err := repo.CreateHub(hub) - if err != nil { - return - } - case "update": - err := repo.UpdateHub(hub) - if err != nil { - return - } - case "delete": - err := repo.DeleteHub(hub) - if err != nil { - return - } - default: - return - } - } else { - // 如果 Body 不是我们期望的类型,打印错误信息 - fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyHubInfo, got %T\n", w.Name, event.Body) + case *stgmod.BodyHubDeleted: + err := repo.DeleteHub(&Hub{ + HubID: body.HubID, + }) + if err != nil { + return } - } else { - // 如果事件的 Category 不是 hubInfo,打印默认信息 - fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } } diff --git a/datamap/internal/models/object.go b/datamap/internal/models/object.go index f83375d..3cb2373 100644 --- a/datamap/internal/models/object.go +++ b/datamap/internal/models/object.go @@ -2,11 +2,12 @@ package models import ( "fmt" + "time" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" - "time" ) type Object struct { @@ -74,7 +75,7 @@ type ObjectWatcher struct { func (w *ObjectWatcher) OnEvent(event sysevent.SysEvent) { if event.Category == "objectChange" { - if _, ok := event.Body.(*stgmod.BodyObjectChange); ok { + if _, ok := event.Body.(*stgmod.BodyNewObject); ok { } else { fmt.Printf("Watcher %s: Unexpected Body type, expected *ObjectInfo, got %T\n", w.Name, event.Body) diff --git a/datamap/internal/models/storageinfo.go b/datamap/internal/models/storageinfo.go index 64f47f6..7d13510 100644 --- a/datamap/internal/models/storageinfo.go +++ b/datamap/internal/models/storageinfo.go @@ -1,12 +1,12 @@ package models import ( - "fmt" + "time" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/common/pkgs/sysevent" "gorm.io/gorm" - "time" ) type Storage struct { @@ -74,38 +74,37 @@ type StorageInfoWatcher struct { func (w *StorageInfoWatcher) OnEvent(event sysevent.SysEvent) { repo := NewStorageRepository(DB) - if event.Category == "storageInfo" { - if storageInfo, ok := event.Body.(*stgmod.BodyStorageInfo); ok { - storage := &Storage{ - StorageID: storageInfo.StorageID, - StorageName: storageInfo.StorageInfo.Name, - HubID: storageInfo.StorageInfo.MasterHub, - Timestamp: time.Now(), - } + switch body := event.Body.(type) { + case *stgmod.BodyNewStorage: + storage := &Storage{ + StorageID: body.Info.StorageID, + StorageName: body.Info.Name, + HubID: body.Info.MasterHub, + Timestamp: time.Now(), + } + err := repo.CreateStorage(storage) + if err != nil { + return + } - switch storageInfo.Type { - case "add": - err := repo.CreateStorage(storage) - if err != nil { - return - } - case "update": - err := repo.UpdateStorage(storage) - if err != nil { - return - } - case "delete": - err := repo.DeleteStorage(storage) - if err != nil { - return - } - default: - return - } - } else { - fmt.Printf("Watcher %s: Unexpected Body type, expected *BodyStorageInfo, got %T\n", w.Name, event.Body) + case *stgmod.BodyStorageUpdated: + storage := &Storage{ + StorageID: body.Info.StorageID, + StorageName: body.Info.Name, + HubID: body.Info.MasterHub, + Timestamp: time.Now(), + } + err := repo.UpdateStorage(storage) + if err != nil { + return + } + case *stgmod.BodyStorageDeleted: + storage := &Storage{ + StorageID: body.StorageID, + } + err := repo.DeleteStorage(storage) + if err != nil { + return } - } else { - fmt.Printf("Watcher %s received an event with category %s\n", w.Name, event.Category) } }