diff --git a/common/pkgs/ioswitch2/fromto.go b/common/pkgs/ioswitch2/fromto.go index 159b2d4..4993640 100644 --- a/common/pkgs/ioswitch2/fromto.go +++ b/common/pkgs/ioswitch2/fromto.go @@ -143,18 +143,28 @@ func (t *ToShardStore) GetRange() exec.Range { return t.Range } -// type ToStorage struct { -// Storage cdssdk.Storage -// DataIndex int -// } - -// func NewToStorage(storage cdssdk.Storage, dataIndex int) *ToStorage { -// return &ToStorage{ -// Storage: storage, -// DataIndex: dataIndex, -// } -// } - -// func (t *ToStorage) GetDataIndex() int { -// return t.DataIndex -// } +type LoadToShared struct { + Hub cdssdk.Hub + Storage cdssdk.Storage + UserID cdssdk.UserID + PackageID cdssdk.PackageID + Path string +} + +func NewLoadToShared(hub cdssdk.Hub, storage cdssdk.Storage, userID cdssdk.UserID, packageID cdssdk.PackageID, path string) *LoadToShared { + return &LoadToShared{ + Hub: hub, + Storage: storage, + UserID: userID, + PackageID: packageID, + Path: path, + } +} + +func (t *LoadToShared) GetDataIndex() int { + return -1 +} + +func (t *LoadToShared) GetRange() exec.Range { + return exec.Range{} +} diff --git a/common/pkgs/ioswitch2/parser/parser.go b/common/pkgs/ioswitch2/parser/parser.go index 27fe20e..b05edab 100644 --- a/common/pkgs/ioswitch2/parser/parser.go +++ b/common/pkgs/ioswitch2/parser/parser.go @@ -283,15 +283,8 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToN case *ioswitch2.ToShardStore: n := ctx.DAG.NewShardWrite(t.Storage.StorageID, t.FileHashStoreKey) - switch addr := t.Hub.Address.(type) { - case *cdssdk.HttpAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: t.Hub}) - - case *cdssdk.GRPCAddressInfo: - n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: t.Hub, Address: *addr}) - - default: - return nil, fmt.Errorf("unsupported node address type %T", addr) + if err := p.setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { + return nil, err } n.Env().Pinned = true @@ -305,11 +298,37 @@ func (p *DefaultParser) buildToNode(ctx *ParseContext, t ioswitch2.To) (ops2.ToN return n, nil + case *ioswitch2.LoadToShared: + n := ctx.DAG.NewSharedLoad(t.Storage.StorageID, t.UserID, t.PackageID, t.Path) + + if err := p.setEnvByAddress(n, t.Hub, t.Hub.Address); err != nil { + return nil, err + } + + n.Env().Pinned = true + + return n, nil + default: return nil, fmt.Errorf("unsupported to type %T", t) } } +func (p *DefaultParser) setEnvByAddress(n dag.Node, hub cdssdk.Hub, addr cdssdk.HubAddressInfo) error { + switch addr := addr.(type) { + case *cdssdk.HttpAddressInfo: + n.Env().ToEnvWorker(&ioswitch2.HttpHubWorker{Hub: hub}) + + case *cdssdk.GRPCAddressInfo: + n.Env().ToEnvWorker(&ioswitch2.AgentWorker{Hub: hub, Address: *addr}) + + default: + return fmt.Errorf("unsupported node address type %T", addr) + } + + return nil +} + // 删除输出流未被使用的Join指令 func (p *DefaultParser) removeUnusedJoin(ctx *ParseContext) bool { changed := false