You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

stopallbydeploytaskidlogic.go 3.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package inference
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  11. "strconv"
  12. "sync"
  13. "github.com/zeromicro/go-zero/core/logx"
  14. )
  15. type StopAllByDeployTaskIdLogic struct {
  16. logx.Logger
  17. ctx context.Context
  18. svcCtx *svc.ServiceContext
  19. }
  20. func NewStopAllByDeployTaskIdLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StopAllByDeployTaskIdLogic {
  21. return &StopAllByDeployTaskIdLogic{
  22. Logger: logx.WithContext(ctx),
  23. ctx: ctx,
  24. svcCtx: svcCtx,
  25. }
  26. }
  27. func (l *StopAllByDeployTaskIdLogic) StopAllByDeployTaskId(req *types.StopAllByDeployTaskIdReq) (resp *types.StopAllByDeployTaskIdResp, err error) {
  28. resp = &types.StopAllByDeployTaskIdResp{}
  29. id, err := strconv.ParseInt(req.Id, 10, 64)
  30. list, err := l.svcCtx.Scheduler.AiStorages.GetInstanceListByDeployTaskId(id)
  31. if err != nil {
  32. return nil, err
  33. }
  34. if len(list) == 0 {
  35. return nil, errors.New("instances are empty")
  36. }
  37. err = l.svcCtx.Scheduler.AiStorages.UpdateDeployTaskById(id)
  38. if err != nil {
  39. return nil, err
  40. }
  41. err = l.stopAll(list)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return resp, nil
  46. }
  47. func (l *StopAllByDeployTaskIdLogic) stopAll(list []*models.AiInferDeployInstance) error {
  48. var wg sync.WaitGroup
  49. var errCh = make(chan interface{}, len(list))
  50. var errs []interface{}
  51. buf := make(chan bool, 2)
  52. for _, instance := range list {
  53. wg.Add(1)
  54. ins := instance
  55. buf <- true
  56. go func() {
  57. in, err := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].GetInferDeployInstance(l.ctx, ins.InstanceId)
  58. if err != nil {
  59. e := struct {
  60. errTyp uint8
  61. err error
  62. instanceName string
  63. clusterName string
  64. }{
  65. errTyp: 1,
  66. err: err,
  67. instanceName: ins.InstanceName,
  68. clusterName: ins.ClusterName,
  69. }
  70. errCh <- e
  71. wg.Done()
  72. <-buf
  73. }
  74. if checkStatus(in) {
  75. success := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[strconv.FormatInt(ins.AdapterId, 10)][strconv.FormatInt(ins.ClusterId, 10)].StopInferDeployInstance(l.ctx, ins.InstanceId)
  76. if !success {
  77. e := struct {
  78. errTyp uint8
  79. err error
  80. instanceName string
  81. clusterName string
  82. }{
  83. errTyp: 2,
  84. err: err,
  85. instanceName: ins.InstanceName,
  86. clusterName: ins.ClusterName,
  87. }
  88. errCh <- e
  89. wg.Done()
  90. <-buf
  91. }
  92. }
  93. }()
  94. <-buf
  95. }
  96. for e := range errCh {
  97. errs = append(errs, e)
  98. }
  99. if len(errs) != 0 {
  100. var msg string
  101. for _, err := range errs {
  102. e := (err).(struct {
  103. errTyp uint8
  104. err error
  105. instanceName string
  106. clusterName string
  107. })
  108. switch e.errTyp {
  109. case 1:
  110. msg += fmt.Sprintf("GetInstance Failed # clusterName: %v , instanceName: %v , error: %v \n", e.clusterName, e.instanceName, e.err.Error())
  111. case 2:
  112. msg += fmt.Sprintf("StopInstance Failed # clusterName: %v , instanceName: %v , error: %v \n", e.clusterName, e.instanceName, e.err.Error())
  113. }
  114. }
  115. return errors.New(msg)
  116. }
  117. return nil
  118. }
  119. func checkStatus(in *inference.DeployInstance) bool {
  120. switch in.ClusterType {
  121. case storeLink.TYPE_OCTOPUS:
  122. switch in.Status {
  123. case "running":
  124. return true
  125. default:
  126. return false
  127. }
  128. case storeLink.TYPE_MODELARTS:
  129. switch in.Status {
  130. case "running":
  131. return true
  132. default:
  133. return false
  134. }
  135. case storeLink.TYPE_SHUGUANGAI:
  136. switch in.Status {
  137. case "Running":
  138. return true
  139. default:
  140. return false
  141. }
  142. default:
  143. return false
  144. }
  145. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.