Browse Source

解决调试问题

gitlink
Sydonian 1 year ago
parent
commit
6f60d55078
24 changed files with 177 additions and 132 deletions
  1. +1
    -1
      client/internal/cmdline/put.go
  2. +1
    -1
      common/pkgs/cmd/upload_objects.go
  3. +8
    -8
      common/pkgs/db2/bucket.go
  4. +0
    -35
      common/pkgs/db2/model/model.go
  5. +11
    -12
      common/pkgs/db2/object.go
  6. +7
    -7
      common/pkgs/db2/package.go
  7. +8
    -4
      common/pkgs/db2/package_access_stat.go
  8. +2
    -2
      common/pkgs/db2/storage.go
  9. +5
    -5
      common/pkgs/ioswitch2/ops2/chunked.go
  10. +2
    -2
      common/pkgs/ioswitch2/ops2/clone.go
  11. +2
    -2
      common/pkgs/ioswitch2/ops2/ec.go
  12. +1
    -1
      common/pkgs/ioswitch2/ops2/file.go
  13. +2
    -0
      common/pkgs/ioswitch2/ops2/multipart.go
  14. +1
    -1
      common/pkgs/ioswitch2/ops2/range.go
  15. +2
    -1
      common/pkgs/ioswitch2/ops2/shard_store.go
  16. +18
    -7
      common/pkgs/ioswitch2/parser/parser.go
  17. +5
    -5
      common/pkgs/ioswitchlrc/ops2/chunked.go
  18. +2
    -2
      common/pkgs/ioswitchlrc/ops2/clone.go
  19. +3
    -3
      common/pkgs/ioswitchlrc/ops2/ec.go
  20. +1
    -1
      common/pkgs/ioswitchlrc/ops2/range.go
  21. +1
    -1
      common/pkgs/ioswitchlrc/ops2/shard_store.go
  22. +16
    -5
      common/pkgs/ioswitchlrc/parser/passes.go
  23. +32
    -11
      common/pkgs/storage/local/shard_store.go
  24. +46
    -15
      coordinator/internal/mq/storage.go

+ 1
- 1
client/internal/cmdline/put.go View File

@@ -18,7 +18,7 @@ import (
func init() {
var nodeID int64
cmd := &cobra.Command{
Use: "put",
Use: "put [local] [remote]",
Short: "Upload files to CDS",
Args: func(cmd *cobra.Command, args []string) error {
if err := cobra.ExactArgs(2)(cmd, args); err != nil {


+ 1
- 1
common/pkgs/cmd/upload_objects.go View File

@@ -100,7 +100,7 @@ func (t *UploadObjects) Execute(ctx *UploadObjectsContext) (*UploadObjectsResult
}

if len(userStgs) == 0 {
return nil, fmt.Errorf("user no available nodes")
return nil, fmt.Errorf("user no available storages")
}

// 给上传节点的IPFS加锁


+ 8
- 8
common/pkgs/db2/bucket.go View File

@@ -91,22 +91,22 @@ func (db *BucketDB) Create(ctx SQLContext, userID cdssdk.UserID, bucketName stri
Where("UserBucket.UserID = ? AND Bucket.Name = ?", userID, bucketName).
Scan(&bucketID).Error

if err == nil {
return 0, fmt.Errorf("bucket name exists")
if err != nil {
return 0, err
}

if !errors.Is(err, gorm.ErrRecordNotFound) {
return 0, err
if bucketID > 0 {
return 0, fmt.Errorf("bucket name exists")
}

newBucket := cdssdk.Bucket{Name: bucketName, CreatorID: userID}
if err := ctx.Create(&newBucket).Error; err != nil {
if err := ctx.Table("Bucket").Create(&newBucket).Error; err != nil {
return 0, fmt.Errorf("insert bucket failed, err: %w", err)
}

err = ctx.Exec("insert into UserBucket(UserID,BucketID) values(?,?)", userID, bucketID).Error
if err := ctx.Create(&newBucket).Error; err != nil {
return 0, fmt.Errorf("insert bucket failed, err: %w", err)
err = ctx.Table("UserBucket").Create(&model.UserBucket{UserID: userID, BucketID: newBucket.BucketID}).Error
if err != nil {
return 0, fmt.Errorf("insert user bucket: %w", err)
}

return newBucket.BucketID, nil


+ 0
- 35
common/pkgs/db2/model/model.go View File

@@ -1,12 +1,9 @@
package model

import (
"fmt"
"reflect"
"time"

cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
)

@@ -57,38 +54,6 @@ type Object = cdssdk.Object

type NodeConnectivity = cdssdk.NodeConnectivity

// 由于Object的Redundancy字段是interface,所以不能直接将查询结果scan成Object,必须先scan成TempObject,
// 再.ToObject()转成Object
type TempObject struct {
cdssdk.Object
Redundancy RedundancyWarpper `db:"Redundancy"`
}

func (o *TempObject) ToObject() cdssdk.Object {
obj := o.Object
obj.Redundancy = o.Redundancy.Value
return obj
}

type RedundancyWarpper struct {
Value cdssdk.Redundancy
}

func (o *RedundancyWarpper) Scan(src interface{}) error {
data, ok := src.([]uint8)
if !ok {
return fmt.Errorf("unknow src type: %v", reflect.TypeOf(data))
}

red, err := serder.JSONToObjectEx[cdssdk.Redundancy](data)
if err != nil {
return err
}

o.Value = red
return nil
}

type ObjectBlock = stgmod.ObjectBlock

type Cache struct {


+ 11
- 12
common/pkgs/db2/object.go View File

@@ -8,7 +8,6 @@ import (
"gitlink.org.cn/cloudream/common/utils/sort2"
"gorm.io/gorm/clause"

"github.com/samber/lo"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
"gitlink.org.cn/cloudream/storage/common/pkgs/db2/model"
@@ -24,9 +23,9 @@ func (db *DB) Object() *ObjectDB {
}

func (db *ObjectDB) GetByID(ctx SQLContext, objectID cdssdk.ObjectID) (model.Object, error) {
var ret model.TempObject
var ret cdssdk.Object
err := ctx.Table("Object").Where("ObjectID = ?", objectID).First(&ret).Error
return ret.ToObject(), err
return ret, err
}

func (db *ObjectDB) BatchTestObjectID(ctx SQLContext, objectIDs []cdssdk.ObjectID) (map[cdssdk.ObjectID]bool, error) {
@@ -53,13 +52,13 @@ func (db *ObjectDB) BatchGet(ctx SQLContext, objectIDs []cdssdk.ObjectID) ([]mod
return nil, nil
}

var objs []model.TempObject
var objs []cdssdk.Object
err := ctx.Table("Object").Where("ObjectID IN ?", objectIDs).Order("ObjectID ASC").Find(&objs).Error
if err != nil {
return nil, err
}

return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
return objs, nil
}

func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID, pathes []string) ([]cdssdk.Object, error) {
@@ -67,13 +66,13 @@ func (db *ObjectDB) BatchGetByPackagePath(ctx SQLContext, pkgID cdssdk.PackageID
return nil, nil
}

var objs []model.TempObject
var objs []cdssdk.Object
err := ctx.Table("Object").Where("PackageID = ? AND Path IN ?", pkgID, pathes).Find(&objs).Error
if err != nil {
return nil, err
}

return lo.Map(objs, func(o model.TempObject, idx int) cdssdk.Object { return o.ToObject() }), nil
return objs, nil
}

func (db *ObjectDB) Create(ctx SQLContext, obj cdssdk.Object) (cdssdk.ObjectID, error) {
@@ -233,13 +232,13 @@ func (db *ObjectDB) BatchAdd(ctx SQLContext, packageID cdssdk.PackageID, adds []

// 创建 Cache
caches := make([]model.Cache, len(adds))
for _, add := range adds {
caches = append(caches, model.Cache{
for i, add := range adds {
caches[i] = model.Cache{
FileHash: add.FileHash,
StorageID: add.StorageID,
CreateTime: time.Now(),
Priority: 0,
})
}
}
if err := ctx.Table("Cache").Create(&caches).Error; err != nil {
return nil, fmt.Errorf("batch create caches: %w", err)
@@ -335,9 +334,9 @@ func (db *ObjectDB) BatchDelete(ctx SQLContext, ids []cdssdk.ObjectID) error {
return nil
}

return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&model.TempObject{}).Error
return ctx.Table("Object").Where("ObjectID IN ?", ids).Delete(&cdssdk.Object{}).Error
}

func (db *ObjectDB) DeleteInPackage(ctx SQLContext, packageID cdssdk.PackageID) error {
return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&model.TempObject{}).Error
return ctx.Table("Object").Where("PackageID = ?", packageID).Delete(&cdssdk.Object{}).Error
}

+ 7
- 7
common/pkgs/db2/package.go View File

@@ -116,21 +116,21 @@ func (db *PackageDB) Create(ctx SQLContext, bucketID cdssdk.BucketID, name strin
err := ctx.Table("Package").
Select("PackageID").
Where("Name = ? AND BucketID = ?", name, bucketID).
First(&packageID).Error
Scan(&packageID).Error

if err == nil {
return 0, fmt.Errorf("package with given Name and BucketID already exists")
if err != nil {
return 0, err
}
if !errors.Is(err, gorm.ErrRecordNotFound) {
return 0, fmt.Errorf("query Package by PackageName and BucketID failed, err: %w", err)
if packageID != 0 {
return 0, errors.New("package already exists")
}

newPackage := model.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal}
newPackage := cdssdk.Package{Name: name, BucketID: bucketID, State: cdssdk.PackageStateNormal}
if err := ctx.Create(&newPackage).Error; err != nil {
return 0, fmt.Errorf("insert package failed, err: %w", err)
}

return cdssdk.PackageID(newPackage.PackageID), nil
return newPackage.PackageID, nil
}

// SoftDelete 设置一个对象被删除,并将相关数据删除


+ 8
- 4
common/pkgs/db2/package_access_stat.go View File

@@ -4,6 +4,8 @@ import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
stgmod "gitlink.org.cn/cloudream/storage/common/models"
coormq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/coordinator"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)

type PackageAccessStatDB struct {
@@ -41,10 +43,12 @@ func (*PackageAccessStatDB) BatchAddCounter(ctx SQLContext, entries []coormq.Add
return nil
}

sql := "INSERT INTO PackageAccessStat(PackageID, StorageID, Counter, Amount) " +
"VALUES(:PackageID, :StorageID, :Counter, 0) ON DUPLICATE KEY UPDATE Counter = Counter + VALUES(Counter)"

return ctx.Exec(sql, entries).Error
return ctx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "PackageID"}, {Name: "StorageID"}},
DoUpdates: clause.Assignments(map[string]any{
"Counter": gorm.Expr("Counter + values(Counter)"),
}),
}).Table("PackageAccessStat").Create(&entries).Error
}

func (*PackageAccessStatDB) BatchUpdateAmount(ctx SQLContext, pkgIDs []cdssdk.PackageID, historyWeight float64) error {


+ 2
- 2
common/pkgs/db2/storage.go View File

@@ -43,7 +43,7 @@ func (db *StorageDB) GetUserStorages(ctx SQLContext, userID cdssdk.UserID) ([]mo

func (db *StorageDB) BatchGetAllStorageIDs(ctx SQLContext, start int, count int) ([]cdssdk.StorageID, error) {
var ret []cdssdk.StorageID
err := ctx.Table("Storage").Select("StorageID").Find(ret).Limit(count).Offset(start).Error
err := ctx.Table("Storage").Select("StorageID").Find(&ret).Limit(count).Offset(start).Error
return ret, err
}

@@ -79,6 +79,6 @@ func (db *StorageDB) GetUserStorageByName(ctx SQLContext, userID cdssdk.UserID,

func (db *StorageDB) GetHubStorages(ctx SQLContext, hubID cdssdk.NodeID) ([]model.Storage, error) {
var stgs []model.Storage
err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "NodeID = ?", hubID).Error
err := ctx.Table("Storage").Select("Storage.*").Find(&stgs, "MasterHub = ?", hubID).Error
return stgs, err
}

+ 5
- 5
common/pkgs/ioswitch2/ops2/chunked.go View File

@@ -116,7 +116,7 @@ func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode {

func (t *ChunkedSplitNode) Split(input *dag.Var, cnt int) {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
input.StreamTo(t, 0)
t.OutputStreams().Resize(cnt)
for i := 0; i < cnt; i++ {
t.OutputStreams().Setup(t, t.Graph().NewVar(), i)
@@ -136,11 +136,11 @@ func (t *ChunkedSplitNode) Clear() {
return
}

t.InputStreams().Get(0).Disconnect(t, 0)
t.InputStreams().Get(0).StreamNotTo(t, 0)
t.InputStreams().Resize(0)

for _, out := range t.OutputStreams().RawArray() {
out.DisconnectAll()
out.NoInputAllStream()
}
t.OutputStreams().Resize(0)
}
@@ -176,7 +176,7 @@ func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode {

func (t *ChunkedJoinNode) AddInput(str *dag.Var) {
idx := t.InputStreams().EnlargeOne()
str.Connect(t, idx)
str.StreamTo(t, idx)
}

func (t *ChunkedJoinNode) Joined() *dag.Var {
@@ -185,7 +185,7 @@ func (t *ChunkedJoinNode) Joined() *dag.Var {

func (t *ChunkedJoinNode) RemoveAllInputs() {
for i, in := range t.InputStreams().RawArray() {
in.Disconnect(t, i)
in.StreamNotTo(t, i)
}
t.InputStreams().Resize(0)
}


+ 2
- 2
common/pkgs/ioswitch2/ops2/clone.go View File

@@ -83,7 +83,7 @@ func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType {

func (t *CloneStreamType) SetInput(raw *dag.Var) {
t.InputStreams().EnsureSize(1)
raw.Connect(t, 0)
raw.StreamTo(t, 0)
}

func (t *CloneStreamType) NewOutput() *dag.Var {
@@ -117,7 +117,7 @@ func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType {

func (t *CloneVarType) SetInput(raw *dag.Var) {
t.InputValues().EnsureSize(1)
raw.Connect(t, 0)
raw.ValueTo(t, 0)
}

func (t *CloneVarType) NewOutput() *dag.Var {


+ 2
- 2
common/pkgs/ioswitch2/ops2/ec.go View File

@@ -222,12 +222,12 @@ func (b *GraphNodeBuilder) NewECMultiply(ec cdssdk.ECRedundancy) *ECMultiplyNode
func (t *ECMultiplyNode) AddInput(str *dag.Var, dataIndex int) {
t.InputIndexes = append(t.InputIndexes, dataIndex)
idx := t.InputStreams().EnlargeOne()
str.Connect(t, idx)
str.StreamTo(t, idx)
}

func (t *ECMultiplyNode) RemoveAllInputs() {
for i, in := range t.InputStreams().RawArray() {
in.Disconnect(t, i)
in.StreamNotTo(t, i)
}
t.InputStreams().Resize(0)
t.InputIndexes = nil


+ 1
- 1
common/pkgs/ioswitch2/ops2/file.go View File

@@ -133,7 +133,7 @@ func (t *FileWriteNode) Input() dag.Slot {

func (t *FileWriteNode) SetInput(str *dag.Var) {
t.InputStreams().EnsureSize(1)
str.Connect(t, 0)
str.StreamTo(t, 0)
}

func (t *FileWriteNode) GenerateOp() (exec.Op, error) {


+ 2
- 0
common/pkgs/ioswitch2/ops2/multipart.go View File

@@ -1,5 +1,6 @@
package ops2

/*
import (
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
@@ -186,3 +187,4 @@ func (t MultipartUploadNode) GenerateOp() (exec.Op, error) {
PartSize: t.PartSize,
}, nil
}
*/

+ 1
- 1
common/pkgs/ioswitch2/ops2/range.go View File

@@ -88,7 +88,7 @@ func (b *GraphNodeBuilder) NewRange() *RangeNode {

func (t *RangeNode) RangeStream(input *dag.Var, rng exec.Range) *dag.Var {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
input.StreamTo(t, 0)
t.Range = rng
output := t.Graph().NewVar()
t.OutputStreams().Setup(t, output, 0)


+ 2
- 1
common/pkgs/ioswitch2/ops2/shard_store.go View File

@@ -164,6 +164,7 @@ type ShardWriteNode struct {

func (b *GraphNodeBuilder) NewShardWrite(stgID cdssdk.StorageID, fileHashStoreKey string) *ShardWriteNode {
node := &ShardWriteNode{
StorageID: stgID,
FileHashStoreKey: fileHashStoreKey,
}
b.AddNode(node)
@@ -172,7 +173,7 @@ func (b *GraphNodeBuilder) NewShardWrite(stgID cdssdk.StorageID, fileHashStoreKe

func (t *ShardWriteNode) SetInput(input *dag.Var) {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
input.StreamTo(t, 0)
t.OutputValues().SetupNew(t, t.Graph().NewVar())
}



+ 18
- 7
common/pkgs/ioswitch2/parser/parser.go View File

@@ -282,7 +282,18 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToN
switch t := t.(type) {
case *ioswitch2.ToShardStore:
n := ctx.DAG.NewShardWrite(t.Storage.StorageID, t.FileHashStoreKey)
n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Hub})

switch addr := t.Hub.Address.(type) {
case *cdssdk.HttpAddressInfo:
n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Node: t.Hub})

case *cdssdk.GRPCAddressInfo:
n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Node: t.Hub, Address: *addr})

default:
return nil, fmt.Errorf("unsupported node address type %T", addr)
}

n.Env().Pinned = true

return n, nil
@@ -404,9 +415,9 @@ func (p *DefaultParser) omitSplitJoin(ctx *ParseContext) bool {
// F->Split->Join->T 变换为:F->T
splitInput := splitNode.InputStreams().Get(0)
for _, to := range joinNode.Joined().To().RawArray() {
splitInput.Connect(to.Node, to.SlotIndex)
splitInput.StreamTo(to.Node, to.SlotIndex)
}
splitInput.Disconnect(splitNode, 0)
splitInput.StreamNotTo(splitNode, 0)

// 并删除这两个指令
ctx.DAG.RemoveNode(joinNode)
@@ -528,7 +539,7 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) {
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
})
toInput.Var.Disconnect(toNode, toInput.Index)
toInput.Var.StreamNotTo(toNode, toInput.Index)
toNode.SetInput(rnged)

} else {
@@ -544,7 +555,7 @@ func (p *DefaultParser) generateRange(ctx *ParseContext) {
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
})
toInput.Var.Disconnect(toNode, toInput.Index)
toInput.Var.StreamNotTo(toNode, toInput.Index)
toNode.SetInput(rnged)
}
}
@@ -561,7 +572,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) {
c := ctx.DAG.NewCloneStream()
*c.Env() = *node.Env()
for _, to := range out.To().RawArray() {
c.NewOutput().Connect(to.Node, to.SlotIndex)
c.NewOutput().StreamTo(to.Node, to.SlotIndex)
}
out.To().Resize(0)
c.SetInput(out)
@@ -575,7 +586,7 @@ func (p *DefaultParser) generateClone(ctx *ParseContext) {
t := ctx.DAG.NewCloneValue()
*t.Env() = *node.Env()
for _, to := range out.To().RawArray() {
t.NewOutput().Connect(to.Node, to.SlotIndex)
t.NewOutput().ValueTo(to.Node, to.SlotIndex)
}
out.To().Resize(0)
t.SetInput(out)


+ 5
- 5
common/pkgs/ioswitchlrc/ops2/chunked.go View File

@@ -116,7 +116,7 @@ func (b *GraphNodeBuilder) NewChunkedSplit(chunkSize int) *ChunkedSplitNode {

func (t *ChunkedSplitNode) Split(input *dag.Var, cnt int) {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
input.StreamTo(t, 0)
t.OutputStreams().Resize(cnt)
for i := 0; i < cnt; i++ {
t.OutputStreams().Setup(t, t.Graph().NewVar(), i)
@@ -136,11 +136,11 @@ func (t *ChunkedSplitNode) Clear() {
return
}

t.InputStreams().Get(0).Disconnect(t, 0)
t.InputStreams().Get(0).StreamNotTo(t, 0)
t.InputStreams().Resize(0)

for _, out := range t.OutputStreams().RawArray() {
out.DisconnectAll()
out.NoInputAllStream()
}
t.OutputStreams().Resize(0)
}
@@ -176,7 +176,7 @@ func (b *GraphNodeBuilder) NewChunkedJoin(chunkSize int) *ChunkedJoinNode {

func (t *ChunkedJoinNode) AddInput(str *dag.Var) {
idx := t.InputStreams().EnlargeOne()
str.Connect(t, idx)
str.StreamTo(t, idx)
}

func (t *ChunkedJoinNode) Joined() *dag.Var {
@@ -185,7 +185,7 @@ func (t *ChunkedJoinNode) Joined() *dag.Var {

func (t *ChunkedJoinNode) RemoveAllInputs() {
for i, in := range t.InputStreams().RawArray() {
in.Disconnect(t, i)
in.StreamNotTo(t, i)
}
t.InputStreams().Resize(0)
}


+ 2
- 2
common/pkgs/ioswitchlrc/ops2/clone.go View File

@@ -83,7 +83,7 @@ func (b *GraphNodeBuilder) NewCloneStream() *CloneStreamType {

func (t *CloneStreamType) SetInput(raw *dag.Var) {
t.InputStreams().EnsureSize(1)
raw.Connect(t, 0)
raw.StreamTo(t, 0)
}

func (t *CloneStreamType) NewOutput() *dag.Var {
@@ -117,7 +117,7 @@ func (b *GraphNodeBuilder) NewCloneValue() *CloneVarType {

func (t *CloneVarType) SetInput(raw *dag.Var) {
t.InputValues().EnsureSize(1)
raw.Connect(t, 0)
raw.ValueTo(t, 0)
}

func (t *CloneVarType) NewOutput() *dag.Var {


+ 3
- 3
common/pkgs/ioswitchlrc/ops2/ec.go View File

@@ -131,12 +131,12 @@ func (b *GraphNodeBuilder) NewLRCConstructAny(lrc cdssdk.LRCRedundancy) *LRCCons
func (t *LRCConstructAnyNode) AddInput(str *dag.Var, dataIndex int) {
t.InputIndexes = append(t.InputIndexes, dataIndex)
idx := t.InputStreams().EnlargeOne()
str.Connect(t, idx)
str.StreamTo(t, idx)
}

func (t *LRCConstructAnyNode) RemoveAllInputs() {
for i, in := range t.InputStreams().RawArray() {
in.Disconnect(t, i)
in.StreamNotTo(t, i)
}
t.InputStreams().Resize(0)
t.InputIndexes = nil
@@ -191,7 +191,7 @@ func (t *LRCConstructGroupNode) SetupForTarget(blockIdx int, inputs []*dag.Var)
t.InputStreams().Resize(0)
for _, in := range inputs {
idx := t.InputStreams().EnlargeOne()
in.Connect(t, idx)
in.StreamTo(t, idx)
}

output := t.Graph().NewVar()


+ 1
- 1
common/pkgs/ioswitchlrc/ops2/range.go View File

@@ -88,7 +88,7 @@ func (b *GraphNodeBuilder) NewRange() *RangeNode {

func (t *RangeNode) RangeStream(input *dag.Var, rng exec.Range) *dag.Var {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
input.StreamTo(t, 0)
t.Range = rng
output := t.Graph().NewVar()
t.OutputStreams().Setup(t, output, 0)


+ 1
- 1
common/pkgs/ioswitchlrc/ops2/shard_store.go View File

@@ -171,7 +171,7 @@ func (b *GraphNodeBuilder) NewShardWrite(fileHashStoreKey string) *ShardWriteNod

func (t *ShardWriteNode) SetInput(input *dag.Var) {
t.InputStreams().EnsureSize(1)
input.Connect(t, 0)
input.StreamTo(t, 0)
t.OutputValues().SetupNew(t, t.Graph().NewVar())
}



+ 16
- 5
common/pkgs/ioswitchlrc/parser/passes.go View File

@@ -72,6 +72,7 @@ func buildFromNode(ctx *GenerateContext, f ioswitchlrc.From) (ops2.FromNode, err
t.Open.WithNullableLength(blkRange.Offset, blkRange.Length)
}

// TODO2 支持HTTP协议
t.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: f.Node, Address: *f.Node.Address.(*cdssdk.GRPCAddressInfo)})
t.Env().Pinned = true

@@ -101,7 +102,17 @@ func buildToNode(ctx *GenerateContext, t ioswitchlrc.To) (ops2.ToNode, error) {
switch t := t.(type) {
case *ioswitchlrc.ToNode:
n := ctx.DAG.NewShardWrite(t.FileHashStoreKey)
n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Hub})
switch addr := t.Hub.Address.(type) {
// case *cdssdk.HttpAddressInfo:
// n.Env().ToEnvWorker(&ioswitchlrc.HttpHubWorker{Node: t.Hub})
// TODO2 支持HTTP协议
case *cdssdk.GRPCAddressInfo:
n.Env().ToEnvWorker(&ioswitchlrc.AgentWorker{Node: t.Hub, Address: *addr})

default:
return nil, fmt.Errorf("unsupported node address type %T", addr)
}

n.Env().Pinned = true

return n, nil
@@ -227,7 +238,7 @@ func generateRange(ctx *GenerateContext) {
Offset: toRng.Offset - ctx.StreamRange.Offset,
Length: toRng.Length,
})
toInput.Var.Disconnect(toNode, toInput.Index)
toInput.Var.StreamNotTo(toNode, toInput.Index)
toNode.SetInput(rnged)

} else {
@@ -243,7 +254,7 @@ func generateRange(ctx *GenerateContext) {
Offset: toRng.Offset - blkStart,
Length: toRng.Length,
})
toInput.Var.Disconnect(toNode, toInput.Index)
toInput.Var.StreamNotTo(toNode, toInput.Index)
toNode.SetInput(rnged)
}
}
@@ -260,7 +271,7 @@ func generateClone(ctx *GenerateContext) {
t := ctx.DAG.NewCloneStream()
*t.Env() = *node.Env()
for _, to := range out.To().RawArray() {
t.NewOutput().Connect(to.Node, to.SlotIndex)
t.NewOutput().StreamTo(to.Node, to.SlotIndex)
}
out.To().Resize(0)
t.SetInput(out)
@@ -274,7 +285,7 @@ func generateClone(ctx *GenerateContext) {
t := ctx.DAG.NewCloneValue()
*t.Env() = *node.Env()
for _, to := range out.To().RawArray() {
t.NewOutput().Connect(to.Node, to.SlotIndex)
t.NewOutput().ValueTo(to.Node, to.SlotIndex)
}
out.To().Resize(0)
t.SetInput(out)


+ 32
- 11
common/pkgs/storage/local/shard_store.go View File

@@ -2,6 +2,7 @@ package local

import (
"crypto/sha256"
"errors"
"fmt"
"io"
"io/fs"
@@ -18,9 +19,11 @@ import (
const (
TempDir = "tmp"
BlocksDir = "blocks"
SvcName = "LocalShardStore"
)

type ShardStore struct {
stg cdssdk.Storage
cfg cdssdk.LocalShardStorage
}

@@ -31,26 +34,34 @@ func NewShardStore(stg cdssdk.Storage, cfg cdssdk.LocalShardStorage) (*ShardStor
}

return &ShardStore{
stg: stg,
cfg: cfg,
}, nil
}

func (s *ShardStore) Start(ch *types.StorageEventChan) {
logger.Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize)
s.getLogger().Infof("local shard store start, root: %v, max size: %v", s.cfg.Root, s.cfg.MaxSize)
}

func (s *ShardStore) Stop() {
logger.Infof("local shard store stop")
s.getLogger().Infof("local shard store stop")
}

func (s *ShardStore) New() types.ShardWriter {
file, err := os.CreateTemp(filepath.Join(s.cfg.Root, "tmp"), "tmp-*")
tmpDir := filepath.Join(s.cfg.Root, TempDir)

err := os.MkdirAll(tmpDir, 0755)
if err != nil {
return utils.ErrorShardWriter(err)
}

file, err := os.CreateTemp(tmpDir, "tmp-*")
if err != nil {
return utils.ErrorShardWriter(err)
}

return &ShardWriter{
path: filepath.Join(s.cfg.Root, "tmp", file.Name()),
path: file.Name(), // file.Name 包含tmpDir路径
file: file,
hasher: sha256.New(),
owner: s,
@@ -90,6 +101,10 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {

blockDir := filepath.Join(s.cfg.Root, BlocksDir)
err := filepath.WalkDir(blockDir, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}

if d.IsDir() {
return nil
}
@@ -108,7 +123,7 @@ func (s *ShardStore) ListAll() ([]types.FileInfo, error) {
})
return nil
})
if err != nil {
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, err
}

@@ -122,7 +137,7 @@ func (s *ShardStore) Purge(removes []cdssdk.FileHash) error {
path := filepath.Join(s.cfg.Root, BlocksDir, fileName[:2], fileName)
err := os.Remove(path)
if err != nil {
logger.Warnf("remove file %v: %v", path, err)
s.getLogger().Warnf("remove file %v: %v", path, err)
}
}

@@ -138,18 +153,20 @@ func (s *ShardStore) Stats() types.Stats {
}

func (s *ShardStore) onWritterAbort(w *ShardWriter) {
logger.Debugf("writting file %v aborted", w.path)
s.getLogger().Debugf("writting file %v aborted", w.path)
s.removeTempFile(w.path)
}

func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (types.FileInfo, error) {
logger.Debugf("write file %v finished, size: %v, hash: %v", w.path, w.size, hash)
log := s.getLogger()

log.Debugf("write file %v finished, size: %v, hash: %v", w.path, w.size, hash)

blockDir := filepath.Join(s.cfg.Root, BlocksDir, string(hash)[:2])
err := os.MkdirAll(blockDir, 0755)
if err != nil {
s.removeTempFile(w.path)
logger.Warnf("make block dir %v: %v", blockDir, err)
log.Warnf("make block dir %v: %v", blockDir, err)
return types.FileInfo{}, fmt.Errorf("making block dir: %w", err)
}

@@ -157,7 +174,7 @@ func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (type
err = os.Rename(w.path, name)
if err != nil {
s.removeTempFile(w.path)
logger.Warnf("rename %v to %v: %v", w.path, name, err)
log.Warnf("rename %v to %v: %v", w.path, name, err)
return types.FileInfo{}, fmt.Errorf("rename file: %w", err)
}

@@ -171,6 +188,10 @@ func (s *ShardStore) onWritterFinish(w *ShardWriter, hash cdssdk.FileHash) (type
func (s *ShardStore) removeTempFile(path string) {
err := os.Remove(path)
if err != nil {
logger.Warnf("removing temp file %v: %v", path, err)
s.getLogger().Warnf("removing temp file %v: %v", path, err)
}
}

func (s *ShardStore) getLogger() logger.Logger {
return logger.WithField("Svc", SvcName).WithField("Storage", s.stg)
}

+ 46
- 15
coordinator/internal/mq/storage.go View File

@@ -51,6 +51,18 @@ func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.Ge
for _, hub := range masterHubs {
masterHubMap[hub.NodeID] = hub
}
for _, stg := range stgsMp {
if stg.Storage.MasterHub != 0 {
hub, ok := masterHubMap[stg.Storage.MasterHub]
if !ok {
logger.Warnf("master hub %v of storage %v not found, this storage will not be add to result", stg.Storage.MasterHub, stg.Storage)
delete(stgsMp, stg.Storage.StorageID)
continue
}

stg.MasterHub = &hub
}
}

// 获取分片存储
shards, err := svc.db2.ShardStorage().BatchGetByStorageIDs(tx, msg.StorageIDs)
@@ -58,14 +70,12 @@ func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.Ge
return fmt.Errorf("getting shard storage: %w", err)
}
for _, shard := range shards {
stgsMp[shard.StorageID].Shard = &shard
}

for _, stg := range stgsMp {
if stg.Shard != nil {
hub := masterHubMap[stg.MasterHub.NodeID]
stg.MasterHub = &hub
stg := stgsMp[shard.StorageID]
if stg == nil {
continue
}

stg.Shard = &shard
}

// 获取共享存储的相关信息
@@ -74,7 +84,12 @@ func (svc *Service) GetStorageDetails(msg *coormq.GetStorageDetails) (*coormq.Ge
return fmt.Errorf("getting shared storage: %w", err)
}
for _, shared := range shareds {
stgsMp[shared.StorageID].Shared = &shared
stg := stgsMp[shared.StorageID]
if stg == nil {
continue
}

stg.Shared = &shared
}

return nil
@@ -118,6 +133,18 @@ func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*c
for _, hub := range masterHubs {
masterHubMap[hub.NodeID] = hub
}
for _, stg := range stgsMp {
if stg.Storage.MasterHub != 0 {
hub, ok := masterHubMap[stg.Storage.MasterHub]
if !ok {
logger.Warnf("master hub %v of storage %v not found, this storage will not be add to result", stg.Storage.MasterHub, stg.Storage)
delete(stgsMp, stg.Storage.StorageID)
continue
}

stg.MasterHub = &hub
}
}

stgIDs := lo.Map(stgs, func(stg cdssdk.Storage, i int) cdssdk.StorageID { return stg.StorageID })

@@ -127,13 +154,12 @@ func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*c
return fmt.Errorf("getting shard storage: %w", err)
}
for _, shard := range shards {
stgsMp[shard.StorageID].Shard = &shard
}
for _, stg := range stgsMp {
if stg.Shard != nil {
hub := masterHubMap[stg.MasterHub.NodeID]
stg.MasterHub = &hub
stg := stgsMp[shard.StorageID]
if stg == nil {
continue
}

stg.Shard = &shard
}

// 获取共享存储的相关信息
@@ -142,7 +168,12 @@ func (svc *Service) GetUserStorageDetails(msg *coormq.GetUserStorageDetails) (*c
return fmt.Errorf("getting shared storage: %w", err)
}
for _, shared := range shareds {
stgsMp[shared.StorageID].Shared = &shared
stg := stgsMp[shared.StorageID]
if stg == nil {
continue
}

stg.Shared = &shared
}

return nil


Loading…
Cancel
Save