You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

schedulecreatetasklogic.go 6.5 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. package schedule
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  14. "slices"
  15. "strings"
  16. "github.com/zeromicro/go-zero/core/logx"
  17. )
  18. const (
  19. TRAINNING_TASK_REPLICA = 1
  20. )
  21. type ScheduleCreateTaskLogic struct {
  22. logx.Logger
  23. ctx context.Context
  24. svcCtx *svc.ServiceContext
  25. queryResource *QueryResourcesLogic
  26. }
  27. func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCreateTaskLogic {
  28. return &ScheduleCreateTaskLogic{
  29. Logger: logx.WithContext(ctx),
  30. ctx: ctx,
  31. svcCtx: svcCtx,
  32. queryResource: NewQueryResourcesLogic(ctx, svcCtx),
  33. }
  34. }
  35. func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
  36. resp = &types.CreateTaskResp{}
  37. if req.JobResources.ScheduleStrategy == "" {
  38. return nil, fmt.Errorf("must specify ScheduleStrategy")
  39. }
  40. if len(req.JobResources.Clusters) == 0 {
  41. return nil, fmt.Errorf("must specify at least one cluster")
  42. }
  43. var clusters []string
  44. if len(req.JobResources.Clusters) == 1 {
  45. clusters = append(clusters, req.JobResources.Clusters[0].ClusterID)
  46. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  47. if err != nil {
  48. return nil, err
  49. }
  50. taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(10), req.JobResources.ScheduleStrategy, req.JobResources.Clusters)
  51. if err != nil {
  52. return nil, err
  53. }
  54. resp.ScheduleDatas = schedatas
  55. resp.TaskID = taskId
  56. return resp, nil
  57. } else {
  58. clusterInfos, err := getClusterInfosByStrategy(&req.JobResources)
  59. if err != nil {
  60. return nil, err
  61. }
  62. for _, info := range clusterInfos {
  63. clusters = append(clusters, info.ClusterID)
  64. }
  65. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  66. if err != nil {
  67. return nil, err
  68. }
  69. taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(10), req.JobResources.ScheduleStrategy, clusterInfos)
  70. if err != nil {
  71. return nil, err
  72. }
  73. resp.ScheduleDatas = schedatas
  74. resp.TaskID = taskId
  75. return resp, nil
  76. }
  77. }
  78. func getClusterInfosByStrategy(resources *types.JobResources) ([]*types.JobClusterInfo, error) {
  79. var clusterInfos []*types.JobClusterInfo
  80. switch resources.ScheduleStrategy {
  81. case strategy.LEASTLOADFIRST:
  82. _ = strategy.NewLeastLoadFirst(TRAINNING_TASK_REPLICA, nil)
  83. }
  84. return clusterInfos, nil
  85. }
  86. func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string, jobClusterInfo []*types.JobClusterInfo) (int64, error) {
  87. var synergyStatus int64
  88. if len(jobClusterInfo) > 1 {
  89. synergyStatus = 1
  90. }
  91. strategyCode, err := l.svcCtx.Scheduler.AiStorages.GetStrategyCode(strategyName)
  92. if err != nil {
  93. return 0, err
  94. }
  95. taskId, err := l.svcCtx.Scheduler.AiStorages.SaveTask(taskName, strategyCode, synergyStatus, "10")
  96. if err != nil {
  97. return 0, err
  98. }
  99. adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(ADAPTERID)
  100. if err != nil {
  101. return 0, err
  102. }
  103. for _, i := range jobClusterInfo {
  104. clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(i.ClusterID)
  105. opt := &option.AiOption{}
  106. err := l.svcCtx.Scheduler.AiStorages.SaveAiTask(taskId, opt, adapterName, i.ClusterID, clusterName, "", constants.Saved, "")
  107. if err != nil {
  108. return 0, errors.New("database add failed: " + err.Error())
  109. }
  110. }
  111. return taskId, nil
  112. }
  113. func (l *ScheduleCreateTaskLogic) generateScheduleResult(distribute types.DataDistribute, clusters []string) ([]*types.ScheduleData, error) {
  114. var schedatas []*types.ScheduleData
  115. for _, d := range distribute.Dataset {
  116. data := &types.ScheduleData{
  117. DataType: "dataset",
  118. PackageID: d.PackageID,
  119. ClusterIDs: make([]string, 0),
  120. }
  121. for _, cluster := range clusters {
  122. if !slices.Contains(d.Clusters, cluster) {
  123. data.ClusterIDs = append(data.ClusterIDs, cluster)
  124. } else {
  125. continue
  126. }
  127. }
  128. if len(data.ClusterIDs) != 0 {
  129. schedatas = append(schedatas, data)
  130. }
  131. }
  132. for _, d := range distribute.Code {
  133. data := &types.ScheduleData{
  134. DataType: "code",
  135. PackageID: d.PackageID,
  136. ClusterIDs: make([]string, 0),
  137. }
  138. for _, cluster := range clusters {
  139. if !slices.Contains(d.Clusters, cluster) {
  140. data.ClusterIDs = append(data.ClusterIDs, cluster)
  141. } else {
  142. continue
  143. }
  144. }
  145. if len(data.ClusterIDs) != 0 {
  146. schedatas = append(schedatas, data)
  147. }
  148. }
  149. for _, d := range distribute.Image {
  150. data := &types.ScheduleData{
  151. DataType: "image",
  152. PackageID: d.PackageID,
  153. ClusterIDs: make([]string, 0),
  154. }
  155. for _, cluster := range clusters {
  156. if !slices.Contains(d.Clusters, cluster) {
  157. data.ClusterIDs = append(data.ClusterIDs, cluster)
  158. } else {
  159. continue
  160. }
  161. }
  162. if len(data.ClusterIDs) != 0 {
  163. schedatas = append(schedatas, data)
  164. }
  165. }
  166. for _, d := range distribute.Model {
  167. data := &types.ScheduleData{
  168. DataType: "model",
  169. PackageID: d.PackageID,
  170. ClusterIDs: make([]string, 0),
  171. }
  172. for _, cluster := range clusters {
  173. if !slices.Contains(d.Clusters, cluster) {
  174. data.ClusterIDs = append(data.ClusterIDs, cluster)
  175. } else {
  176. continue
  177. }
  178. }
  179. if len(data.ClusterIDs) != 0 {
  180. schedatas = append(schedatas, data)
  181. }
  182. }
  183. if len(schedatas) != 0 {
  184. err := l.updateStorageType(&schedatas)
  185. if err != nil {
  186. return nil, err
  187. }
  188. }
  189. return schedatas, nil
  190. }
  191. func (l *ScheduleCreateTaskLogic) updateStorageType(schedatas *[]*types.ScheduleData) error {
  192. for _, s := range *schedatas {
  193. var storageType string
  194. var sTypes []string
  195. for _, id := range s.ClusterIDs {
  196. cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
  197. if err != nil {
  198. return err
  199. }
  200. stype, ok := storeLink.StorageTypeMap[strings.Title(cluster.Name)]
  201. if ok {
  202. sTypes = append(sTypes, stype)
  203. }
  204. }
  205. sTypes = common.Unique(sTypes)
  206. for _, st := range sTypes {
  207. storageType += st + storeLink.COMMA
  208. }
  209. storageType = strings.TrimSuffix(storageType, storeLink.COMMA)
  210. s.StorageType = storageType
  211. }
  212. return nil
  213. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.