diff --git a/common/models/datamap.go b/common/models/datamap.go index 38bf89b..54c1f3a 100644 --- a/common/models/datamap.go +++ b/common/models/datamap.go @@ -194,7 +194,7 @@ type ObjectChangeBody struct { Path string `json:"path"` Size int `json:"size"` BlockDistribution []BlockDistribution `json:"blockDistribution"` - Timestamp string `json:"timestamp"` + Timestamp time.Time `json:"timestamp"` } type ObjectChange struct { Timestamp time.Time `json:"timestamp"` @@ -222,10 +222,10 @@ type PackageChange struct { // BucketChange bucket变化信息 type BucketChangeBody struct { - Type string `json:"type"` - BucketID string `json:"bucketID"` - BucketName string `json:"bucketName"` - Timestamp string `json:"timestamp"` + Type string `json:"type"` + BucketID string `json:"bucketID"` + BucketName string `json:"bucketName"` + Timestamp time.Time `json:"timestamp"` } type BucketChange struct { diff --git a/datamap/.env b/datamap/.env new file mode 100644 index 0000000..12a1727 --- /dev/null +++ b/datamap/.env @@ -0,0 +1,15 @@ +# 数据库配置 +DB_HOST=175.178.223.172 +DB_PORT=3306 +DB_USER=root +DB_PASSWORD= +DB_NAME=storage_datamap + +# RabbitMQ 配置 +RABBITMQ_HOST=175.178.223.172 +RABBITMQ_PORT=5672 +RABBITMQ_USER=guest +RABBITMQ_PASSWORD= + +# 服务配置 +SERVER_PORT=8080 \ No newline at end of file diff --git a/datamap/internal/handlers/handlers.go b/datamap/internal/handlers/handlers.go index 66abf46..b206d0a 100644 --- a/datamap/internal/handlers/handlers.go +++ b/datamap/internal/handlers/handlers.go @@ -114,7 +114,7 @@ func GetDataTransfer(c *gin.Context) { //block id ID: strconv.FormatInt(block.BlockID, 10), //storage id - ComboID: strconv.FormatInt(block.StorageID, 10), + ComboID: "storage" + strconv.FormatInt(block.StorageID, 10), //block index Label: block.Type + strconv.FormatInt(block.Index, 10), //block type @@ -124,11 +124,11 @@ func GetDataTransfer(c *gin.Context) { //combos ------- state or storage //添加storage combo信息 - if !containsCombo(combos, strconv.FormatInt(block.StorageID, 10), "storage") { + if !containsCombo(combos, "storage"+strconv.FormatInt(block.StorageID, 10), "storage") { combo := models.Combo{ ID: "storage" + strconv.FormatInt(block.StorageID, 10), Label: "存储中心" + strconv.FormatInt(block.StorageID, 10), - ParentId: strconv.Itoa(block.Status), + ParentId: "state" + strconv.Itoa(block.Status), ComboType: "storage", } combos = append(combos, combo) diff --git a/datamap/internal/models/hubrequest.go b/datamap/internal/models/hubrequest.go index 47a77c8..cc9abda 100644 --- a/datamap/internal/models/hubrequest.go +++ b/datamap/internal/models/hubrequest.go @@ -11,9 +11,9 @@ import ( type HubRequest struct { RequestID int64 `gorm:"column:RequestID; primaryKey; type:bigint; autoIncrement" json:"RequestID"` SourceType string `gorm:"column:SourceType; type:varchar(255); not null" json:"sourceType"` - SourceID cdssdk.HubID `gorm:"column:SourceHubID; type:bigint; not null" json:"sourceID"` + SourceID cdssdk.HubID `gorm:"column:SourceID; type:bigint; not null" json:"sourceID"` TargetType string `gorm:"column:TargetType; type:varchar(255); not null" json:"targetType"` - TargetID cdssdk.HubID `gorm:"column:TargetHubID; type:bigint; not null" json:"targetID"` + TargetID cdssdk.HubID `gorm:"column:TargetID; type:bigint; not null" json:"targetID"` DataTransferCount int64 `gorm:"column:DataTransferCount; type:bigint; not null" json:"dataTransferCount"` RequestCount int64 `gorm:"column:RequestCount; type:bigint; not null" json:"requestCount"` FailedRequestCount int64 `gorm:"column:FailedRequestCount; type:bigint; not null" json:"failedRequestCount"` @@ -61,10 +61,10 @@ func (r *HubRequestRepository) GetAllHubRequests() ([]HubRequest, error) { func ProcessHubTransfer(data stgmod.HubTransferStats) { repo := NewHubRequestRepository(DB) - //todo 加字段 hubRequest := &HubRequest{ - + SourceType: "hub", SourceID: cdssdk.HubID(data.Body.SourceHubID), + TargetType: "hub", TargetID: cdssdk.HubID(data.Body.TargetHubID), DataTransferCount: data.Body.Send.TotalTransfer, RequestCount: data.Body.Send.RequestCount, @@ -82,3 +82,50 @@ func ProcessHubTransfer(data stgmod.HubTransferStats) { } } + +// ProcessHubStorageTransfer 节点中心之间数据传输处理 +func ProcessHubStorageTransfer(data stgmod.HubStorageTransferStats) { + repo := NewHubRequestRepository(DB) + + hubRequestSend := &HubRequest{ + SourceType: "hub", + SourceID: cdssdk.HubID(data.Body.HubID), + TargetType: "storage", + TargetID: cdssdk.HubID(data.Body.StorageID), + DataTransferCount: data.Body.Send.TotalTransfer, + RequestCount: data.Body.Send.RequestCount, + FailedRequestCount: data.Body.Send.FailedRequestCount, + AvgTransferCount: data.Body.Send.AvgTransfer, + MaxTransferCount: data.Body.Send.MaxTransfer, + MinTransferCount: data.Body.Send.MinTransfer, + StartTimestamp: data.Body.StartTimestamp, + EndTimestamp: data.Body.EndTimestamp, + } + + err := repo.CreateHubRequest(hubRequestSend) + if err != nil { + log.Printf("Error update hubrequest: %v", err) + + } + + hubRequestReceive := &HubRequest{ + SourceType: "storage", + SourceID: cdssdk.HubID(data.Body.StorageID), + TargetType: "hub", + TargetID: cdssdk.HubID(data.Body.HubID), + DataTransferCount: data.Body.Receive.TotalTransfer, + RequestCount: data.Body.Receive.RequestCount, + FailedRequestCount: data.Body.Receive.FailedRequestCount, + AvgTransferCount: data.Body.Receive.AvgTransfer, + MaxTransferCount: data.Body.Receive.MaxTransfer, + MinTransferCount: data.Body.Receive.MinTransfer, + StartTimestamp: data.Body.StartTimestamp, + EndTimestamp: data.Body.EndTimestamp, + } + + err = repo.CreateHubRequest(hubRequestReceive) + if err != nil { + log.Printf("Error update hubrequest: %v", err) + + } +} diff --git a/datamap/internal/models/models.go b/datamap/internal/models/models.go index 85fb10a..3c2bb51 100644 --- a/datamap/internal/models/models.go +++ b/datamap/internal/models/models.go @@ -66,7 +66,7 @@ type ObjectDistribution struct { type DistNode struct { ID string `json:"id"` - ComboID string `json:"comboID"` + ComboID string `json:"comboId"` Label string `json:"label"` NodeType string `json:"nodeType"` } diff --git a/datamap/internal/models/storageStats.go b/datamap/internal/models/storageStats.go index d7e6928..f26c553 100644 --- a/datamap/internal/models/storageStats.go +++ b/datamap/internal/models/storageStats.go @@ -1,30 +1,37 @@ package models -// -//func ProcessStorageInfo(data stgmod.StorageInfo) { -// repo := NewStorageRepository(DB) -// -// storage, err := repo.GetStorageByID(data.Body.StorageID) -// if err != nil { -// if errors.Is(err, gorm.ErrRecordNotFound) { -// // 插入新记录 -// newStorage := &Storage{ -// StorageID: cdssdk.StorageID(data.Body.StorageID), -// DataCount: data.Body.DataCount, -// NewDataCount: 0, -// } -// repo.CreateStorage(newStorage) -// } else { -// log.Printf("Error querying storage: %v", err) -// } -// } else { -// // 更新记录 -// newDataCount := data.Body.DataCount - storage.DataCount -// storage.DataCount = data.Body.DataCount -// storage.NewDataCount = newDataCount -// err := repo.UpdateStorage(storage) -// if err != nil { -// log.Printf("Error update storage: %v", err) -// } -// } -//} +import ( + "errors" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + stgmod "gitlink.org.cn/cloudream/storage/common/models" + "gorm.io/gorm" + "log" +) + +func ProcessStorageStats(data stgmod.StorageStats) { + repo := NewStorageRepository(DB) + + storage, err := repo.GetStorageByID(data.Body.StorageID) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // 插入新记录 + newStorage := &Storage{ + StorageID: cdssdk.StorageID(data.Body.StorageID), + DataCount: data.Body.DataCount, + NewDataCount: 0, + } + repo.CreateStorage(newStorage) + } else { + log.Printf("Error querying storage: %v", err) + } + } else { + // 更新记录 + newDataCount := data.Body.DataCount - storage.DataCount + storage.DataCount = data.Body.DataCount + storage.NewDataCount = newDataCount + err := repo.UpdateStorage(storage) + if err != nil { + log.Printf("Error update storage: %v", err) + } + } +} diff --git a/datamap/internal/mq/mq.go b/datamap/internal/mq/mq.go index 7b3e18c..07c9d9b 100644 --- a/datamap/internal/mq/mq.go +++ b/datamap/internal/mq/mq.go @@ -78,7 +78,7 @@ func processMessage(queue string, body []byte) { log.Printf("Failed to unmarshal StorageStats: %v, body: %s", err, body) return } - //models.ProcessStorageInfo(data) + models.ProcessStorageStats(data) case "datamap_hubtransferstats": var data stgmod.HubTransferStats err := jsoniter.Unmarshal(body, &data) @@ -94,7 +94,7 @@ func processMessage(queue string, body []byte) { log.Printf("Failed to unmarshal HubStorageTransferStats: %v, body: %s", err, body) return } - //models.ProcessHubTransfer(data) + models.ProcessHubStorageTransfer(data) case "datamap_blocktransfer": var data stgmod.BlockTransfer err := jsoniter.Unmarshal(body, &data)