You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

scheduleruntasklogic.go 9.6 kB

11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. package schedule
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "strings"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  15. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  16. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  17. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  18. "gopkg.in/yaml.v2"
  19. )
  20. type ScheduleRunTaskLogic struct {
  21. logx.Logger
  22. ctx context.Context
  23. svcCtx *svc.ServiceContext
  24. }
  25. func NewScheduleRunTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleRunTaskLogic {
  26. return &ScheduleRunTaskLogic{
  27. Logger: logx.WithContext(ctx),
  28. ctx: ctx,
  29. svcCtx: svcCtx,
  30. }
  31. }
  32. func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *types.RunTaskResp, err error) {
  33. // find task
  34. task, err := l.svcCtx.Scheduler.AiStorages.GetTaskById(req.TaskID)
  35. if err != nil {
  36. return nil, err
  37. }
  38. if task == nil {
  39. return nil, errors.New("task not found ")
  40. }
  41. if task.Status != constants.Saved {
  42. switch task.Status {
  43. case constants.Cancelled:
  44. return nil, errors.New("task has been cancelled ")
  45. case constants.Failed:
  46. return nil, errors.New("task was already failed ")
  47. case constants.Running:
  48. return nil, errors.New("task is running ")
  49. case constants.Succeeded:
  50. return nil, errors.New("task is completed ")
  51. default:
  52. return nil, fmt.Errorf("task is being: %s", task.Status)
  53. }
  54. }
  55. var clustersWithDataDistributes ClustersWithDataDistributes
  56. err = yaml.Unmarshal([]byte(task.YamlString), &clustersWithDataDistributes)
  57. if err != nil {
  58. return nil, err
  59. }
  60. opt := &option.AiOption{
  61. AdapterId: ADAPTERID,
  62. TaskName: task.Name,
  63. TaskId: task.Id,
  64. StrategyName: "",
  65. ResourcesRequired: clustersWithDataDistributes.Clusters[0].ResourcesRequired,
  66. }
  67. // update assignedClusters
  68. assignedClusters, err := updateClustersByScheduledDatas(task.Id, &clustersWithDataDistributes, req.ScheduledDatas)
  69. if err != nil {
  70. return nil, err
  71. }
  72. aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
  73. if err != nil {
  74. return nil, err
  75. }
  76. results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, executor.SUBMIT_MODE_STORAGE_SCHEDULE, assignedClusters)
  77. if err != nil {
  78. return nil, err
  79. }
  80. rs := (results).([]*schedulers.AiResult)
  81. err = l.SaveResult(task, rs, opt)
  82. if err != nil {
  83. return nil, err
  84. }
  85. return
  86. }
  87. func (l *ScheduleRunTaskLogic) SaveResult(task *models.Task, results []*schedulers.AiResult, opt *option.AiOption) error {
  88. for _, r := range results {
  89. opt.ComputeCard = strings.ToUpper(r.Card)
  90. opt.Replica = r.Replica
  91. opt.Output = r.Output
  92. adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(r.AdapterId)
  93. if err != nil {
  94. return err
  95. }
  96. clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId)
  97. err = l.svcCtx.Scheduler.AiStorages.SaveAiTask(task.Id, opt, adapterName, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg)
  98. if err != nil {
  99. return err
  100. }
  101. l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(r.AdapterId, adapterName, r.ClusterId, clusterName, r.TaskName, "create", "任务创建中")
  102. }
  103. return nil
  104. }
  105. func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *ClustersWithDataDistributes, scheduledDatas []*types.DataScheduleResults) ([]*strategy.AssignedCluster, error) {
  106. assignedClusters := make([]*strategy.AssignedCluster, 0)
  107. if len(scheduledDatas) == 0 {
  108. for _, cluster := range clustersWithDataDistributes.Clusters {
  109. assignedClusters = append(assignedClusters, cluster)
  110. }
  111. } else {
  112. // handle pass-in scheduledDatas
  113. for _, cluster := range clustersWithDataDistributes.Clusters {
  114. for _, data := range scheduledDatas {
  115. switch data.DataType {
  116. case "dataset":
  117. for _, result := range data.Results {
  118. if !result.Status {
  119. continue
  120. }
  121. for _, c := range result.Clusters {
  122. if cluster.ClusterId == c.ClusterID {
  123. if c.JsonData == "" {
  124. continue
  125. }
  126. jsonData := entity.JsonData{}
  127. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  128. if err != nil {
  129. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "dataset")
  130. }
  131. cluster.DatasetId = jsonData.Id
  132. }
  133. }
  134. }
  135. case "image":
  136. for _, result := range data.Results {
  137. if !result.Status {
  138. continue
  139. }
  140. for _, c := range result.Clusters {
  141. if cluster.ClusterId == c.ClusterID {
  142. if c.JsonData == "" {
  143. continue
  144. }
  145. jsonData := entity.JsonData{}
  146. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  147. if err != nil {
  148. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "image")
  149. }
  150. cluster.ImageId = jsonData.Id
  151. }
  152. }
  153. }
  154. case "code":
  155. for _, result := range data.Results {
  156. if !result.Status {
  157. continue
  158. }
  159. for _, c := range result.Clusters {
  160. if cluster.ClusterId == c.ClusterID {
  161. if c.JsonData == "" {
  162. continue
  163. }
  164. jsonData := entity.JsonData{}
  165. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  166. if err != nil {
  167. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "code")
  168. }
  169. cluster.CodeId = jsonData.Id
  170. }
  171. }
  172. }
  173. case "model":
  174. for _, result := range data.Results {
  175. if !result.Status {
  176. continue
  177. }
  178. for _, c := range result.Clusters {
  179. if cluster.ClusterId == c.ClusterID {
  180. if c.JsonData == "" {
  181. continue
  182. }
  183. jsonData := entity.JsonData{}
  184. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  185. if err != nil {
  186. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "model")
  187. }
  188. cluster.ModelId = jsonData.Id
  189. }
  190. }
  191. }
  192. }
  193. }
  194. assignedClusters = append(assignedClusters, cluster)
  195. }
  196. }
  197. // handle db yaml clustersWithDataDistributes
  198. for _, cluster := range assignedClusters {
  199. if cluster.DatasetId == "" {
  200. for _, distribute := range clustersWithDataDistributes.DataDistributes.Dataset {
  201. for _, c := range distribute.Clusters {
  202. if cluster.ClusterId == c.ClusterID {
  203. if c.JsonData == "" {
  204. continue
  205. }
  206. jsonData := entity.JsonData{}
  207. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  208. if err != nil {
  209. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "dataset")
  210. }
  211. cluster.DatasetId = jsonData.Id
  212. }
  213. }
  214. }
  215. }
  216. if cluster.ImageId == "" {
  217. for _, distribute := range clustersWithDataDistributes.DataDistributes.Image {
  218. for _, c := range distribute.Clusters {
  219. if cluster.ClusterId == c.ClusterID {
  220. if c.JsonData == "" {
  221. continue
  222. }
  223. jsonData := entity.JsonData{}
  224. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  225. if err != nil {
  226. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "image")
  227. }
  228. cluster.ImageId = jsonData.Id
  229. }
  230. }
  231. }
  232. }
  233. //if cluster.CodeId == "" {
  234. for _, distribute := range clustersWithDataDistributes.DataDistributes.Code {
  235. for _, c := range distribute.Clusters {
  236. if cluster.ClusterId == c.ClusterID {
  237. cluster.Output = distribute.Output
  238. if cluster.CodeId == "" {
  239. if c.JsonData == "" {
  240. continue
  241. }
  242. jsonData := entity.JsonData{}
  243. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  244. if err != nil {
  245. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "code")
  246. }
  247. cluster.CodeId = jsonData.Id
  248. }
  249. }
  250. }
  251. }
  252. if cluster.ModelId == "" {
  253. for _, distribute := range clustersWithDataDistributes.DataDistributes.Model {
  254. for _, c := range distribute.Clusters {
  255. if cluster.ClusterId == c.ClusterID {
  256. if c.JsonData == "" {
  257. continue
  258. }
  259. jsonData := entity.JsonData{}
  260. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  261. if err != nil {
  262. return nil, fmt.Errorf("jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "model")
  263. }
  264. cluster.ModelId = jsonData.Id
  265. }
  266. }
  267. }
  268. }
  269. }
  270. // check empty data
  271. for _, cluster := range assignedClusters {
  272. if cluster.DatasetId == "" {
  273. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "DatasetId")
  274. }
  275. if cluster.ImageId == "" {
  276. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "ImageId")
  277. }
  278. if cluster.CodeId == "" {
  279. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "CodeId")
  280. }
  281. }
  282. return assignedClusters, nil
  283. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.