diff --git a/pkgs/ioswitch/exec/executor.go b/pkgs/ioswitch/exec/executor.go index 7f06cad..c2c17a3 100644 --- a/pkgs/ioswitch/exec/executor.go +++ b/pkgs/ioswitch/exec/executor.go @@ -5,6 +5,7 @@ import ( "fmt" "sync" + "github.com/hashicorp/go-multierror" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/utils/lo2" "gitlink.org.cn/cloudream/common/utils/sync2" @@ -48,19 +49,7 @@ func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) { defer cancel() - err := sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error { - err := o.Execute(ctx, s) - - s.lock.Lock() - defer s.lock.Unlock() - - if err != nil { - cancel() - return fmt.Errorf("%T: %w", o, err) - } - - return nil - }) + err := s.runOps(s.plan.Ops, ctx, cancel) if err != nil { return nil, err } @@ -68,6 +57,37 @@ func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) { return s.store, nil } +func (s *Executor) runOps(ops []Op, ctx *ExecContext, cancel context.CancelFunc) error { + lock := sync.Mutex{} + var err error + + var wg sync.WaitGroup + wg.Add(len(ops)) + for i, arg := range ops { + go func(arg Op, index int) { + defer wg.Done() + + if e := arg.Execute(ctx, s); e != nil { + lock.Lock() + // 尽量不记录 ErrContextCanceled 错误,除非没有其他错误 + if err == nil { + err = e + } else if err == sync2.ErrContextCanceled { + err = e + } else if e != sync2.ErrContextCanceled { + err = multierror.Append(err, e) + } + lock.Unlock() + + cancel() + } + }(arg, i) + } + wg.Wait() + + return err +} + func (s *Executor) BindVar(ctx context.Context, id VarID) (VarValue, error) { s.lock.Lock()