You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

schedulecreatetasklogic.go 5.7 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. package schedule
  2. import (
  3. "context"
  4. "fmt"
  5. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  11. "slices"
  12. "strconv"
  13. "strings"
  14. "github.com/zeromicro/go-zero/core/logx"
  15. )
  16. type ScheduleCreateTaskLogic struct {
  17. logx.Logger
  18. ctx context.Context
  19. svcCtx *svc.ServiceContext
  20. queryResource *QueryResourcesLogic
  21. }
  22. func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCreateTaskLogic {
  23. return &ScheduleCreateTaskLogic{
  24. Logger: logx.WithContext(ctx),
  25. ctx: ctx,
  26. svcCtx: svcCtx,
  27. queryResource: NewQueryResourcesLogic(ctx, svcCtx),
  28. }
  29. }
  30. func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
  31. resp = &types.CreateTaskResp{}
  32. if len(req.Resources) == 0 && len(req.Clusters) == 0 && len(req.StaticWeight) == 0 {
  33. return nil, fmt.Errorf("must specify at least one resource")
  34. }
  35. if len(req.Clusters) != 0 {
  36. schedatas, err := l.generateScheduleResult(req.DataDistributes, req.Clusters)
  37. if err != nil {
  38. return nil, err
  39. }
  40. taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(10), "staticWeight")
  41. if err != nil {
  42. return nil, err
  43. }
  44. resp.ScheduleDatas = schedatas
  45. resp.TaskID = taskId
  46. return resp, nil
  47. }
  48. if len(req.StaticWeight) != 0 {
  49. strgy := strategy.NewStaticWeightStrategy(req.StaticWeight, 1)
  50. sche, err := strgy.Schedule()
  51. if err != nil {
  52. return nil, err
  53. }
  54. var clusters []string
  55. for _, c := range sche {
  56. if c.Replicas == 0 {
  57. continue
  58. }
  59. clusters = append(clusters, c.ClusterId)
  60. }
  61. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  62. if err != nil {
  63. return nil, err
  64. }
  65. taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(10), "staticWeight")
  66. if err != nil {
  67. return nil, err
  68. }
  69. resp.ScheduleDatas = schedatas
  70. resp.TaskID = taskId
  71. return resp, nil
  72. }
  73. if len(req.Resources) != 0 {
  74. _, err := l.queryResource.QueryResources(&types.QueryResourcesReq{AdapterId: req.AdapterId})
  75. if err != nil {
  76. return nil, err
  77. }
  78. return
  79. }
  80. return
  81. }
  82. func (l *ScheduleCreateTaskLogic) generateScheduleResult(distribute types.DataDistribute, clusters []string) ([]*types.ScheduleData, error) {
  83. var schedatas []*types.ScheduleData
  84. for _, d := range distribute.Dataset {
  85. data := &types.ScheduleData{
  86. DataType: "dataset",
  87. PackageID: d.PackageID,
  88. ClusterIDs: make([]int64, 0),
  89. }
  90. for _, cluster := range clusters {
  91. id, _ := strconv.ParseInt(cluster, 10, 64)
  92. if !slices.Contains(d.Clusters, id) {
  93. data.ClusterIDs = append(data.ClusterIDs, id)
  94. } else {
  95. continue
  96. }
  97. }
  98. if len(data.ClusterIDs) != 0 {
  99. schedatas = append(schedatas, data)
  100. }
  101. }
  102. for _, d := range distribute.Code {
  103. data := &types.ScheduleData{
  104. DataType: "code",
  105. PackageID: d.PackageID,
  106. ClusterIDs: make([]int64, 0),
  107. }
  108. for _, cluster := range clusters {
  109. id, _ := strconv.ParseInt(cluster, 10, 64)
  110. if !slices.Contains(d.Clusters, id) {
  111. data.ClusterIDs = append(data.ClusterIDs, id)
  112. } else {
  113. continue
  114. }
  115. }
  116. if len(data.ClusterIDs) != 0 {
  117. schedatas = append(schedatas, data)
  118. }
  119. }
  120. for _, d := range distribute.Image {
  121. data := &types.ScheduleData{
  122. DataType: "image",
  123. PackageID: d.PackageID,
  124. ClusterIDs: make([]int64, 0),
  125. }
  126. for _, cluster := range clusters {
  127. id, _ := strconv.ParseInt(cluster, 10, 64)
  128. if !slices.Contains(d.Clusters, id) {
  129. data.ClusterIDs = append(data.ClusterIDs, id)
  130. } else {
  131. continue
  132. }
  133. }
  134. if len(data.ClusterIDs) != 0 {
  135. schedatas = append(schedatas, data)
  136. }
  137. }
  138. for _, d := range distribute.Model {
  139. data := &types.ScheduleData{
  140. DataType: "model",
  141. PackageID: d.PackageID,
  142. ClusterIDs: make([]int64, 0),
  143. }
  144. for _, cluster := range clusters {
  145. id, _ := strconv.ParseInt(cluster, 10, 64)
  146. if !slices.Contains(d.Clusters, id) {
  147. data.ClusterIDs = append(data.ClusterIDs, id)
  148. } else {
  149. continue
  150. }
  151. }
  152. if len(data.ClusterIDs) != 0 {
  153. schedatas = append(schedatas, data)
  154. }
  155. }
  156. if len(schedatas) != 0 {
  157. err := l.updateStorageType(&schedatas)
  158. if err != nil {
  159. return nil, err
  160. }
  161. }
  162. return schedatas, nil
  163. }
  164. func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string) (int64, error) {
  165. strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(strategyName)
  166. if err != nil {
  167. return 0, err
  168. }
  169. //adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(rs[0].AdapterId)
  170. //if err != nil {
  171. // return nil, err
  172. //}
  173. id, err := l.svcCtx.Scheduler.AiStorages.SaveTask(taskName, strategyCode, 0, "10")
  174. if err != nil {
  175. return 0, err
  176. }
  177. return id, nil
  178. }
  179. func (l *ScheduleCreateTaskLogic) updateStorageType(schedatas *[]*types.ScheduleData) error {
  180. for _, s := range *schedatas {
  181. var storageType string
  182. var sTypes []string
  183. for _, i := range s.ClusterIDs {
  184. id := strconv.FormatInt(i, 10)
  185. cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
  186. if err != nil {
  187. return err
  188. }
  189. stype, ok := storeLink.StorageTypeMap[strings.Title(cluster.Name)]
  190. if ok {
  191. sTypes = append(sTypes, stype)
  192. }
  193. }
  194. sTypes = common.Unique(sTypes)
  195. for _, st := range sTypes {
  196. storageType += st + storeLink.COMMA
  197. }
  198. storageType = strings.TrimSuffix(storageType, storeLink.COMMA)
  199. s.StorageType = storageType
  200. }
  201. return nil
  202. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.