diff --git a/pkgs/ioswitch/exec/driver.go b/pkgs/ioswitch/exec/driver.go index 34f0695..5c89ade 100644 --- a/pkgs/ioswitch/exec/driver.go +++ b/pkgs/ioswitch/exec/driver.go @@ -6,6 +6,7 @@ import ( "io" "sync" + "github.com/hashicorp/go-multierror" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/utils/io2" ) @@ -54,10 +55,12 @@ func (e *Driver) Wait(ctx context.Context) (map[string]VarValue, error) { func (e *Driver) execute() { wg := sync.WaitGroup{} + errLock := sync.Mutex{} + var execErr error for _, p := range e.planBlder.WorkerPlans { wg.Add(1) - go func(p *WorkerPlanBuilder) { + go func(p *WorkerPlanBuilder, ctx context.Context, cancel context.CancelFunc) { defer wg.Done() plan := Plan{ @@ -67,33 +70,36 @@ func (e *Driver) execute() { cli, err := p.Worker.NewClient() if err != nil { - e.stopWith(fmt.Errorf("new client to worker %v: %w", p.Worker, err)) + errLock.Lock() + execErr = multierror.Append(execErr, fmt.Errorf("worker %v: new client: %w", p.Worker, err)) + errLock.Unlock() + cancel() return } defer cli.Close() - err = cli.ExecutePlan(e.ctx.Context, plan) + err = cli.ExecutePlan(ctx, plan) if err != nil { - e.stopWith(fmt.Errorf("execute plan at worker %v: %w", p.Worker, err)) + errLock.Lock() + execErr = multierror.Append(execErr, fmt.Errorf("worker %v: execute plan: %w", p.Worker, err)) + errLock.Unlock() + cancel() return } - }(p) + }(p, e.ctx.Context, e.cancel) } stored, err := e.driverExec.Run(e.ctx) if err != nil { - e.stopWith(fmt.Errorf("run executor switch: %w", err)) - return + errLock.Lock() + execErr = multierror.Append(execErr, fmt.Errorf("driver: execute plan: %w", err)) + errLock.Unlock() + e.cancel() } wg.Wait() - e.callback.SetValue(stored) -} - -func (e *Driver) stopWith(err error) { - e.callback.SetError(err) - e.cancel() + e.callback.SetComplete(stored, execErr) } type DriverWriteStream struct { diff --git a/pkgs/ioswitch/exec/executor.go b/pkgs/ioswitch/exec/executor.go index 7c54141..4f0b3ca 100644 --- a/pkgs/ioswitch/exec/executor.go +++ b/pkgs/ioswitch/exec/executor.go @@ -44,7 +44,10 @@ func (s *Executor) Plan() *Plan { func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) { c, cancel := context.WithCancel(ctx.Context) - ctx.Context = c + ctx = &ExecContext{ + Context: c, + Values: ctx.Values, + } defer cancel()