You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; can include dashes ('-'); and can be up to 35 characters long.

stopallbydeploytaskidlogic.go 3.9 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package inference
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  11. "strconv"
  12. "sync"
  13. "github.com/zeromicro/go-zero/core/logx"
  14. )
  15. type StopAllByDeployTaskIdLogic struct {
  16. logx.Logger
  17. ctx context.Context
  18. svcCtx *svc.ServiceContext
  19. }
  20. func NewStopAllByDeployTaskIdLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StopAllByDeployTaskIdLogic {
  21. return &StopAllByDeployTaskIdLogic{
  22. Logger: logx.WithContext(ctx),
  23. ctx: ctx,
  24. svcCtx: svcCtx,
  25. }
  26. }
  27. func (l *StopAllByDeployTaskIdLogic) StopAllByDeployTaskId(req *types.StopAllByDeployTaskIdReq) (resp *types.StopAllByDeployTaskIdResp, err error) {
  28. resp = &types.StopAllByDeployTaskIdResp{}
  29. id, err := strconv.ParseInt(req.Id, 10, 64)
  30. list, err := l.svcCtx.Scheduler.AiStorages.GetInstanceListByDeployTaskId(id)
  31. if err != nil {
  32. return nil, err
  33. }
  34. if len(list) == 0 {
  35. return nil, errors.New("instances are empty")
  36. }
  37. err = l.svcCtx.Scheduler.AiStorages.UpdateDeployTaskById(id)
  38. if err != nil {
  39. return nil, err
  40. }
  41. err = l.stopAll(list)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return resp, nil
  46. }
  47. func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstance) error {
  48. var wg sync.WaitGroup
  49. var errCh = make(chan interface{}, len(list))
  50. var errs []interface{}
  51. buf := make(chan bool, 2)
  52. for _, instance := range list {
  53. wg.Add(1)
  54. ins := instance
  55. buf <- true
  56. go func() {
  57. in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId)
  58. if err != nil {
  59. e := struct {
  60. errTyp uint8
  61. err error
  62. instanceName string
  63. clusterName string
  64. }{
  65. errTyp: 1,
  66. err: err,
  67. instanceName: ins.InstanceName,
  68. clusterName: ins.ClusterName,
  69. }
  70. errCh <- e
  71. wg.Done()
  72. <-buf
  73. return
  74. }
  75. if checkStatus(in) {
  76. success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId)
  77. if !success {
  78. e := struct {
  79. errTyp uint8
  80. err error
  81. instanceName string
  82. clusterName string
  83. }{
  84. errTyp: 2,
  85. err: err,
  86. instanceName: ins.InstanceName,
  87. clusterName: ins.ClusterName,
  88. }
  89. errCh <- e
  90. wg.Done()
  91. <-buf
  92. return
  93. }
  94. }
  95. wg.Done()
  96. <-buf
  97. }()
  98. }
  99. wg.Wait()
  100. close(errCh)
  101. for e := range errCh {
  102. errs = append(errs, e)
  103. }
  104. if len(errs) != 0 {
  105. var msg string
  106. for _, err := range errs {
  107. e := (err).(struct {
  108. errTyp uint8
  109. err error
  110. instanceName string
  111. clusterName string
  112. })
  113. switch e.errTyp {
  114. case 1:
  115. msg += fmt.Sprintf("GetInstance Failed # clusterName: %v , instanceName: %v , error: %v \n", e.clusterName, e.instanceName, e.err.Error())
  116. case 2:
  117. msg += fmt.Sprintf("StopInstance Failed # clusterName: %v , instanceName: %v , error: %v \n", e.clusterName, e.instanceName, e.err.Error())
  118. }
  119. }
  120. return errors.New(msg)
  121. }
  122. return nil
  123. }
  124. func checkStatus(in *inference.DeployInstance) bool {
  125. switch in.ClusterType {
  126. case storeLink.TYPE_OCTOPUS:
  127. switch in.Status {
  128. case "running":
  129. return true
  130. default:
  131. return false
  132. }
  133. case storeLink.TYPE_MODELARTS:
  134. switch in.Status {
  135. case "running":
  136. return true
  137. default:
  138. return false
  139. }
  140. case storeLink.TYPE_SHUGUANGAI:
  141. switch in.Status {
  142. case "Running":
  143. return true
  144. default:
  145. return false
  146. }
  147. default:
  148. return false
  149. }
  150. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem for heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.