Browse Source

增加Executor返回执行结果的机制

master
Sydonian 4 months ago
parent
commit
b9a1596726
17 changed files with 128 additions and 61 deletions
  1. +1
    -1
      client/internal/services/object.go
  2. +16
    -8
      client/internal/ticktock/redundancy_recover.go
  3. +6
    -6
      client/internal/ticktock/redundancy_shrink.go
  4. +1
    -1
      client/internal/uploader/create_load.go
  5. +1
    -1
      client/internal/uploader/update.go
  6. +1
    -1
      client/internal/uploader/uploader.go
  7. +2
    -2
      client/internal/uploader/user_space_upload.go
  8. +52
    -15
      common/pkgs/ioswitch/exec/driver.go
  9. +10
    -6
      common/pkgs/ioswitch/exec/executor.go
  10. +1
    -1
      common/pkgs/ioswitch/exec/plan_builder.go
  11. +1
    -1
      common/pkgs/ioswitch/exec/worker.go
  12. +7
    -2
      common/pkgs/ioswitch2/http_hub_worker.go
  13. +7
    -3
      common/pkgs/ioswitch2/hub_worker.go
  14. +1
    -1
      common/pkgs/ioswitch2/parser/opt/misc.go
  15. +7
    -3
      common/pkgs/ioswitchlrc/hub_worker.go
  16. +3
    -1
      common/pkgs/rpc/hub/ioswitch.go
  17. +11
    -8
      hub/sdk/api/hub_io.go

+ 1
- 1
client/internal/services/object.go View File

@@ -725,7 +725,7 @@ func (svc *ObjectService) CompleteMultipartUpload(objectID types.ObjectID, index
return types.Object{}, err return types.Object{}, err
} }


shardInfo := ret["shard"].(*ops2.FileInfoValue)
shardInfo := ret.Get("shard").(*ops2.FileInfoValue)


err = db.DoTx10(svc.DB, svc.DB.Object().BatchUpdateRedundancy, []db.UpdatingObjectRedundancy{ err = db.DoTx10(svc.DB, svc.DB.Object().BatchUpdateRedundancy, []db.UpdatingObjectRedundancy{
{ {


+ 16
- 8
client/internal/ticktock/redundancy_recover.go View File

@@ -377,7 +377,7 @@ func (t *ChangeRedundancy) noneToRep(ctx *changeRedundancyContext, obj clitypes.
var blocks []clitypes.ObjectBlock var blocks []clitypes.ObjectBlock
var blockChgs []datamap.BlockChange var blockChgs []datamap.BlockChange
for i, stg := range uploadStgs { for i, stg := range uploadStgs {
r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
blocks = append(blocks, clitypes.ObjectBlock{ blocks = append(blocks, clitypes.ObjectBlock{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Index: 0, Index: 0,
@@ -445,7 +445,7 @@ func (t *ChangeRedundancy) noneToEC(ctx *changeRedundancyContext, obj clitypes.O
var evtTargetBlocks []datamap.Block var evtTargetBlocks []datamap.Block
var evtBlockTrans []datamap.DataTransfer var evtBlockTrans []datamap.DataTransfer
for i := 0; i < red.N; i++ { for i := 0; i < red.N; i++ {
r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
blocks = append(blocks, clitypes.ObjectBlock{ blocks = append(blocks, clitypes.ObjectBlock{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Index: i, Index: i,
@@ -526,7 +526,7 @@ func (t *ChangeRedundancy) noneToLRC(ctx *changeRedundancyContext, obj clitypes.
var evtTargetBlocks []datamap.Block var evtTargetBlocks []datamap.Block
var evtBlockTrans []datamap.DataTransfer var evtBlockTrans []datamap.DataTransfer
for i := 0; i < red.N; i++ { for i := 0; i < red.N; i++ {
r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
blocks = append(blocks, clitypes.ObjectBlock{ blocks = append(blocks, clitypes.ObjectBlock{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Index: i, Index: i,
@@ -614,7 +614,7 @@ func (t *ChangeRedundancy) noneToSeg(ctx *changeRedundancyContext, obj clitypes.
var evtTargetBlocks []datamap.Block var evtTargetBlocks []datamap.Block
var evtBlockTrans []datamap.DataTransfer var evtBlockTrans []datamap.DataTransfer
for i, stg := range uploadStgs { for i, stg := range uploadStgs {
r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
blocks = append(blocks, clitypes.ObjectBlock{ blocks = append(blocks, clitypes.ObjectBlock{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Index: i, Index: i,
@@ -700,7 +700,7 @@ func (t *ChangeRedundancy) repToRep(ctx *changeRedundancyContext, obj clitypes.O
var blocks []clitypes.ObjectBlock var blocks []clitypes.ObjectBlock
var blockChgs []datamap.BlockChange var blockChgs []datamap.BlockChange
for i, stg := range uploadStgs { for i, stg := range uploadStgs {
r := ret[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
r := ret.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
blocks = append(blocks, clitypes.ObjectBlock{ blocks = append(blocks, clitypes.ObjectBlock{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Index: 0, Index: 0,
@@ -798,7 +798,7 @@ func (t *ChangeRedundancy) ecToRep(ctx *changeRedundancyContext, obj clitypes.Ob
var blocks []clitypes.ObjectBlock var blocks []clitypes.ObjectBlock


for i := range uploadStgs { for i := range uploadStgs {
r := ioRet[fmt.Sprintf("%d", i)].(*ops2.FileInfoValue)
r := ioRet.Get(fmt.Sprintf("%d", i)).(*ops2.FileInfoValue)
blocks = append(blocks, clitypes.ObjectBlock{ blocks = append(blocks, clitypes.ObjectBlock{
ObjectID: obj.Object.ObjectID, ObjectID: obj.Object.ObjectID,
Index: 0, Index: 0,
@@ -960,11 +960,15 @@ func (t *ChangeRedundancy) ecToEC(ctx *changeRedundancyContext, obj clitypes.Obj
return nil, nil, nil return nil, nil, nil
} }


for k, v := range ret {
for k, vs := range ret.Stored {
idx, err := strconv.ParseInt(k, 10, 64) idx, err := strconv.ParseInt(k, 10, 64)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err) return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
} }
if len(vs) == 0 {
continue
}
v := vs[0]


r := v.(*ops2.FileInfoValue) r := v.(*ops2.FileInfoValue)
newBlocks[idx].FileHash = r.Hash newBlocks[idx].FileHash = r.Hash
@@ -1202,11 +1206,15 @@ func (t *ChangeRedundancy) reconstructLRC(ctx *changeRedundancyContext, obj clit
return nil, nil, nil return nil, nil, nil
} }


for k, v := range ret {
for k, vs := range ret.Stored {
idx, err := strconv.ParseInt(k, 10, 64) idx, err := strconv.ParseInt(k, 10, 64)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err) return nil, nil, fmt.Errorf("parsing result key %s as index: %w", k, err)
} }
if len(vs) == 0 {
continue
}
v := vs[0]


r := v.(*ops2.FileInfoValue) r := v.(*ops2.FileInfoValue)
newBlocks[idx].FileHash = r.Hash newBlocks[idx].FileHash = r.Hash


+ 6
- 6
client/internal/ticktock/redundancy_shrink.go View File

@@ -906,20 +906,20 @@ func (t *ChangeRedundancy) generateSysEventForECObject(solu annealingSolution, o
return []datamap.SysEventBody{transEvt, distEvt} return []datamap.SysEventBody{transEvt, distEvt}
} }


func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningSpaceIDs map[clitypes.UserSpaceID]bool, reen *publock.Reentrant) (map[string]exec.VarValue, error) {
func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *exec.PlanBuilder, planningSpaceIDs map[clitypes.UserSpaceID]bool, reen *publock.Reentrant) (exec.PlanResult, error) {
reqBlder := reqbuilder.NewBuilder() reqBlder := reqbuilder.NewBuilder()
for id, _ := range planningSpaceIDs { for id, _ := range planningSpaceIDs {
reqBlder.UserSpace().Buzy(id) reqBlder.UserSpace().Buzy(id)
} }
err := reen.Lock(reqBlder.Build()) err := reen.Lock(reqBlder.Build())
if err != nil { if err != nil {
return nil, fmt.Errorf("locking shard resources: %w", err)
return exec.PlanResult{}, fmt.Errorf("locking shard resources: %w", err)
} }


wg := sync.WaitGroup{} wg := sync.WaitGroup{}


// 执行IO计划 // 执行IO计划
var ioSwRets map[string]exec.VarValue
var ioSwRets exec.PlanResult
var ioSwErr error var ioSwErr error
wg.Add(1) wg.Add(1)
go func() { go func() {
@@ -938,13 +938,13 @@ func (t *ChangeRedundancy) executePlans(ctx *changeRedundancyContext, planBld *e
wg.Wait() wg.Wait()


if ioSwErr != nil { if ioSwErr != nil {
return nil, ioSwErr
return exec.PlanResult{}, ioSwErr
} }


return ioSwRets, nil return ioSwRets, nil
} }


func (t *ChangeRedundancy) populateECObjectEntry(entry *db.UpdatingObjectRedundancy, obj clitypes.ObjectDetail, ioRets map[string]exec.VarValue) {
func (t *ChangeRedundancy) populateECObjectEntry(entry *db.UpdatingObjectRedundancy, obj clitypes.ObjectDetail, ioRets exec.PlanResult) {
for i := range entry.Blocks { for i := range entry.Blocks {
if entry.Blocks[i].FileHash != "" { if entry.Blocks[i].FileHash != "" {
continue continue
@@ -952,7 +952,7 @@ func (t *ChangeRedundancy) populateECObjectEntry(entry *db.UpdatingObjectRedunda


key := fmt.Sprintf("%d.%d", obj.Object.ObjectID, entry.Blocks[i].Index) key := fmt.Sprintf("%d.%d", obj.Object.ObjectID, entry.Blocks[i].Index)
// 不应该出现key不存在的情况 // 不应该出现key不存在的情况
r := ioRets[key].(*ops2.FileInfoValue)
r := ioRets.Get(key).(*ops2.FileInfoValue)
entry.Blocks[i].FileHash = r.Hash entry.Blocks[i].FileHash = r.Hash
entry.Blocks[i].Size = r.Size entry.Blocks[i].Size = r.Size
} }


+ 1
- 1
client/internal/uploader/create_load.go View File

@@ -72,7 +72,7 @@ func (u *CreateUploader) Upload(pa types.JPath, stream io.Reader, opts ...Upload
defer u.lock.Unlock() defer u.lock.Unlock()


// 记录上传结果 // 记录上传结果
shardInfo := ret["fileHash"].(*ops2.FileInfoValue)
shardInfo := ret.Get("fileHash").(*ops2.FileInfoValue)
u.successes = append(u.successes, db.AddObjectEntry{ u.successes = append(u.successes, db.AddObjectEntry{
Path: pa.String(), Path: pa.String(),
Size: shardInfo.Size, Size: shardInfo.Size,


+ 1
- 1
client/internal/uploader/update.go View File

@@ -82,7 +82,7 @@ func (w *UpdateUploader) Upload(pat types.JPath, stream io.Reader, opts ...Uploa
defer w.lock.Unlock() defer w.lock.Unlock()


// 记录上传结果 // 记录上传结果
shardInfo := ret["shardInfo"].(*ops2.FileInfoValue)
shardInfo := ret.Get("shardInfo").(*ops2.FileInfoValue)
w.successes = append(w.successes, db.AddObjectEntry{ w.successes = append(w.successes, db.AddObjectEntry{
Path: pat.String(), Path: pat.String(),
Size: shardInfo.Size, Size: shardInfo.Size,


+ 1
- 1
client/internal/uploader/uploader.go View File

@@ -270,7 +270,7 @@ func (u *Uploader) UploadPart(objID clitypes.ObjectID, index int, stream io.Read
return fmt.Errorf("executing plan: %w", err) return fmt.Errorf("executing plan: %w", err)
} }


shardInfo := ret["shard"].(*ops2.FileInfoValue)
shardInfo := ret.Get("shard").(*ops2.FileInfoValue)
err = u.db.DoTx(func(tx db.SQLContext) error { err = u.db.DoTx(func(tx db.SQLContext) error {
return u.db.Object().AppendPart(tx, clitypes.ObjectBlock{ return u.db.Object().AppendPart(tx, clitypes.ObjectBlock{
ObjectID: objID, ObjectID: objID,


+ 2
- 2
client/internal/uploader/user_space_upload.go View File

@@ -179,7 +179,7 @@ func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targe
return nil, fmt.Errorf("executing plan: %w", err) return nil, fmt.Errorf("executing plan: %w", err)
} }


adds := make([]db.AddObjectEntry, 0, len(ret))
adds := make([]db.AddObjectEntry, 0, len(ret.Stored))
for _, e := range entries { for _, e := range entries {
if e.IsDir { if e.IsDir {
continue continue
@@ -192,7 +192,7 @@ func (u *Uploader) uploadFromBaseStore(srcSpace *clitypes.UserSpaceDetail, targe
pat.DropFrontN(1) pat.DropFrontN(1)
} }


info := ret[e.Path.String()].(*ops2.FileInfoValue)
info := ret.Get(e.Path.String()).(*ops2.FileInfoValue)
adds = append(adds, db.AddObjectEntry{ adds = append(adds, db.AddObjectEntry{
Path: pat.String(), Path: pat.String(),
Size: info.Size, Size: info.Size,


+ 52
- 15
common/pkgs/ioswitch/exec/driver.go View File

@@ -15,7 +15,7 @@ import (
type Driver struct { type Driver struct {
planID PlanID planID PlanID
planBlder *PlanBuilder planBlder *PlanBuilder
callback *future.SetValueFuture[map[string]VarValue]
callback *future.SetValueFuture[PlanResult]
ctx *ExecContext ctx *ExecContext
cancel context.CancelFunc cancel context.CancelFunc
driverExec *Executor driverExec *Executor
@@ -44,20 +44,22 @@ func (e *Driver) Signal(signal *DriverSignalVar) {
e.driverExec.PutVar(signal.ID, &SignalValue{}) e.driverExec.PutVar(signal.ID, &SignalValue{})
} }


func (e *Driver) Wait(ctx context.Context) (map[string]VarValue, error) {
stored, err := e.callback.Wait(ctx)
func (e *Driver) Wait(ctx context.Context) (PlanResult, error) {
ret, err := e.callback.Wait(ctx)
if err != nil { if err != nil {
return nil, err
return PlanResult{}, err
} }


return stored, nil
return ret, nil
} }


func (e *Driver) execute() { func (e *Driver) execute() {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}


errLock := sync.Mutex{}
retLock := sync.Mutex{}
var execErr error var execErr error
stored := make(map[string][]VarValue)

for _, p := range e.planBlder.WorkerPlans { for _, p := range e.planBlder.WorkerPlans {
wg.Add(1) wg.Add(1)


@@ -71,36 +73,49 @@ func (e *Driver) execute() {


cli, err := p.Worker.NewClient() cli, err := p.Worker.NewClient()
if err != nil { if err != nil {
errLock.Lock()
retLock.Lock()
execErr = multierror.Append(execErr, fmt.Errorf("worker %v: new client: %w", p.Worker, err)) execErr = multierror.Append(execErr, fmt.Errorf("worker %v: new client: %w", p.Worker, err))
errLock.Unlock()
retLock.Unlock()
cancel() cancel()
return return
} }
defer cli.Close() defer cli.Close()


err = cli.ExecutePlan(ctx, plan)
ret, err := cli.ExecutePlan(ctx, plan)
if err != nil { if err != nil {
errLock.Lock()
retLock.Lock()
execErr = multierror.Append(execErr, fmt.Errorf("worker %v: execute plan: %w", p.Worker, err)) execErr = multierror.Append(execErr, fmt.Errorf("worker %v: execute plan: %w", p.Worker, err))
errLock.Unlock()
retLock.Unlock()
cancel() cancel()
return return
} }

retLock.Lock()
for k, v := range ret.Stored {
stored[k] = append(stored[k], v...)
}
retLock.Unlock()

}(p, e.ctx.Context, e.cancel) }(p, e.ctx.Context, e.cancel)
} }


stored, err := e.driverExec.Run(e.ctx)
ret, err := e.driverExec.Run(e.ctx)
if err != nil { if err != nil {
errLock.Lock()
retLock.Lock()
execErr = multierror.Append(execErr, fmt.Errorf("driver: execute plan: %w", err)) execErr = multierror.Append(execErr, fmt.Errorf("driver: execute plan: %w", err))
errLock.Unlock()
retLock.Unlock()
e.cancel() e.cancel()
} }


wg.Wait() wg.Wait()


e.callback.SetComplete(stored, execErr)
for k, v := range ret.Stored {
stored[k] = append(stored[k], v...)
}

e.callback.SetComplete(PlanResult{
Stored: stored,
}, execErr)
} }


type DriverWriteStream struct { type DriverWriteStream struct {
@@ -116,3 +131,25 @@ type DriverSignalVar struct {
ID VarID ID VarID
Signal SignalValue Signal SignalValue
} }

type PlanResult struct {
Stored map[string][]VarValue
}

func (r *PlanResult) Get(key string) VarValue {
v, ok := r.Stored[key]
if !ok || len(v) == 0 {
return nil
}

return v[0]
}

func (r *PlanResult) GetArray(key string) []VarValue {
v, ok := r.Stored[key]
if !ok {
return nil
}

return v
}

+ 10
- 6
common/pkgs/ioswitch/exec/executor.go View File

@@ -26,14 +26,14 @@ type Executor struct {
vars map[VarID]freeVar vars map[VarID]freeVar
bindings []*binding bindings []*binding
lock sync.Mutex lock sync.Mutex
store map[string]VarValue
store map[string][]VarValue
} }


func NewExecutor(plan Plan) *Executor { func NewExecutor(plan Plan) *Executor {
planning := Executor{ planning := Executor{
plan: plan, plan: plan,
vars: make(map[VarID]freeVar), vars: make(map[VarID]freeVar),
store: make(map[string]VarValue),
store: make(map[string][]VarValue),
} }


return &planning return &planning
@@ -43,7 +43,7 @@ func (s *Executor) Plan() *Plan {
return &s.plan return &s.plan
} }


func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) {
func (s *Executor) Run(ctx *ExecContext) (ExecutorResult, error) {
c, cancel := context.WithCancel(ctx.Context) c, cancel := context.WithCancel(ctx.Context)
ctx = &ExecContext{ ctx = &ExecContext{
Context: c, Context: c,
@@ -54,10 +54,10 @@ func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) {


err := s.runOps(s.plan.Ops, ctx, cancel) err := s.runOps(s.plan.Ops, ctx, cancel)
if err != nil { if err != nil {
return nil, err
return ExecutorResult{}, err
} }


return s.store, nil
return ExecutorResult{Stored: s.store}, nil
} }


func (s *Executor) runOps(ops []Op, ctx *ExecContext, cancel context.CancelFunc) error { func (s *Executor) runOps(ops []Op, ctx *ExecContext, cancel context.CancelFunc) error {
@@ -136,7 +136,11 @@ func (s *Executor) Store(key string, val VarValue) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()


s.store[key] = val
s.store[key] = append(s.store[key], val)
}

type ExecutorResult struct {
Stored map[string][]VarValue
} }


func BindVar[T VarValue](e *Executor, ctx context.Context, id VarID) (T, error) { func BindVar[T VarValue](e *Executor, ctx context.Context, id VarID) (T, error) {


+ 1
- 1
common/pkgs/ioswitch/exec/plan_builder.go View File

@@ -63,7 +63,7 @@ func (b *PlanBuilder) Execute(ctx *ExecContext) *Driver {
exec := Driver{ exec := Driver{
planID: planID, planID: planID,
planBlder: b, planBlder: b,
callback: future.NewSetValue[map[string]VarValue](),
callback: future.NewSetValue[PlanResult](),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
driverExec: NewExecutor(execPlan), driverExec: NewExecutor(execPlan),


+ 1
- 1
common/pkgs/ioswitch/exec/worker.go View File

@@ -94,7 +94,7 @@ type WorkerInfo interface {
} }


type WorkerClient interface { type WorkerClient interface {
ExecutePlan(ctx context.Context, plan Plan) error
ExecutePlan(ctx context.Context, plan Plan) (ExecutorResult, error)


SendStream(ctx context.Context, planID PlanID, id VarID, stream io.ReadCloser) error SendStream(ctx context.Context, planID PlanID, id VarID, stream io.ReadCloser) error
SendVar(ctx context.Context, planID PlanID, id VarID, value VarValue) error SendVar(ctx context.Context, planID PlanID, id VarID, value VarValue) error


+ 7
- 2
common/pkgs/ioswitch2/http_hub_worker.go View File

@@ -50,10 +50,15 @@ type HttpHubWorkerClient struct {
cli *hubapi.Client cli *hubapi.Client
} }


func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
return c.cli.ExecuteIOPlan(hubapi.ExecuteIOPlanReq{
func (c *HttpHubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) {
resp, err := c.cli.ExecuteIOPlan(hubapi.ExecuteIOPlanReq{
Plan: plan, Plan: plan,
}) })
if err != nil {
return exec.ExecutorResult{}, err
}

return resp.Result, nil
} }
func (c *HttpHubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { func (c *HttpHubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error {
return c.cli.SendStream(hubapi.SendStreamReq{ return c.cli.SendStream(hubapi.SendStreamReq{


+ 7
- 3
common/pkgs/ioswitch2/hub_worker.go View File

@@ -46,9 +46,13 @@ type HubWorkerClient struct {
cli *hubrpc.Client cli *hubrpc.Client
} }


func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
_, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
return err.ToError()
func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) {
resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
if err != nil {
return exec.ExecutorResult{}, err.ToError()
}

return resp.Result, nil
} }
func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error {
_, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{ _, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{


+ 1
- 1
common/pkgs/ioswitch2/parser/opt/misc.go View File

@@ -65,7 +65,7 @@ func StoreShardWriteResult(ctx *state.GenerateState) {
} }


storeNode := ctx.DAG.NewStore() storeNode := ctx.DAG.NewStore()
storeNode.Env().ToEnvDriver(true)
storeNode.Env().CopyFrom(n.Env())


storeNode.Store(n.ShardInfoKey, n.ShardInfoVar().Var()) storeNode.Store(n.ShardInfoKey, n.ShardInfoVar().Var())
return true return true


+ 7
- 3
common/pkgs/ioswitchlrc/hub_worker.go View File

@@ -41,9 +41,13 @@ type HubWorkerClient struct {
cli *hubrpc.Client cli *hubrpc.Client
} }


func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) error {
_, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
return err.ToError()
func (c *HubWorkerClient) ExecutePlan(ctx context.Context, plan exec.Plan) (exec.ExecutorResult, error) {
resp, err := c.cli.ExecuteIOPlan(ctx, &hubrpc.ExecuteIOPlan{Plan: plan})
if err != nil {
return exec.ExecutorResult{}, err.ToError()
}

return resp.Result, nil
} }
func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error { func (c *HubWorkerClient) SendStream(ctx context.Context, planID exec.PlanID, id exec.VarID, stream io.ReadCloser) error {
_, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{ _, err := c.cli.SendIOStream(ctx, &hubrpc.SendIOStream{


+ 3
- 1
common/pkgs/rpc/hub/ioswitch.go View File

@@ -20,7 +20,9 @@ type IOSwitchSvc interface {
type ExecuteIOPlan struct { type ExecuteIOPlan struct {
Plan exec.Plan Plan exec.Plan
} }
type ExecuteIOPlanResp struct{}
type ExecuteIOPlanResp struct {
Result exec.ExecutorResult
}


var _ = TokenAuth(Hub_ExecuteIOPlan_FullMethodName) var _ = TokenAuth(Hub_ExecuteIOPlan_FullMethodName)




+ 11
- 8
hub/sdk/api/hub_io.go View File

@@ -129,35 +129,38 @@ const ExecuteIOPlanPath = "/hubIO/executeIOPlan"
type ExecuteIOPlanReq struct { type ExecuteIOPlanReq struct {
Plan exec.Plan `json:"plan"` Plan exec.Plan `json:"plan"`
} }
type ExecuteIOPlanResp struct {
Result exec.ExecutorResult `json:"result"`
}


func (c *Client) ExecuteIOPlan(req ExecuteIOPlanReq) error {
func (c *Client) ExecuteIOPlan(req ExecuteIOPlanReq) (*ExecuteIOPlanResp, error) {
targetUrl, err := url.JoinPath(c.cfg.URL, ExecuteIOPlanPath) targetUrl, err := url.JoinPath(c.cfg.URL, ExecuteIOPlanPath)
if err != nil { if err != nil {
return err
return nil, err
} }


body, err := serder.ObjectToJSONEx(req) body, err := serder.ObjectToJSONEx(req)
if err != nil { if err != nil {
return fmt.Errorf("request to json: %w", err)
return nil, fmt.Errorf("request to json: %w", err)
} }


resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: body, Body: body,
}) })
if err != nil { if err != nil {
return err
return nil, err
} }


codeResp, err := ParseJSONResponse[response[any]](resp)
codeResp, err := ParseJSONResponse[response[ExecuteIOPlanResp]](resp)
if err != nil { if err != nil {
return err
return nil, err
} }


if codeResp.Code == errorcode.OK { if codeResp.Code == errorcode.OK {
return nil
return &codeResp.Data, nil
} }


return codeResp.ToError()
return nil, codeResp.ToError()
} }


const SendVarPath = "/hubIO/sendVar" const SendVarPath = "/hubIO/sendVar"


Loading…
Cancel
Save