|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- package plans
-
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "sync"
- "sync/atomic"
-
- "gitlink.org.cn/cloudream/common/pkgs/future"
- "gitlink.org.cn/cloudream/common/utils/io2"
- stgglb "gitlink.org.cn/cloudream/storage/common/globals"
- "gitlink.org.cn/cloudream/storage/common/pkgs/ioswitch"
- agtmq "gitlink.org.cn/cloudream/storage/common/pkgs/mq/agent"
- )
-
// ExecutorResult is the value delivered to callers of Executor.Wait once a
// plan has finished successfully.
type ExecutorResult struct {
	// ResultValues holds the merged key/value outputs reported by the
	// agents; pollResult copies every agent's result values into this map.
	ResultValues map[string]any
}
-
- type Executor struct {
- plan ComposedPlan
- callback *future.SetValueFuture[ExecutorResult]
- mqClis []*agtmq.Client
- planTaskIDs []string
- }
-
- func Execute(plan ComposedPlan) (*Executor, error) {
- executor := Executor{
- plan: plan,
- callback: future.NewSetValue[ExecutorResult](),
- }
-
- var err error
- for _, a := range plan.AgentPlans {
- var cli *agtmq.Client
- cli, err = stgglb.AgentMQPool.Acquire(a.Node.NodeID)
- if err != nil {
- executor.Close()
- return nil, fmt.Errorf("new mq client for %d: %w", a.Node.NodeID, err)
- }
-
- executor.mqClis = append(executor.mqClis, cli)
- }
-
- for i, a := range plan.AgentPlans {
- cli := executor.mqClis[i]
-
- _, err := cli.SetupIOPlan(agtmq.NewSetupIOPlan(a.Plan))
- if err != nil {
- for i -= 1; i >= 0; i-- {
- executor.mqClis[i].CancelIOPlan(agtmq.NewCancelIOPlan(plan.ID))
- }
- executor.Close()
- return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err)
- }
- }
-
- for i, a := range plan.AgentPlans {
- cli := executor.mqClis[i]
-
- resp, err := cli.StartIOPlan(agtmq.NewStartIOPlan(a.Plan.ID))
- if err != nil {
- executor.cancelAll()
- executor.Close()
- return nil, fmt.Errorf("setup plan at %d: %w", a.Node.NodeID, err)
- }
-
- executor.planTaskIDs = append(executor.planTaskIDs, resp.TaskID)
- }
-
- go executor.pollResult()
-
- return &executor, nil
- }
-
- func (e *Executor) SendStream(info *FromExecutorStream, stream io.Reader) error {
- // TODO 考虑不使用stgglb的Local
- nodeIP := info.toNode.ExternalIP
- grpcPort := info.toNode.ExternalGRPCPort
- if info.toNode.LocationID == stgglb.Local.LocationID {
- nodeIP = info.toNode.LocalIP
- grpcPort = info.toNode.LocalGRPCPort
- }
-
- agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
- if err != nil {
- return fmt.Errorf("new agent rpc client: %w", err)
- }
- defer stgglb.AgentRPCPool.Release(agtCli)
-
- return agtCli.SendStream(e.plan.ID, info.info.ID, stream)
- }
-
- func (e *Executor) ReadStream(info *ToExecutorStream) (io.ReadCloser, error) {
- // TODO 考虑不使用stgglb的Local
- nodeIP := info.fromNode.ExternalIP
- grpcPort := info.fromNode.ExternalGRPCPort
- if info.fromNode.LocationID == stgglb.Local.LocationID {
- nodeIP = info.fromNode.LocalIP
- grpcPort = info.fromNode.LocalGRPCPort
- }
-
- agtCli, err := stgglb.AgentRPCPool.Acquire(nodeIP, grpcPort)
- if err != nil {
- return nil, fmt.Errorf("new agent rpc client: %w", err)
- }
-
- str, err := agtCli.FetchStream(e.plan.ID, info.info.ID)
- if err != nil {
- return nil, err
- }
-
- return io2.AfterReadClosed(str, func(closer io.ReadCloser) {
- stgglb.AgentRPCPool.Release(agtCli)
- }), nil
- }
-
- func (e *Executor) Wait() (ExecutorResult, error) {
- return e.callback.WaitValue(context.TODO())
- }
-
- func (e *Executor) cancelAll() {
- for _, cli := range e.mqClis {
- cli.CancelIOPlan(agtmq.NewCancelIOPlan(e.plan.ID))
- }
- }
-
- func (e *Executor) Close() {
- for _, c := range e.mqClis {
- stgglb.AgentMQPool.Release(c)
- }
- }
-
- func (e *Executor) pollResult() {
- wg := sync.WaitGroup{}
- var anyErr error
- var done atomic.Bool
- rets := make([]*ioswitch.PlanResult, len(e.plan.AgentPlans))
-
- for i, id := range e.planTaskIDs {
- idx := i
- taskID := id
-
- wg.Add(1)
- go func() {
- defer wg.Done()
-
- for {
- resp, err := e.mqClis[idx].WaitIOPlan(agtmq.NewWaitIOPlan(taskID, 5000))
- if err != nil {
- anyErr = err
- break
- }
-
- if resp.IsComplete {
- if resp.Error != "" {
- anyErr = errors.New(resp.Error)
- done.Store(true)
- } else {
- rets[idx] = &resp.Result
- }
- break
- }
-
- if done.Load() {
- break
- }
- }
- }()
- }
-
- wg.Wait()
-
- if anyErr != nil {
- e.callback.SetError(anyErr)
- return
- }
-
- reducedRet := ExecutorResult{
- ResultValues: make(map[string]any),
- }
- for _, ret := range rets {
- for k, v := range ret.Values {
- reducedRet.ResultValues[k] = v
- }
- }
-
- e.callback.SetValue(reducedRet)
- }
|