Browse Source

增加Load相关的FromTo数据结构

gitlink
Sydonian 1 year ago
parent
commit
0cb2c1c7b0
2 changed files with 53 additions and 24 deletions
  1. +25
    -15
      common/pkgs/ioswitch2/fromto.go
  2. +28
    -9
      common/pkgs/ioswitch2/parser/parser.go

+ 25
- 15
common/pkgs/ioswitch2/fromto.go View File

@@ -143,18 +143,28 @@ func (t *ToShardStore) GetRange() exec.Range {
return t.Range
}

// type ToStorage struct {
// Storage cdssdk.Storage
// DataIndex int
// }

// func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage {
// return &ToStorage{
// Storage: storage,
// DataIndex: dataIndex,
// }
// }

// func (t *ToStorage) GetDataIndex() int {
// return t.DataIndex
// }
type LoadToShared struct {
Hub cdssdk.Hub
Storage cdssdk.Storage
UserID cdssdk.UserID
PackageID cdssdk.PackageID
Path string
}

func NewLoadToShared(hub cdssdk.Hub, storage cdssdk.Storage, userID cdssdk.UserID, packageID cdssdk.PackageID, path string) *LoadToShared {
return &LoadToShared{
Hub: hub,
Storage: storage,
UserID: userID,
PackageID: packageID,
Path: path,
}
}

func (t *LoadToShared) GetDataIndex() int {
return -1
}

func (t *LoadToShared) GetRange() exec.Range {
return exec.Range{}
}

+ 28
- 9
common/pkgs/ioswitch2/parser/parser.go View File

@@ -283,15 +283,8 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToN
case *ioswitch2.ToShardStore:
n := ctx.DAG.NewShardWrite(t.Storage.StorageID, t.FileHashStoreKey)

switch addr := t.Hub.Address.(type) {
case *cdssdk.HttpAddressInfo:
n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: t.Hub})

case *cdssdk.GRPCAddressInfo:
n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: t.Hub, Address: *addr})

default:
return nil, fmt.Errorf("unsupported node address type %T", addr)
if err := p.setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
return nil, err
}

n.Env().Pinned = true
@@ -305,11 +298,37 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToN

return n, nil

case *ioswitch2.LoadToShared:
n := ctx.DAG.NewSharedLoad(t.Storage.StorageID, t.UserID, t.PackageID, t.Path)

if err := p.setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil {
return nil, err
}

n.Env().Pinned = true

return n, nil

default:
return nil, fmt.Errorf("unsupported to type %T", t)
}
}

func (p *DefaultParser) setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) error {
switch addr := addr.(type) {
case *cdssdk.HttpAddressInfo:
n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub})

case *cdssdk.GRPCAddressInfo:
n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: hub, Address: *addr})

default:
return fmt.Errorf("unsupported node address type %T", addr)
}

return nil
}

// 删除输出流未被使用的Join指令
func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool {
changed := false


Loading…
Cancel
Save