|
|
@@ -6,6 +6,7 @@ import ( |
|
|
"io" |
|
|
"io" |
|
|
"sync" |
|
|
"sync" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/go-multierror" |
|
|
"gitlink.org.cn/cloudream/common/pkgs/future" |
|
|
"gitlink.org.cn/cloudream/common/pkgs/future" |
|
|
"gitlink.org.cn/cloudream/common/utils/io2" |
|
|
"gitlink.org.cn/cloudream/common/utils/io2" |
|
|
) |
|
|
) |
|
|
@@ -54,10 +55,12 @@ func (e *Driver) Wait(ctx context.Context) (map[string]VarValue, error) { |
|
|
func (e *Driver) execute() { |
|
|
func (e *Driver) execute() { |
|
|
wg := sync.WaitGroup{} |
|
|
wg := sync.WaitGroup{} |
|
|
|
|
|
|
|
|
|
|
|
errLock := sync.Mutex{} |
|
|
|
|
|
var execErr error |
|
|
for _, p := range e.planBlder.WorkerPlans { |
|
|
for _, p := range e.planBlder.WorkerPlans { |
|
|
wg.Add(1) |
|
|
wg.Add(1) |
|
|
|
|
|
|
|
|
go func(p *WorkerPlanBuilder) { |
|
|
|
|
|
|
|
|
go func(p *WorkerPlanBuilder, ctx context.Context, cancel context.CancelFunc) { |
|
|
defer wg.Done() |
|
|
defer wg.Done() |
|
|
|
|
|
|
|
|
plan := Plan{ |
|
|
plan := Plan{ |
|
|
@@ -67,33 +70,36 @@ func (e *Driver) execute() { |
|
|
|
|
|
|
|
|
cli, err := p.Worker.NewClient() |
|
|
cli, err := p.Worker.NewClient() |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
e.stopWith(fmt.Errorf("new client to worker %v: %w", p.Worker, err)) |
|
|
|
|
|
|
|
|
errLock.Lock() |
|
|
|
|
|
execErr = multierror.Append(execErr, fmt.Errorf("worker %v: new client: %w", p.Worker, err)) |
|
|
|
|
|
errLock.Unlock() |
|
|
|
|
|
cancel() |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
defer cli.Close() |
|
|
defer cli.Close() |
|
|
|
|
|
|
|
|
err = cli.ExecutePlan(e.ctx.Context, plan) |
|
|
|
|
|
|
|
|
err = cli.ExecutePlan(ctx, plan) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
e.stopWith(fmt.Errorf("execute plan at worker %v: %w", p.Worker, err)) |
|
|
|
|
|
|
|
|
errLock.Lock() |
|
|
|
|
|
execErr = multierror.Append(execErr, fmt.Errorf("worker %v: execute plan: %w", p.Worker, err)) |
|
|
|
|
|
errLock.Unlock() |
|
|
|
|
|
cancel() |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
}(p) |
|
|
|
|
|
|
|
|
}(p, e.ctx.Context, e.cancel) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
stored, err := e.driverExec.Run(e.ctx) |
|
|
stored, err := e.driverExec.Run(e.ctx) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
e.stopWith(fmt.Errorf("run executor switch: %w", err)) |
|
|
|
|
|
return |
|
|
|
|
|
|
|
|
errLock.Lock() |
|
|
|
|
|
execErr = multierror.Append(execErr, fmt.Errorf("driver: execute plan: %w", err)) |
|
|
|
|
|
errLock.Unlock() |
|
|
|
|
|
e.cancel() |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
wg.Wait() |
|
|
wg.Wait() |
|
|
|
|
|
|
|
|
e.callback.SetValue(stored) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (e *Driver) stopWith(err error) { |
|
|
|
|
|
e.callback.SetError(err) |
|
|
|
|
|
e.cancel() |
|
|
|
|
|
|
|
|
e.callback.SetComplete(stored, execErr) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type DriverWriteStream struct { |
|
|
type DriverWriteStream struct { |
|
|
|