| @@ -31,7 +31,7 @@ func GetHubInfo(c *gin.Context) { | |||||
| storages, _ := repoStorage.GetAllStorages() | storages, _ := repoStorage.GetAllStorages() | ||||
| for _, hub := range hubs { | for _, hub := range hubs { | ||||
| node := models.Node{ | node := models.Node{ | ||||
| ID: int64(hub.HubID), | |||||
| ID: "hub" + strconv.FormatInt(int64(hub.HubID), 10), | |||||
| NodeType: "hub", | NodeType: "hub", | ||||
| Name: hub.Name, | Name: hub.Name, | ||||
| Address: hub.Address, | Address: hub.Address, | ||||
| @@ -40,7 +40,7 @@ func GetHubInfo(c *gin.Context) { | |||||
| } | } | ||||
| for _, storage := range storages { | for _, storage := range storages { | ||||
| node := models.Node{ | node := models.Node{ | ||||
| ID: int64(storage.StorageID), | |||||
| ID: "storage" + strconv.FormatInt(int64(storage.StorageID), 10), | |||||
| NodeType: "storage", | NodeType: "storage", | ||||
| Name: storage.StorageName, | Name: storage.StorageName, | ||||
| DataCount: storage.DataCount, | DataCount: storage.DataCount, | ||||
| @@ -55,9 +55,9 @@ func GetHubInfo(c *gin.Context) { | |||||
| for _, hubReq := range hubReqs { | for _, hubReq := range hubReqs { | ||||
| edge := models.Edge{ | edge := models.Edge{ | ||||
| SourceType: hubReq.SourceType, | SourceType: hubReq.SourceType, | ||||
| SourceID: int64(hubReq.SourceID), | |||||
| SourceID: hubReq.SourceType + strconv.FormatInt(int64(hubReq.SourceID), 10), | |||||
| TargetType: hubReq.TargetType, | TargetType: hubReq.TargetType, | ||||
| TargetID: int64(hubReq.TargetID), | |||||
| TargetID: hubReq.TargetType + strconv.FormatInt(int64(hubReq.TargetID), 10), | |||||
| DataTransferCount: hubReq.DataTransferCount, | DataTransferCount: hubReq.DataTransferCount, | ||||
| RequestCount: hubReq.RequestCount, | RequestCount: hubReq.RequestCount, | ||||
| FailedRequestCount: hubReq.FailedRequestCount, | FailedRequestCount: hubReq.FailedRequestCount, | ||||
| @@ -116,7 +116,7 @@ func GetDataTransfer(c *gin.Context) { | |||||
| //storage id | //storage id | ||||
| ComboID: strconv.FormatInt(block.StorageID, 10), | ComboID: strconv.FormatInt(block.StorageID, 10), | ||||
| //block index | //block index | ||||
| Label: strconv.FormatInt(block.Index, 10), | |||||
| Label: block.Type + strconv.FormatInt(block.Index, 10), | |||||
| //block type | //block type | ||||
| NodeType: block.Type, | NodeType: block.Type, | ||||
| } | } | ||||
| @@ -126,7 +126,7 @@ func GetDataTransfer(c *gin.Context) { | |||||
| //添加storage combo信息 | //添加storage combo信息 | ||||
| if !containsCombo(combos, strconv.FormatInt(block.StorageID, 10), "storage") { | if !containsCombo(combos, strconv.FormatInt(block.StorageID, 10), "storage") { | ||||
| combo := models.Combo{ | combo := models.Combo{ | ||||
| ID: strconv.FormatInt(block.StorageID, 10), | |||||
| ID: "storage" + strconv.FormatInt(block.StorageID, 10), | |||||
| Label: "存储中心" + strconv.FormatInt(block.StorageID, 10), | Label: "存储中心" + strconv.FormatInt(block.StorageID, 10), | ||||
| ParentId: strconv.Itoa(block.Status), | ParentId: strconv.Itoa(block.Status), | ||||
| ComboType: "storage", | ComboType: "storage", | ||||
| @@ -134,10 +134,23 @@ func GetDataTransfer(c *gin.Context) { | |||||
| combos = append(combos, combo) | combos = append(combos, combo) | ||||
| } | } | ||||
| //添加state combo信息 | //添加state combo信息 | ||||
| if !containsCombo(combos, strconv.Itoa(block.Status), "state") { | |||||
| if !containsCombo(combos, "state"+strconv.Itoa(block.Status), "state") { | |||||
| var statusStr string | |||||
| switch block.Status { | |||||
| case 0: | |||||
| statusStr = "实时情况" | |||||
| case 1: | |||||
| statusStr = block.Timestamp.Format("2006-01-02") + "布局调整后" | |||||
| case 2: | |||||
| statusStr = block.Timestamp.Format("2006-01-02") + "布局调整前" | |||||
| case 3: | |||||
| statusStr = block.Timestamp.Format("2006-01-02") + "布局调整后" | |||||
| default: | |||||
| statusStr = "未知状态" | |||||
| } | |||||
| combo := models.Combo{ | combo := models.Combo{ | ||||
| ID: strconv.Itoa(block.Status), | |||||
| Label: "存储状态" + string(block.Status), | |||||
| ID: "state" + strconv.Itoa(block.Status), | |||||
| Label: statusStr, | |||||
| ComboType: "state", | ComboType: "state", | ||||
| } | } | ||||
| combos = append(combos, combo) | combos = append(combos, combo) | ||||
| @@ -147,8 +160,8 @@ func GetDataTransfer(c *gin.Context) { | |||||
| relations, _ := repoStorageTrans.GetStorageTransferCountByObjectID(objectID) | relations, _ := repoStorageTrans.GetStorageTransferCountByObjectID(objectID) | ||||
| for _, relation := range relations { | for _, relation := range relations { | ||||
| edge := models.DistEdge{ | edge := models.DistEdge{ | ||||
| Source: strconv.FormatInt(relation.SourceStorageID, 10), | |||||
| Target: strconv.FormatInt(relation.TargetStorageID, 10), | |||||
| Source: "storage" + strconv.FormatInt(relation.SourceStorageID, 10), | |||||
| Target: "storage" + strconv.FormatInt(relation.TargetStorageID, 10), | |||||
| } | } | ||||
| edges = append(edges, edge) | edges = append(edges, edge) | ||||
| } | } | ||||
| @@ -71,5 +71,4 @@ func ProcessHubInfo(data stgmod.HubInfo) { | |||||
| default: | default: | ||||
| return | return | ||||
| } | } | ||||
| } | } | ||||
| @@ -33,7 +33,7 @@ type HubRelationship struct { | |||||
| } | } | ||||
| type Node struct { | type Node struct { | ||||
| ID int64 `json:"id"` // 节点/中心ID | |||||
| ID string `json:"id"` // 节点/中心ID | |||||
| NodeType string `json:"nodeType"` //节点类型 storage/hub | NodeType string `json:"nodeType"` //节点类型 storage/hub | ||||
| Name string `json:"name"` // 节点/中心名称 | Name string `json:"name"` // 节点/中心名称 | ||||
| Address cdssdk.HubAddressInfo `json:"address"` // 地址 | Address cdssdk.HubAddressInfo `json:"address"` // 地址 | ||||
| @@ -44,9 +44,9 @@ type Node struct { | |||||
| type Edge struct { | type Edge struct { | ||||
| SourceType string `json:"sourceType"` // 源节点类型 | SourceType string `json:"sourceType"` // 源节点类型 | ||||
| SourceID int64 `json:"sourceID"` // 源节点ID | |||||
| SourceID string `json:"source"` // 源节点ID | |||||
| TargetType string `json:"targetType"` // 目标节点类型 | TargetType string `json:"targetType"` // 目标节点类型 | ||||
| TargetID int64 `json:"targetID"` // 目标节点ID | |||||
| TargetID string `json:"target"` // 目标节点ID | |||||
| DataTransferCount int64 `json:"dataTransferCount"` // 数据传输量 | DataTransferCount int64 `json:"dataTransferCount"` // 数据传输量 | ||||
| RequestCount int64 `json:"requestCount"` // 请求数 | RequestCount int64 `json:"requestCount"` // 请求数 | ||||
| FailedRequestCount int64 `json:"failedRequestCount"` // 失败请求数 | FailedRequestCount int64 `json:"failedRequestCount"` // 失败请求数 | ||||
| @@ -0,0 +1,30 @@ | |||||
| package models | |||||
| // | |||||
| //func ProcessStorageInfo(data stgmod.StorageInfo) { | |||||
| // repo := NewStorageRepository(DB) | |||||
| // | |||||
| // storage, err := repo.GetStorageByID(data.Body.StorageID) | |||||
| // if err != nil { | |||||
| // if errors.Is(err, gorm.ErrRecordNotFound) { | |||||
| // // 插入新记录 | |||||
| // newStorage := &Storage{ | |||||
| // StorageID: cdssdk.StorageID(data.Body.StorageID), | |||||
| // DataCount: data.Body.DataCount, | |||||
| // NewDataCount: 0, | |||||
| // } | |||||
| // repo.CreateStorage(newStorage) | |||||
| // } else { | |||||
| // log.Printf("Error querying storage: %v", err) | |||||
| // } | |||||
| // } else { | |||||
| // // 更新记录 | |||||
| // newDataCount := data.Body.DataCount - storage.DataCount | |||||
| // storage.DataCount = data.Body.DataCount | |||||
| // storage.NewDataCount = newDataCount | |||||
| // err := repo.UpdateStorage(storage) | |||||
| // if err != nil { | |||||
| // log.Printf("Error update storage: %v", err) | |||||
| // } | |||||
| // } | |||||
| //} | |||||
| @@ -1,11 +1,9 @@ | |||||
| package models | package models | ||||
| import ( | import ( | ||||
| "errors" | |||||
| cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" | ||||
| stgmod "gitlink.org.cn/cloudream/storage/common/models" | stgmod "gitlink.org.cn/cloudream/storage/common/models" | ||||
| "gorm.io/gorm" | "gorm.io/gorm" | ||||
| "log" | |||||
| "time" | "time" | ||||
| ) | ) | ||||
| @@ -35,6 +33,9 @@ func (r *StorageRepository) CreateStorage(storage *Storage) error { | |||||
| func (r *StorageRepository) UpdateStorage(storage *Storage) error { | func (r *StorageRepository) UpdateStorage(storage *Storage) error { | ||||
| return r.repo.Update(storage) | return r.repo.Update(storage) | ||||
| } | } | ||||
| func (r *StorageRepository) DeleteStorage(storage *Storage) error { | |||||
| return r.repo.Delete(storage, uint(storage.StorageID)) | |||||
| } | |||||
| func (r *StorageRepository) GetStorageByID(id int64) (*Storage, error) { | func (r *StorageRepository) GetStorageByID(id int64) (*Storage, error) { | ||||
| var storage Storage | var storage Storage | ||||
| @@ -64,32 +65,32 @@ func (r *StorageRepository) GetAllStorages() ([]Storage, error) { | |||||
| return storages, nil | return storages, nil | ||||
| } | } | ||||
| //ProcessHubStat mq推送各节点统计自身当前的总数据量时的处理逻辑 | |||||
| func ProcessStorageInfo(data stgmod.StorageStats) { | |||||
| func ProcessStorageInfo(data stgmod.StorageInfo) { | |||||
| repo := NewStorageRepository(DB) | repo := NewStorageRepository(DB) | ||||
| storage := &Storage{ | |||||
| StorageID: cdssdk.StorageID(data.Body.StorageID), | |||||
| StorageName: data.Body.StorageInfo.Name, | |||||
| HubID: data.Body.StorageInfo.MasterHub, | |||||
| Timestamp: data.Timestamp, | |||||
| } | |||||
| storage, err := repo.GetStorageByID(data.Body.StorageID) | |||||
| if err != nil { | |||||
| if errors.Is(err, gorm.ErrRecordNotFound) { | |||||
| // 插入新记录 | |||||
| newStorage := &Storage{ | |||||
| StorageID: cdssdk.StorageID(data.Body.StorageID), | |||||
| DataCount: data.Body.DataCount, | |||||
| NewDataCount: 0, | |||||
| } | |||||
| repo.CreateStorage(newStorage) | |||||
| } else { | |||||
| log.Printf("Error querying storage: %v", err) | |||||
| switch data.Body.Type { | |||||
| case "add": | |||||
| err := repo.CreateStorage(storage) | |||||
| if err != nil { | |||||
| return | |||||
| } | } | ||||
| } else { | |||||
| // 更新记录 | |||||
| newDataCount := data.Body.DataCount - storage.DataCount | |||||
| storage.DataCount = data.Body.DataCount | |||||
| storage.NewDataCount = newDataCount | |||||
| case "update": | |||||
| err := repo.UpdateStorage(storage) | err := repo.UpdateStorage(storage) | ||||
| if err != nil { | if err != nil { | ||||
| log.Printf("Error update storage: %v", err) | |||||
| return | |||||
| } | |||||
| case "delete": | |||||
| err := repo.DeleteStorage(storage) | |||||
| if err != nil { | |||||
| return | |||||
| } | } | ||||
| default: | |||||
| return | |||||
| } | } | ||||
| } | } | ||||
| @@ -1,8 +1,8 @@ | |||||
| package mq | package mq | ||||
| import ( | import ( | ||||
| "encoding/json" | |||||
| "fmt" | "fmt" | ||||
| jsoniter "github.com/json-iterator/go" | |||||
| "github.com/streadway/amqp" | "github.com/streadway/amqp" | ||||
| stgmod "gitlink.org.cn/cloudream/storage/common/models" | stgmod "gitlink.org.cn/cloudream/storage/common/models" | ||||
| "gitlink.org.cn/cloudream/storage/datamap/internal/config" | "gitlink.org.cn/cloudream/storage/datamap/internal/config" | ||||
| @@ -25,8 +25,11 @@ func InitMQ(cfg config.RabbitMQConfig) (*amqp.Connection, error) { | |||||
| func listenQueues(conn *amqp.Connection) { | func listenQueues(conn *amqp.Connection) { | ||||
| queues := []string{ | queues := []string{ | ||||
| "datamap_hubinfo", | |||||
| "datamap_storageinfo", | "datamap_storageinfo", | ||||
| "datamap_hubtransfer", | |||||
| "datamap_storagestats", | |||||
| "datamap_hubtransferstats", | |||||
| "datamap_hubstoragetransferstats", | |||||
| "datamap_blocktransfer", | "datamap_blocktransfer", | ||||
| "datamap_blockdistribution", | "datamap_blockdistribution", | ||||
| } | } | ||||
| @@ -57,29 +60,28 @@ func processMessage(queue string, body []byte) { | |||||
| switch queue { | switch queue { | ||||
| case "datamap_hubinfo": | case "datamap_hubinfo": | ||||
| var data stgmod.HubInfo | var data stgmod.HubInfo | ||||
| if err := json.Unmarshal(body, &data); err != nil { | |||||
| if err := jsoniter.Unmarshal(body, &data); err != nil { | |||||
| log.Printf("Failed to unmarshal HubInfo: %v, body: %s", err, body) | log.Printf("Failed to unmarshal HubInfo: %v, body: %s", err, body) | ||||
| return | return | ||||
| } | } | ||||
| models.ProcessHubInfo(data) | models.ProcessHubInfo(data) | ||||
| case "datamap_storageinfo": | case "datamap_storageinfo": | ||||
| var data stgmod.StorageInfo | var data stgmod.StorageInfo | ||||
| if err := json.Unmarshal(body, &data); err != nil { | |||||
| if err := jsoniter.Unmarshal(body, &data); err != nil { | |||||
| log.Printf("Failed to unmarshal StorageInfo: %v, body: %s", err, body) | log.Printf("Failed to unmarshal StorageInfo: %v, body: %s", err, body) | ||||
| return | return | ||||
| } | } | ||||
| //models.ProcessStorageInfo(data) | |||||
| models.ProcessStorageInfo(data) | |||||
| case "datamap_storagestats": | case "datamap_storagestats": | ||||
| var data stgmod.StorageStats | var data stgmod.StorageStats | ||||
| if err := json.Unmarshal(body, &data); err != nil { | |||||
| if err := jsoniter.Unmarshal(body, &data); err != nil { | |||||
| log.Printf("Failed to unmarshal StorageStats: %v, body: %s", err, body) | log.Printf("Failed to unmarshal StorageStats: %v, body: %s", err, body) | ||||
| return | return | ||||
| } | } | ||||
| //models.ProcessStorageInfo(data) | //models.ProcessStorageInfo(data) | ||||
| case "datamap_hubtransferstats": | case "datamap_hubtransferstats": | ||||
| var data stgmod.HubTransferStats | var data stgmod.HubTransferStats | ||||
| err := json.Unmarshal(body, &data) | |||||
| err := jsoniter.Unmarshal(body, &data) | |||||
| if err != nil { | if err != nil { | ||||
| log.Printf("Failed to unmarshal HubTransferStats: %v, body: %s", err, body) | log.Printf("Failed to unmarshal HubTransferStats: %v, body: %s", err, body) | ||||
| return | return | ||||
| @@ -87,7 +89,7 @@ func processMessage(queue string, body []byte) { | |||||
| models.ProcessHubTransfer(data) | models.ProcessHubTransfer(data) | ||||
| case "datamap_hubstoragetransferstats": | case "datamap_hubstoragetransferstats": | ||||
| var data stgmod.HubStorageTransferStats | var data stgmod.HubStorageTransferStats | ||||
| err := json.Unmarshal(body, &data) | |||||
| err := jsoniter.Unmarshal(body, &data) | |||||
| if err != nil { | if err != nil { | ||||
| log.Printf("Failed to unmarshal HubStorageTransferStats: %v, body: %s", err, body) | log.Printf("Failed to unmarshal HubStorageTransferStats: %v, body: %s", err, body) | ||||
| return | return | ||||
| @@ -95,7 +97,7 @@ func processMessage(queue string, body []byte) { | |||||
| //models.ProcessHubTransfer(data) | //models.ProcessHubTransfer(data) | ||||
| case "datamap_blocktransfer": | case "datamap_blocktransfer": | ||||
| var data stgmod.BlockTransfer | var data stgmod.BlockTransfer | ||||
| err := json.Unmarshal(body, &data) | |||||
| err := jsoniter.Unmarshal(body, &data) | |||||
| if err != nil { | if err != nil { | ||||
| log.Printf("Failed to unmarshal BlockTransfer: %v, body: %s", err, body) | log.Printf("Failed to unmarshal BlockTransfer: %v, body: %s", err, body) | ||||
| return | return | ||||
| @@ -103,7 +105,7 @@ func processMessage(queue string, body []byte) { | |||||
| models.ProcessBlockTransfer(data) | models.ProcessBlockTransfer(data) | ||||
| case "datamap_blockdistribution": | case "datamap_blockdistribution": | ||||
| var data stgmod.BlockDistribution | var data stgmod.BlockDistribution | ||||
| err := json.Unmarshal(body, &data) | |||||
| err := jsoniter.Unmarshal(body, &data) | |||||
| if err != nil { | if err != nil { | ||||
| log.Printf("Failed to unmarshal BlockDistribution: %v, body: %s", err, body) | log.Printf("Failed to unmarshal BlockDistribution: %v, body: %s", err, body) | ||||
| return | return | ||||