diff --git a/common/models/models.go b/common/models/models.go index a86afbe..47d326e 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -11,6 +11,7 @@ type ObjectBlock struct { Index int `gorm:"column:Index; primaryKey; type:int" json:"index"` StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint" json:"storageID"` // 这个块应该在哪个节点上 FileHash cdssdk.FileHash `gorm:"column:FileHash; type:char(68); not null" json:"fileHash"` + Size int64 `gorm:"column:Size; type:bigint" json:"size"` } func (ObjectBlock) TableName() string { @@ -77,6 +78,7 @@ type GrouppedObjectBlock struct { ObjectID cdssdk.ObjectID Index int FileHash cdssdk.FileHash + Size int64 StorageIDs []cdssdk.StorageID } @@ -89,6 +91,7 @@ func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock { ObjectID: block.ObjectID, Index: block.Index, FileHash: block.FileHash, + Size: block.Size, } } grp.StorageIDs = append(grp.StorageIDs, block.StorageID) diff --git a/common/pkgs/db2/object.go b/common/pkgs/db2/object.go index 9f0cef3..14633ed 100644 --- a/common/pkgs/db2/object.go +++ b/common/pkgs/db2/object.go @@ -356,6 +356,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds [] Index: 0, StorageID: stgID, FileHash: add.FileHash, + Size: add.Size, }) } } diff --git a/common/pkgs/ioswitch2/ops2/bypass.go b/common/pkgs/ioswitch2/ops2/bypass.go index 8af1be7..496c0db 100644 --- a/common/pkgs/ioswitch2/ops2/bypass.go +++ b/common/pkgs/ioswitch2/ops2/bypass.go @@ -76,7 +76,7 @@ func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) er } e.PutVar(o.BypassCallback, &BypassHandleResultValue{Commited: true}) - e.PutVar(o.FileHash, &FileHashValue{Hash: fileInfo.Hash}) + e.PutVar(o.FileHash, &ShardInfoValue{Hash: fileInfo.Hash, Size: fileInfo.Size}) return nil } diff --git a/common/pkgs/ioswitch2/ops2/shard_store.go b/common/pkgs/ioswitch2/ops2/shard_store.go index 4fab018..5878884 100644 --- a/common/pkgs/ioswitch2/ops2/shard_store.go +++ b/common/pkgs/ioswitch2/ops2/shard_store.go @@ -19,15 +19,16 @@ import ( func init() { exec.UseOp[*ShardRead]() exec.UseOp[*ShardWrite]() - exec.UseVarValue[*FileHashValue]() + exec.UseVarValue[*ShardInfoValue]() } -type FileHashValue struct { +type ShardInfoValue struct { Hash cdssdk.FileHash `json:"hash"` + Size int64 `json:"size"` } -func (v *FileHashValue) Clone() exec.VarValue { - return &FileHashValue{Hash: v.Hash} +func (v *ShardInfoValue) Clone() exec.VarValue { + return &ShardInfoValue{Hash: v.Hash, Size: v.Size} } type ShardRead struct { @@ -105,8 +106,9 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error { return fmt.Errorf("writing file to shard store: %w", err) } - e.PutVar(o.FileHash, &FileHashValue{ + e.PutVar(o.FileHash, &ShardInfoValue{ Hash: fileInfo.Hash, + Size: fileInfo.Size, }) return nil } diff --git a/common/pkgs/uploader/create_load.go b/common/pkgs/uploader/create_load.go index 3827c16..1bc7fc7 100644 --- a/common/pkgs/uploader/create_load.go +++ b/common/pkgs/uploader/create_load.go @@ -68,7 +68,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err defer u.lock.Unlock() // 记录上传结果 - fileHash := ret["fileHash"].(*ops2.FileHashValue).Hash + fileHash := ret["fileHash"].(*ops2.ShardInfoValue).Hash u.successes = append(u.successes, coormq.AddObjectEntry{ Path: pa, Size: size, diff --git a/common/pkgs/uploader/update.go b/common/pkgs/uploader/update.go index 127b156..3514876 100644 --- a/common/pkgs/uploader/update.go +++ b/common/pkgs/uploader/update.go @@ -76,7 +76,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error w.successes = append(w.successes, coormq.AddObjectEntry{ Path: pat, Size: size, - FileHash: ret["fileHash"].(*ops2.FileHashValue).Hash, + FileHash: ret["fileHash"].(*ops2.ShardInfoValue).Hash, UploadTime: uploadTime, StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID}, }) diff --git a/scanner/internal/event/check_package_redundancy.go b/scanner/internal/event/check_package_redundancy.go index 9ceee86..0883030 100644 --- a/scanner/internal/event/check_package_redundancy.go +++ b/scanner/internal/event/check_package_redundancy.go @@ -485,11 +485,13 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object var blocks []stgmod.ObjectBlock var blockChgs []stgmod.BlockChange for i, stg := range uploadStgs { + r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, Index: 0, StorageID: stg.Storage.Storage.StorageID, - FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, + FileHash: r.Hash, + Size: r.Size, }) blockChgs = append(blockChgs, &stgmod.BlockChangeClone{ BlockType: stgmod.BlockTypeRaw, @@ -554,11 +556,13 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD var evtTargetBlocks []stgmod.Block var evtBlockTrans []stgmod.DataTransfer for i := 0; i < red.N; i++ { + r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, Index: i, StorageID: uploadStgs[i].Storage.Storage.StorageID, - FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, + FileHash: r.Hash, + Size: r.Size, }) evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ BlockType: stgmod.BlockTypeEC, @@ -635,11 +639,13 @@ func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.Object var evtTargetBlocks []stgmod.Block var evtBlockTrans []stgmod.DataTransfer for i := 0; i < red.N; i++ { + r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, Index: i, StorageID: uploadStgs[i].Storage.Storage.StorageID, - FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, + FileHash: r.Hash, + Size: r.Size, }) evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ BlockType: stgmod.BlockTypeEC, @@ -722,11 +728,13 @@ func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.Object var evtTargetBlocks []stgmod.Block var evtBlockTrans []stgmod.DataTransfer for i, stg := range uploadStgs { + r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, Index: i, StorageID: stg.Storage.Storage.StorageID, - FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, + FileHash: r.Hash, + Size: r.Size, }) evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{ BlockType: stgmod.BlockTypeSegment, @@ -807,11 +815,13 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD var blocks []stgmod.ObjectBlock var blockChgs []stgmod.BlockChange for i, stg := range uploadStgs { + r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, Index: 0, StorageID: stg.Storage.Storage.StorageID, - FileHash: ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, + FileHash: r.Hash, + Size: r.Size, }) blockChgs = append(blockChgs, &stgmod.BlockChangeClone{ BlockType: stgmod.BlockTypeRaw, @@ -904,11 +914,13 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe var blocks []stgmod.ObjectBlock for i := range uploadStgs { + r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue) blocks = append(blocks, stgmod.ObjectBlock{ ObjectID: obj.Object.ObjectID, Index: 0, StorageID: uploadStgs[i].Storage.Storage.StorageID, - FileHash: ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash, + FileHash: r.Hash, + Size: r.Size, }) } @@ -1029,6 +1041,7 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更 if ok && lo.Contains(grp.StorageIDs, stg.Storage.Storage.StorageID) { newBlock.FileHash = grp.FileHash + newBlock.Size = grp.Size newBlocks = append(newBlocks, newBlock) continue } @@ -1071,7 +1084,9 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet return nil, fmt.Errorf("parsing result key %s as index: %w", k, err) } - newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash + r := v.(*ops2.ShardInfoValue) + newBlocks[idx].FileHash = r.Hash + newBlocks[idx].Size = r.Size } var evtBlockTrans []stgmod.DataTransfer @@ -1268,6 +1283,7 @@ func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.O // 如果新选中的节点已经记录在Block表中,那么就不需要任何变更 if ok && lo.Contains(grp.StorageIDs, storage.Storage.Storage.StorageID) { newBlock.FileHash = grp.FileHash + newBlock.Size = grp.Size newBlocks = append(newBlocks, newBlock) continue } @@ -1311,7 +1327,9 @@ func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.O return nil, fmt.Errorf("parsing result key %s as index: %w", k, err) } - newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash + r := v.(*ops2.ShardInfoValue) + newBlocks[idx].FileHash = r.Hash + newBlocks[idx].Size = r.Size } return &coormq.UpdatingObjectRedundancy{ diff --git a/scanner/internal/event/clean_pinned.go b/scanner/internal/event/clean_pinned.go index 277222d..693549a 100644 --- a/scanner/internal/event/clean_pinned.go +++ b/scanner/internal/event/clean_pinned.go @@ -249,6 +249,7 @@ type objectBlock struct { HasEntity bool // 节点拥有实际的文件数据块 HasShadow bool // 如果节点拥有完整文件数据,那么认为这个节点拥有所有块,这些块被称为影子块 FileHash cdssdk.FileHash // 只有在拥有实际文件数据块时,这个字段才有值 + Size int64 // 块大小 } type stgDist struct { @@ -590,6 +591,7 @@ func (t *CleanPinned) initBlockList(ctx *annealingState) { StorageID: b.StorageID, HasEntity: true, FileHash: b.FileHash, + Size: b.Size, }) blocksMap[b.StorageID] = blocks } @@ -774,6 +776,7 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st Index: solu.blockList[i].Index, StorageID: solu.blockList[i].StorageID, FileHash: obj.Object.FileHash, + Size: solu.blockList[i].Size, }) } } @@ -859,6 +862,7 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg Index: block.Index, StorageID: block.StorageID, FileHash: block.FileHash, + Size: block.Size, }) // 如果这个块是影子块,那么就要从完整对象里重建这个块 @@ -1024,7 +1028,9 @@ func (t *CleanPinned) populateECObjectEntry(entry *coormq.UpdatingObjectRedundan key := fmt.Sprintf("%d.%d", obj.Object.ObjectID, entry.Blocks[i].Index) // 不应该出现key不存在的情况 - entry.Blocks[i].FileHash = ioRets[key].(*ops2.FileHashValue).Hash + r := ioRets[key].(*ops2.ShardInfoValue) + entry.Blocks[i].FileHash = r.Hash + entry.Blocks[i].Size = r.Size } }