diff --git a/datamap/internal/handlers/handlers.go b/datamap/internal/handlers/handlers.go index 0fa46ba..66abf46 100644 --- a/datamap/internal/handlers/handlers.go +++ b/datamap/internal/handlers/handlers.go @@ -31,7 +31,7 @@ func GetHubInfo(c *gin.Context) { storages, _ := repoStorage.GetAllStorages() for _, hub := range hubs { node := models.Node{ - ID: int64(hub.HubID), + ID: "hub" + strconv.FormatInt(int64(hub.HubID), 10), NodeType: "hub", Name: hub.Name, Address: hub.Address, @@ -40,7 +40,7 @@ func GetHubInfo(c *gin.Context) { } for _, storage := range storages { node := models.Node{ - ID: int64(storage.StorageID), + ID: "storage" + strconv.FormatInt(int64(storage.StorageID), 10), NodeType: "storage", Name: storage.StorageName, DataCount: storage.DataCount, @@ -55,9 +55,9 @@ func GetHubInfo(c *gin.Context) { for _, hubReq := range hubReqs { edge := models.Edge{ SourceType: hubReq.SourceType, - SourceID: int64(hubReq.SourceID), + SourceID: hubReq.SourceType + strconv.FormatInt(int64(hubReq.SourceID), 10), TargetType: hubReq.TargetType, - TargetID: int64(hubReq.TargetID), + TargetID: hubReq.TargetType + strconv.FormatInt(int64(hubReq.TargetID), 10), DataTransferCount: hubReq.DataTransferCount, RequestCount: hubReq.RequestCount, FailedRequestCount: hubReq.FailedRequestCount, @@ -116,7 +116,7 @@ func GetDataTransfer(c *gin.Context) { //storage id ComboID: strconv.FormatInt(block.StorageID, 10), //block index - Label: strconv.FormatInt(block.Index, 10), + Label: block.Type + strconv.FormatInt(block.Index, 10), //block type NodeType: block.Type, } @@ -126,7 +126,7 @@ func GetDataTransfer(c *gin.Context) { //添加storage combo信息 if !containsCombo(combos, strconv.FormatInt(block.StorageID, 10), "storage") { combo := models.Combo{ - ID: strconv.FormatInt(block.StorageID, 10), + ID: "storage" + strconv.FormatInt(block.StorageID, 10), Label: "存储中心" + strconv.FormatInt(block.StorageID, 10), ParentId: strconv.Itoa(block.Status), ComboType: "storage", @@ -134,10 +134,23 @@ func GetDataTransfer(c *gin.Context) { combos = append(combos, combo) } //添加state combo信息 - if !containsCombo(combos, strconv.Itoa(block.Status), "state") { + if !containsCombo(combos, "state"+strconv.Itoa(block.Status), "state") { + var statusStr string + switch block.Status { + case 0: + statusStr = "实时情况" + case 1: + statusStr = block.Timestamp.Format("2006-01-02") + "布局调整后" + case 2: + statusStr = block.Timestamp.Format("2006-01-02") + "布局调整前" + case 3: + statusStr = block.Timestamp.Format("2006-01-02") + "布局调整后" + default: + statusStr = "未知状态" + } combo := models.Combo{ - ID: strconv.Itoa(block.Status), - Label: "存储状态" + string(block.Status), + ID: "state" + strconv.Itoa(block.Status), + Label: statusStr, ComboType: "state", } combos = append(combos, combo) @@ -147,8 +160,8 @@ func GetDataTransfer(c *gin.Context) { relations, _ := repoStorageTrans.GetStorageTransferCountByObjectID(objectID) for _, relation := range relations { edge := models.DistEdge{ - Source: strconv.FormatInt(relation.SourceStorageID, 10), - Target: strconv.FormatInt(relation.TargetStorageID, 10), + Source: "storage" + strconv.FormatInt(relation.SourceStorageID, 10), + Target: "storage" + strconv.FormatInt(relation.TargetStorageID, 10), } edges = append(edges, edge) } diff --git a/datamap/internal/models/hubinfo.go b/datamap/internal/models/hubinfo.go index 81b1369..e515846 100644 --- a/datamap/internal/models/hubinfo.go +++ b/datamap/internal/models/hubinfo.go @@ -71,5 +71,4 @@ func ProcessHubInfo(data stgmod.HubInfo) { default: return } - } diff --git a/datamap/internal/models/models.go b/datamap/internal/models/models.go index b3b1161..85fb10a 100644 --- a/datamap/internal/models/models.go +++ b/datamap/internal/models/models.go @@ -33,7 +33,7 @@ type HubRelationship struct { } type Node struct { - ID int64 `json:"id"` // 节点/中心ID + ID string `json:"id"` // 节点/中心ID NodeType string `json:"nodeType"` //节点类型 storage/hub Name string `json:"name"` // 节点/中心名称 Address cdssdk.HubAddressInfo `json:"address"` // 地址 @@ -44,9 +44,9 @@ type Node struct { type Edge struct { SourceType string `json:"sourceType"` // 源节点类型 - SourceID int64 `json:"sourceID"` // 源节点ID + SourceID string `json:"source"` // 源节点ID TargetType string `json:"targetType"` // 目标节点类型 - TargetID int64 `json:"targetID"` // 目标节点ID + TargetID string `json:"target"` // 目标节点ID DataTransferCount int64 `json:"dataTransferCount"` // 数据传输量 RequestCount int64 `json:"requestCount"` // 请求数 FailedRequestCount int64 `json:"failedRequestCount"` // 失败请求数 diff --git a/datamap/internal/models/storageStats.go b/datamap/internal/models/storageStats.go new file mode 100644 index 0000000..d7e6928 --- /dev/null +++ b/datamap/internal/models/storageStats.go @@ -0,0 +1,30 @@ +package models + +// +//func ProcessStorageInfo(data stgmod.StorageInfo) { +// repo := NewStorageRepository(DB) +// +// storage, err := repo.GetStorageByID(data.Body.StorageID) +// if err != nil { +// if errors.Is(err, gorm.ErrRecordNotFound) { +// // 插入新记录 +// newStorage := &Storage{ +// StorageID: cdssdk.StorageID(data.Body.StorageID), +// DataCount: data.Body.DataCount, +// NewDataCount: 0, +// } +// repo.CreateStorage(newStorage) +// } else { +// log.Printf("Error querying storage: %v", err) +// } +// } else { +// // 更新记录 +// newDataCount := data.Body.DataCount - storage.DataCount +// storage.DataCount = data.Body.DataCount +// storage.NewDataCount = newDataCount +// err := repo.UpdateStorage(storage) +// if err != nil { +// log.Printf("Error update storage: %v", err) +// } +// } +//} diff --git a/datamap/internal/models/storageinfo.go b/datamap/internal/models/storageinfo.go index 9f9dbeb..74be392 100644 --- a/datamap/internal/models/storageinfo.go +++ b/datamap/internal/models/storageinfo.go @@ -1,11 +1,9 @@ package models import ( - "errors" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gorm.io/gorm" - "log" "time" ) @@ -35,6 +33,9 @@ func (r *StorageRepository) CreateStorage(storage *Storage) error { func (r *StorageRepository) UpdateStorage(storage *Storage) error { return r.repo.Update(storage) } +func (r *StorageRepository) DeleteStorage(storage *Storage) error { + return r.repo.Delete(storage, uint(storage.StorageID)) +} func (r *StorageRepository) GetStorageByID(id int64) (*Storage, error) { var storage Storage @@ -64,32 +65,32 @@ func (r *StorageRepository) GetAllStorages() ([]Storage, error) { return storages, nil } -//ProcessHubStat mq推送各节点统计自身当前的总数据量时的处理逻辑 - -func ProcessStorageInfo(data stgmod.StorageStats) { +func ProcessStorageInfo(data stgmod.StorageInfo) { repo := NewStorageRepository(DB) + storage := &Storage{ + StorageID: cdssdk.StorageID(data.Body.StorageID), + StorageName: data.Body.StorageInfo.Name, + HubID: data.Body.StorageInfo.MasterHub, + Timestamp: data.Timestamp, + } - storage, err := repo.GetStorageByID(data.Body.StorageID) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - // 插入新记录 - newStorage := &Storage{ - StorageID: cdssdk.StorageID(data.Body.StorageID), - DataCount: data.Body.DataCount, - NewDataCount: 0, - } - repo.CreateStorage(newStorage) - } else { - log.Printf("Error querying storage: %v", err) + switch data.Body.Type { + case "add": + err := repo.CreateStorage(storage) + if err != nil { + return } - } else { - // 更新记录 - newDataCount := data.Body.DataCount - storage.DataCount - storage.DataCount = data.Body.DataCount - storage.NewDataCount = newDataCount + case "update": err := repo.UpdateStorage(storage) if err != nil { - log.Printf("Error update storage: %v", err) + return + } + case "delete": + err := repo.DeleteStorage(storage) + if err != nil { + return } + default: + return } } diff --git a/datamap/internal/mq/mq.go b/datamap/internal/mq/mq.go index 53240fd..7b3e18c 100644 --- a/datamap/internal/mq/mq.go +++ b/datamap/internal/mq/mq.go @@ -1,8 +1,8 @@ package mq import ( - "encoding/json" "fmt" + jsoniter "github.com/json-iterator/go" "github.com/streadway/amqp" stgmod "gitlink.org.cn/cloudream/storage/common/models" "gitlink.org.cn/cloudream/storage/datamap/internal/config" @@ -25,8 +25,11 @@ func InitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) { func listenQueues(conn *amqp.Connection) { queues := []string{ + "datamap_hubinfo", "datamap_storageinfo", - "datamap_hubtransfer", + "datamap_storagestats", + "datamap_hubtransferstats", + "datamap_hubstoragetransferstats", "datamap_blocktransfer", "datamap_blockdistribution", } @@ -57,29 +60,28 @@ func processMessage(queue string, body []byte) { switch queue { case "datamap_hubinfo": var data stgmod.HubInfo - - if err := json.Unmarshal(body, &data); err != nil { + if err := jsoniter.Unmarshal(body, &data); err != nil { log.Printf("Failed to unmarshal HubInfo: %v, body: %s", err, body) return } models.ProcessHubInfo(data) case "datamap_storageinfo": var data stgmod.StorageInfo - if err := json.Unmarshal(body, &data); err != nil { + if err := jsoniter.Unmarshal(body, &data); err != nil { log.Printf("Failed to unmarshal StorageInfo: %v, body: %s", err, body) return } - //models.ProcessStorageInfo(data) + models.ProcessStorageInfo(data) case "datamap_storagestats": var data stgmod.StorageStats - if err := json.Unmarshal(body, &data); err != nil { + if err := jsoniter.Unmarshal(body, &data); err != nil { log.Printf("Failed to unmarshal StorageStats: %v, body: %s", err, body) return } //models.ProcessStorageInfo(data) case "datamap_hubtransferstats": var data stgmod.HubTransferStats - err := json.Unmarshal(body, &data) + err := jsoniter.Unmarshal(body, &data) if err != nil { log.Printf("Failed to unmarshal HubTransferStats: %v, body: %s", err, body) return @@ -87,7 +89,7 @@ func processMessage(queue string, body []byte) { models.ProcessHubTransfer(data) case "datamap_hubstoragetransferstats": var data stgmod.HubStorageTransferStats - err := json.Unmarshal(body, &data) + err := jsoniter.Unmarshal(body, &data) if err != nil { log.Printf("Failed to unmarshal HubStorageTransferStats: %v, body: %s", err, body) return @@ -95,7 +97,7 @@ func processMessage(queue string, body []byte) { //models.ProcessHubTransfer(data) case "datamap_blocktransfer": var data stgmod.BlockTransfer - err := json.Unmarshal(body, &data) + err := jsoniter.Unmarshal(body, &data) if err != nil { log.Printf("Failed to unmarshal BlockTransfer: %v, body: %s", err, body) return @@ -103,7 +105,7 @@ func processMessage(queue string, body []byte) { models.ProcessBlockTransfer(data) case "datamap_blockdistribution": var data stgmod.BlockDistribution - err := json.Unmarshal(body, &data) + err := jsoniter.Unmarshal(body, &data) if err != nil { log.Printf("Failed to unmarshal BlockDistribution: %v, body: %s", err, body) return