package inference

import (
	"context"
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"

	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/status"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"

	"github.com/zeromicro/go-zero/core/logx"
)

// StopAllByDeployTaskIdLogic carries the request-scoped logger, context and
// service context used to stop every inference instance of a deploy task.
type StopAllByDeployTaskIdLogic struct {
	logx.Logger
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

// NewStopAllByDeployTaskIdLogic builds the logic object for a single request,
// binding the logger to ctx.
func NewStopAllByDeployTaskIdLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StopAllByDeployTaskIdLogic {
	return &StopAllByDeployTaskIdLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

// StopAllByDeployTaskId stops all inference instances that belong to the
// deploy task identified by req.Id.
//
// req.Id must be a base-10 integer that fits in an int64. The method looks up
// the task's instances, calls UpdateDeployTaskById on the storage layer, and
// then stops the instances via stopAll. Note that the storage update happens
// before the stop attempts, so it is applied even if stopping later fails.
//
// It returns an error when the id cannot be parsed, when the instance lookup
// or the storage update fails, when the task has no instances, or when at
// least one instance could not be queried or stopped.
func (l *StopAllByDeployTaskIdLogic) StopAllByDeployTaskId(req *types.StopAllByDeployTaskIdReq) (resp *types.StopAllByDeployTaskIdResp, err error) {
	id, err := strconv.ParseInt(req.Id, 10, 64)
	if err != nil {
		// Previously this error was overwritten and the lookup ran with id 0.
		return nil, fmt.Errorf("parsing deploy task id %q: %w", req.Id, err)
	}

	list, err := l.svcCtx.Scheduler.AiStorages.GetInstanceListByDeployTaskId(id)
	if err != nil {
		return nil, err
	}
	if len(list) == 0 {
		return nil, errors.New("instances are empty")
	}

	if err = l.svcCtx.Scheduler.AiStorages.UpdateDeployTaskById(id); err != nil {
		return nil, err
	}

	if err = l.stopAll(list); err != nil {
		return nil, err
	}

	return &types.StopAllByDeployTaskIdResp{}, nil
}

// stopAll queries each instance in list and, if status.CheckRunningStatus
// reports it as running, asks its cluster adapter to stop it. At most
// maxConcurrent instances are processed at the same time.
//
// All instances are attempted; failures are collected and returned as a single
// error whose message has one line per failed instance. The order of those
// lines follows goroutine completion and is therefore not deterministic.
// It returns nil when every instance was handled without failure.
func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstance) error {
	const (
		maxConcurrent = 2
		stageGet      = "GetInstance"
		stageStop     = "StopInstance"
	)

	// failure records which step failed for which instance.
	type failure struct {
		stage        string
		err          error
		instanceName string
		clusterName  string
	}

	// StopInferDeployInstance only reports a bool, so a concrete error is
	// needed to describe a rejected stop; the original stored a nil error
	// here and panicked while formatting the message.
	errStopFailed := errors.New("stop request did not succeed")

	adapters := l.svcCtx.Scheduler.AiService.InferenceAdapterMap

	var wg sync.WaitGroup
	// Buffered to len(list) so workers never block when reporting a failure.
	failures := make(chan failure, len(list))
	// slots acts as a counting semaphore limiting concurrent workers.
	slots := make(chan struct{}, maxConcurrent)

	for _, instance := range list {
		ins := instance // per-iteration copy for toolchains before Go 1.22
		slots <- struct{}{}
		wg.Add(1)

		go func() {
			defer func() {
				<-slots
				wg.Done()
			}()

			report := func(stage string, err error) {
				failures <- failure{
					stage:        stage,
					err:          err,
					instanceName: ins.InstanceName,
					clusterName:  ins.ClusterName,
				}
			}

			adapterID := strconv.FormatInt(ins.AdapterId, 10)
			clusterID := strconv.FormatInt(ins.ClusterId, 10)

			// A missing entry would yield a zero value and the method call
			// below could panic inside this goroutine.
			cluster, ok := adapters[adapterID][clusterID]
			if !ok {
				report(stageGet, fmt.Errorf("no inference adapter registered for adapter %s, cluster %s", adapterID, clusterID))
				return
			}

			in, err := cluster.GetInferDeployInstance(l.ctx, ins.InstanceId)
			if err != nil {
				report(stageGet, err)
				return
			}

			// Instances that are not running are left untouched.
			if !status.CheckRunningStatus(in) {
				return
			}

			if success := cluster.StopInferDeployInstance(l.ctx, ins.InstanceId); !success {
				report(stageStop, errStopFailed)
			}
		}()
	}

	wg.Wait()
	close(failures)

	if len(failures) == 0 {
		return nil
	}

	var msg strings.Builder
	for f := range failures {
		fmt.Fprintf(&msg, "%s Failed # clusterName: %v , instanceName: %v , error: %v \n",
			f.stage, f.clusterName, f.instanceName, f.err)
	}
	return errors.New(msg.String())
}