Add Size field to the ObjectBlock table

Sydonian, 8 months ago · commit 7d7cf8b3f0
8 changed files with 47 additions and 17 deletions

  1. +3  -0  common/models/models.go
  2. +1  -0  common/pkgs/db2/object.go
  3. +1  -1  common/pkgs/ioswitch2/ops2/bypass.go
  4. +7  -5  common/pkgs/ioswitch2/ops2/shard_store.go
  5. +1  -1  common/pkgs/uploader/create_load.go
  6. +1  -1  common/pkgs/uploader/update.go
  7. +26 -8  scanner/internal/event/check_package_redundancy.go
  8. +7  -1  scanner/internal/event/clean_pinned.go

+3 -0  common/models/models.go

@@ -11,6 +11,7 @@ type ObjectBlock struct {
 	Index     int              `gorm:"column:Index; primaryKey; type:int" json:"index"`
 	StorageID cdssdk.StorageID `gorm:"column:StorageID; primaryKey; type:bigint" json:"storageID"` // which node this block should be on
 	FileHash  cdssdk.FileHash  `gorm:"column:FileHash; type:char(68); not null" json:"fileHash"`
+	Size      int64            `gorm:"column:Size; type:bigint" json:"size"`
 }

 func (ObjectBlock) TableName() string {
@@ -77,6 +78,7 @@ type GrouppedObjectBlock struct {
 	ObjectID   cdssdk.ObjectID
 	Index      int
 	FileHash   cdssdk.FileHash
+	Size       int64
 	StorageIDs []cdssdk.StorageID
 }

@@ -89,6 +91,7 @@ func (o *ObjectDetail) GroupBlocks() []GrouppedObjectBlock {
 				ObjectID: block.ObjectID,
 				Index:    block.Index,
 				FileHash: block.FileHash,
+				Size:     block.Size,
 			}
 		}
 		grp.StorageIDs = append(grp.StorageIDs, block.StorageID)
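
The new field only exists in the model at this point; the database schema has to gain the column before old deployments can read or write it. A minimal sketch of what the schema change amounts to, assuming the service relies on GORM's AutoMigrate (the DSN, driver choice, and the simplified int64/string field types below are illustrative, not taken from this repo):

package main

import (
	"gorm.io/driver/mysql"
	"gorm.io/gorm"
)

// Simplified mirror of the updated model; the real struct uses cdssdk ID types.
type ObjectBlock struct {
	ObjectID  int64  `gorm:"column:ObjectID; primaryKey; type:bigint"`
	Index     int    `gorm:"column:Index; primaryKey; type:int"`
	StorageID int64  `gorm:"column:StorageID; primaryKey; type:bigint"`
	FileHash  string `gorm:"column:FileHash; type:char(68); not null"`
	Size      int64  `gorm:"column:Size; type:bigint"` // the column this commit adds
}

func (ObjectBlock) TableName() string { return "ObjectBlock" }

func main() {
	// Hypothetical DSN; substitute the deployment's real connection string.
	db, err := gorm.Open(mysql.Open("user:pass@tcp(127.0.0.1:3306)/cds"), &gorm.Config{})
	if err != nil {
		panic(err)
	}
	// AutoMigrate adds missing columns without touching existing rows, so
	// blocks written before this commit are left with a NULL/zero Size.
	if err := db.AutoMigrate(&ObjectBlock{}); err != nil {
		panic(err)
	}
}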


+1 -0  common/pkgs/db2/object.go

@@ -356,6 +356,7 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []
 				Index:     0,
 				StorageID: stgID,
 				FileHash:  add.FileHash,
+				Size:      add.Size,
 			})
 		}
 	}


+1 -1  common/pkgs/ioswitch2/ops2/bypass.go

@@ -76,7 +76,7 @@ func (o *BypassToShardStore) Execute(ctx *exec.ExecContext, e *exec.Executor) er
 	}

 	e.PutVar(o.BypassCallback, &BypassHandleResultValue{Commited: true})
-	e.PutVar(o.FileHash, &FileHashValue{Hash: fileInfo.Hash})
+	e.PutVar(o.FileHash, &ShardInfoValue{Hash: fileInfo.Hash, Size: fileInfo.Size})
 	return nil
 }



+7 -5  common/pkgs/ioswitch2/ops2/shard_store.go

@@ -19,15 +19,16 @@ import (
 func init() {
 	exec.UseOp[*ShardRead]()
 	exec.UseOp[*ShardWrite]()
-	exec.UseVarValue[*FileHashValue]()
+	exec.UseVarValue[*ShardInfoValue]()
 }

-type FileHashValue struct {
+type ShardInfoValue struct {
 	Hash cdssdk.FileHash `json:"hash"`
+	Size int64           `json:"size"`
 }

-func (v *FileHashValue) Clone() exec.VarValue {
-	return &FileHashValue{Hash: v.Hash}
+func (v *ShardInfoValue) Clone() exec.VarValue {
+	return &ShardInfoValue{Hash: v.Hash, Size: v.Size}
 }

 type ShardRead struct {
@@ -105,8 +106,9 @@ func (o *ShardWrite) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
 		return fmt.Errorf("writing file to shard store: %w", err)
 	}

-	e.PutVar(o.FileHash, &FileHashValue{
+	e.PutVar(o.FileHash, &ShardInfoValue{
 		Hash: fileInfo.Hash,
+		Size: fileInfo.Size,
 	})
 	return nil
 }
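
The rename from FileHashValue to ShardInfoValue ripples into every call site that type-asserts the executor result (see the uploader and scanner diffs below); a plain assertion panics if a stale plan still emits the old type. A hypothetical guard, assuming the repo's exec, ops2, and cdssdk packages are imported, that FileHash is a string type, and that results arrive as a map[string]exec.VarValue (shardInfoFromResult is not part of this commit):

// shardInfoFromResult reads a named executor result with the comma-ok type
// assertion, so an unexpected value type surfaces as an error instead of a panic.
func shardInfoFromResult(ret map[string]exec.VarValue, key string) (cdssdk.FileHash, int64, error) {
	v, ok := ret[key]
	if !ok {
		return "", 0, fmt.Errorf("result %q not found", key)
	}
	info, ok := v.(*ops2.ShardInfoValue)
	if !ok {
		return "", 0, fmt.Errorf("result %q is %T, want *ops2.ShardInfoValue", key, v)
	}
	return info.Hash, info.Size, nil
}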


+1 -1  common/pkgs/uploader/create_load.go

@@ -68,7 +68,7 @@ func (u *CreateLoadUploader) Upload(pa string, size int64, stream io.Reader) err
 	defer u.lock.Unlock()

 	// record the upload result
-	fileHash := ret["fileHash"].(*ops2.FileHashValue).Hash
+	fileHash := ret["fileHash"].(*ops2.ShardInfoValue).Hash
 	u.successes = append(u.successes, coormq.AddObjectEntry{
 		Path: pa,
 		Size: size,


+1 -1  common/pkgs/uploader/update.go

@@ -76,7 +76,7 @@ func (w *UpdateUploader) Upload(pat string, size int64, stream io.Reader) error
 	w.successes = append(w.successes, coormq.AddObjectEntry{
 		Path:       pat,
 		Size:       size,
-		FileHash:   ret["fileHash"].(*ops2.FileHashValue).Hash,
+		FileHash:   ret["fileHash"].(*ops2.ShardInfoValue).Hash,
 		UploadTime: uploadTime,
 		StorageIDs: []cdssdk.StorageID{w.targetStg.Storage.StorageID},
 	})


+26 -8  scanner/internal/event/check_package_redundancy.go

@@ -485,11 +485,13 @@ func (t *CheckPackageRedundancy) noneToRep(ctx ExecuteContext, obj stgmod.Object
 	var blocks []stgmod.ObjectBlock
 	var blockChgs []stgmod.BlockChange
 	for i, stg := range uploadStgs {
+		r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
 		blocks = append(blocks, stgmod.ObjectBlock{
 			ObjectID:  obj.Object.ObjectID,
 			Index:     0,
 			StorageID: stg.Storage.Storage.StorageID,
-			FileHash:  ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
+			FileHash:  r.Hash,
+			Size:      r.Size,
 		})
 		blockChgs = append(blockChgs, &stgmod.BlockChangeClone{
 			BlockType: stgmod.BlockTypeRaw,
@@ -554,11 +556,13 @@ func (t *CheckPackageRedundancy) noneToEC(ctx ExecuteContext, obj stgmod.ObjectD
 	var evtTargetBlocks []stgmod.Block
 	var evtBlockTrans []stgmod.DataTransfer
 	for i := 0; i < red.N; i++ {
+		r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
 		blocks = append(blocks, stgmod.ObjectBlock{
 			ObjectID:  obj.Object.ObjectID,
 			Index:     i,
 			StorageID: uploadStgs[i].Storage.Storage.StorageID,
-			FileHash:  ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
+			FileHash:  r.Hash,
+			Size:      r.Size,
 		})
 		evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
 			BlockType: stgmod.BlockTypeEC,
@@ -635,11 +639,13 @@ func (t *CheckPackageRedundancy) noneToLRC(ctx ExecuteContext, obj stgmod.Object
 	var evtTargetBlocks []stgmod.Block
 	var evtBlockTrans []stgmod.DataTransfer
 	for i := 0; i < red.N; i++ {
+		r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
 		blocks = append(blocks, stgmod.ObjectBlock{
 			ObjectID:  obj.Object.ObjectID,
 			Index:     i,
 			StorageID: uploadStgs[i].Storage.Storage.StorageID,
-			FileHash:  ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
+			FileHash:  r.Hash,
+			Size:      r.Size,
 		})
 		evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
 			BlockType: stgmod.BlockTypeEC,
@@ -722,11 +728,13 @@ func (t *CheckPackageRedundancy) noneToSeg(ctx ExecuteContext, obj stgmod.Object
 	var evtTargetBlocks []stgmod.Block
 	var evtBlockTrans []stgmod.DataTransfer
 	for i, stg := range uploadStgs {
+		r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
 		blocks = append(blocks, stgmod.ObjectBlock{
 			ObjectID:  obj.Object.ObjectID,
 			Index:     i,
 			StorageID: stg.Storage.Storage.StorageID,
-			FileHash:  ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
+			FileHash:  r.Hash,
+			Size:      r.Size,
 		})
 		evtTargetBlocks = append(evtTargetBlocks, stgmod.Block{
 			BlockType: stgmod.BlockTypeSegment,
@@ -807,11 +815,13 @@ func (t *CheckPackageRedundancy) repToRep(ctx ExecuteContext, obj stgmod.ObjectD
 	var blocks []stgmod.ObjectBlock
 	var blockChgs []stgmod.BlockChange
 	for i, stg := range uploadStgs {
+		r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
 		blocks = append(blocks, stgmod.ObjectBlock{
 			ObjectID:  obj.Object.ObjectID,
 			Index:     0,
 			StorageID: stg.Storage.Storage.StorageID,
-			FileHash:  ret[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
+			FileHash:  r.Hash,
+			Size:      r.Size,
 		})
 		blockChgs = append(blockChgs, &stgmod.BlockChangeClone{
 			BlockType: stgmod.BlockTypeRaw,
@@ -904,11 +914,13 @@ func (t *CheckPackageRedundancy) ecToRep(ctx ExecuteContext, obj stgmod.ObjectDe
 	var blocks []stgmod.ObjectBlock

 	for i := range uploadStgs {
+		r := ioRet[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
 		blocks = append(blocks, stgmod.ObjectBlock{
 			ObjectID:  obj.Object.ObjectID,
 			Index:     0,
 			StorageID: uploadStgs[i].Storage.Storage.StorageID,
-			FileHash:  ioRet[fmt.Sprintf("%d", i)].(*ops2.FileHashValue).Hash,
+			FileHash:  r.Hash,
+			Size:      r.Size,
 		})
 	}

@@ -1029,6 +1041,7 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet
 		// if the newly selected node is already recorded in the Block table, no change is needed
 		if ok && lo.Contains(grp.StorageIDs, stg.Storage.Storage.StorageID) {
 			newBlock.FileHash = grp.FileHash
+			newBlock.Size = grp.Size
 			newBlocks = append(newBlocks, newBlock)
 			continue
 		}
@@ -1071,7 +1084,9 @@ func (t *CheckPackageRedundancy) ecToEC(ctx ExecuteContext, obj stgmod.ObjectDet
 			return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
 		}

-		newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash
+		r := v.(*ops2.ShardInfoValue)
+		newBlocks[idx].FileHash = r.Hash
+		newBlocks[idx].Size = r.Size
 	}

 	var evtBlockTrans []stgmod.DataTransfer
@@ -1268,6 +1283,7 @@ func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.O
 		// if the newly selected node is already recorded in the Block table, no change is needed
 		if ok && lo.Contains(grp.StorageIDs, storage.Storage.Storage.StorageID) {
 			newBlock.FileHash = grp.FileHash
+			newBlock.Size = grp.Size
 			newBlocks = append(newBlocks, newBlock)
 			continue
 		}
@@ -1311,7 +1327,9 @@ func (t *CheckPackageRedundancy) reconstructLRC(ctx ExecuteContext, obj stgmod.O
 			return nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
 		}

-		newBlocks[idx].FileHash = v.(*ops2.FileHashValue).Hash
+		r := v.(*ops2.ShardInfoValue)
+		newBlocks[idx].FileHash = r.Hash
+		newBlocks[idx].Size = r.Size
 	}

 	return &coormq.UpdatingObjectRedundancy{
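
Every redundancy path above now repeats the same three steps: look up the i-th result, assert it to *ops2.ShardInfoValue, and copy Hash and Size into the ObjectBlock. A possible follow-up refactor (hypothetical, not part of this commit) that would collapse the pattern into one call per loop iteration:

// blockFromResult is a hypothetical helper extracting the pattern repeated in
// noneToRep, noneToEC, noneToLRC, noneToSeg, repToRep and ecToRep above: look
// up the i-th shard result and turn it into an ObjectBlock row.
func blockFromResult(ret map[string]exec.VarValue, i int, objID cdssdk.ObjectID, index int, stgID cdssdk.StorageID) stgmod.ObjectBlock {
	r := ret[fmt.Sprintf("%d", i)].(*ops2.ShardInfoValue)
	return stgmod.ObjectBlock{
		ObjectID:  objID,
		Index:     index,
		StorageID: stgID,
		FileHash:  r.Hash,
		Size:      r.Size,
	}
}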


+7 -1  scanner/internal/event/clean_pinned.go

@@ -249,6 +249,7 @@ type objectBlock struct {
 	HasEntity bool            // the node holds the actual file data for this block
 	HasShadow bool            // if the node holds the complete file data, it is treated as holding every block; such blocks are called shadow blocks
 	FileHash  cdssdk.FileHash // only set when the node actually holds the block data
+	Size      int64           // block size
 }

 type stgDist struct {
@@ -590,6 +591,7 @@ func (t *CleanPinned) initBlockList(ctx *annealingState) {
 			StorageID: b.StorageID,
 			HasEntity: true,
 			FileHash:  b.FileHash,
+			Size:      b.Size,
 		})
 		blocksMap[b.StorageID] = blocks
 	}
@@ -774,6 +776,7 @@ func (t *CleanPinned) makePlansForRepObject(allStgInfos map[cdssdk.StorageID]*st
 			Index:     solu.blockList[i].Index,
 			StorageID: solu.blockList[i].StorageID,
 			FileHash:  obj.Object.FileHash,
+			Size:      solu.blockList[i].Size,
 		})
 	}
 }
@@ -859,6 +862,7 @@ func (t *CleanPinned) makePlansForECObject(allStgInfos map[cdssdk.StorageID]*stg
 			Index:     block.Index,
 			StorageID: block.StorageID,
 			FileHash:  block.FileHash,
+			Size:      block.Size,
 		})

 	// if this block is a shadow block, it must be rebuilt from the complete object
@@ -1024,7 +1028,9 @@ func (t *CleanPinned) populateECObjectEntry(entry *coormq.UpdatingObjectRedundan

 	key := fmt.Sprintf("%d.%d", obj.Object.ObjectID, entry.Blocks[i].Index)
 	// a missing key should never occur here
-	entry.Blocks[i].FileHash = ioRets[key].(*ops2.FileHashValue).Hash
+	r := ioRets[key].(*ops2.ShardInfoValue)
+	entry.Blocks[i].FileHash = r.Hash
+	entry.Blocks[i].Size = r.Size
 	}
 }


