You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

scheduleruntasklogic.go 9.5 kB

11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. package schedule
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  15. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  16. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  17. "gopkg.in/yaml.v2"
  18. "strings"
  19. )
  20. type ScheduleRunTaskLogic struct {
  21. logx.Logger
  22. ctx context.Context
  23. svcCtx *svc.ServiceContext
  24. }
  25. func NewScheduleRunTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleRunTaskLogic {
  26. return &ScheduleRunTaskLogic{
  27. Logger: logx.WithContext(ctx),
  28. ctx: ctx,
  29. svcCtx: svcCtx,
  30. }
  31. }
  32. func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *types.RunTaskResp, err error) {
  33. // find task
  34. task, err := l.svcCtx.Scheduler.AiStorages.GetTaskById(req.TaskID)
  35. if err != nil {
  36. return nil, err
  37. }
  38. if task == nil {
  39. return nil, errors.New("task not found ")
  40. }
  41. if task.Status != constants.Saved {
  42. switch task.Status {
  43. case constants.Cancelled:
  44. return nil, errors.New("task has been cancelled ")
  45. case constants.Failed:
  46. return nil, errors.New("task was already failed ")
  47. case constants.Running:
  48. return nil, errors.New("task is running ")
  49. case constants.Succeeded:
  50. return nil, errors.New("task is completed ")
  51. default:
  52. return nil, fmt.Errorf("task is being: %s", task.Status)
  53. }
  54. }
  55. var clustersWithDataDistributes ClustersWithDataDistributes
  56. err = yaml.Unmarshal([]byte(task.YamlString), &clustersWithDataDistributes)
  57. if err != nil {
  58. return nil, err
  59. }
  60. opt := &option.AiOption{
  61. AdapterId: ADAPTERID,
  62. TaskName: task.Name,
  63. TaskId: task.Id,
  64. StrategyName: "",
  65. }
  66. // update assignedClusters
  67. assignedClusters, err := updateClustersByScheduledDatas(task.Id, &clustersWithDataDistributes, req.ScheduledDatas)
  68. if err != nil {
  69. return nil, err
  70. }
  71. aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
  72. if err != nil {
  73. return nil, err
  74. }
  75. results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, executor.SUBMIT_MODE_STORAGE_SCHEDULE, assignedClusters)
  76. if err != nil {
  77. return nil, err
  78. }
  79. rs := (results).([]*schedulers.AiResult)
  80. err = l.SaveResult(task, rs, opt)
  81. if err != nil {
  82. return nil, err
  83. }
  84. return
  85. }
  86. func (l *ScheduleRunTaskLogic) SaveResult(task *models.Task, results []*schedulers.AiResult, opt *option.AiOption) error {
  87. for _, r := range results {
  88. opt.ComputeCard = strings.ToUpper(r.Card)
  89. opt.Replica = r.Replica
  90. opt.Output = r.Output
  91. adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(r.AdapterId)
  92. if err != nil {
  93. return err
  94. }
  95. clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId)
  96. err = l.svcCtx.Scheduler.AiStorages.SaveAiTask(task.Id, opt, adapterName, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg)
  97. if err != nil {
  98. return err
  99. }
  100. l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(r.AdapterId, adapterName, r.ClusterId, clusterName, r.TaskName, "create", "任务创建中")
  101. }
  102. return nil
  103. }
  104. func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *ClustersWithDataDistributes, scheduledDatas []*types.DataScheduleResults) ([]*strategy.AssignedCluster, error) {
  105. assignedClusters := make([]*strategy.AssignedCluster, 0)
  106. if len(scheduledDatas) == 0 {
  107. for _, cluster := range clustersWithDataDistributes.Clusters {
  108. assignedClusters = append(assignedClusters, cluster)
  109. }
  110. } else {
  111. // handle pass-in scheduledDatas
  112. for _, cluster := range clustersWithDataDistributes.Clusters {
  113. for _, data := range scheduledDatas {
  114. switch data.DataType {
  115. case "dataset":
  116. for _, result := range data.Results {
  117. if !result.Status {
  118. continue
  119. }
  120. for _, c := range result.Clusters {
  121. if cluster.ClusterId == c.ClusterID {
  122. if c.JsonData == "" {
  123. continue
  124. }
  125. jsonData := entity.JsonData{}
  126. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  127. if err != nil {
  128. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "dataset")
  129. }
  130. cluster.DatasetId = jsonData.Id
  131. }
  132. }
  133. }
  134. case "image":
  135. for _, result := range data.Results {
  136. if !result.Status {
  137. continue
  138. }
  139. for _, c := range result.Clusters {
  140. if cluster.ClusterId == c.ClusterID {
  141. if c.JsonData == "" {
  142. continue
  143. }
  144. jsonData := entity.JsonData{}
  145. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  146. if err != nil {
  147. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "image")
  148. }
  149. cluster.ImageId = jsonData.Id
  150. }
  151. }
  152. }
  153. case "code":
  154. for _, result := range data.Results {
  155. if !result.Status {
  156. continue
  157. }
  158. for _, c := range result.Clusters {
  159. if cluster.ClusterId == c.ClusterID {
  160. if c.JsonData == "" {
  161. continue
  162. }
  163. jsonData := entity.JsonData{}
  164. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  165. if err != nil {
  166. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "code")
  167. }
  168. cluster.CodeId = jsonData.Id
  169. }
  170. }
  171. }
  172. case "model":
  173. for _, result := range data.Results {
  174. if !result.Status {
  175. continue
  176. }
  177. for _, c := range result.Clusters {
  178. if cluster.ClusterId == c.ClusterID {
  179. if c.JsonData == "" {
  180. continue
  181. }
  182. jsonData := entity.JsonData{}
  183. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  184. if err != nil {
  185. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "model")
  186. }
  187. cluster.ModelId = jsonData.Id
  188. }
  189. }
  190. }
  191. }
  192. }
  193. assignedClusters = append(assignedClusters, cluster)
  194. }
  195. }
  196. // handle db yaml clustersWithDataDistributes
  197. for _, cluster := range assignedClusters {
  198. if cluster.DatasetId == "" {
  199. for _, distribute := range clustersWithDataDistributes.DataDistributes.Dataset {
  200. for _, c := range distribute.Clusters {
  201. if cluster.ClusterId == c.ClusterID {
  202. if c.JsonData == "" {
  203. continue
  204. }
  205. jsonData := entity.JsonData{}
  206. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  207. if err != nil {
  208. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "dataset")
  209. }
  210. cluster.DatasetId = jsonData.Id
  211. }
  212. }
  213. }
  214. }
  215. if cluster.ImageId == "" {
  216. for _, distribute := range clustersWithDataDistributes.DataDistributes.Image {
  217. for _, c := range distribute.Clusters {
  218. if cluster.ClusterId == c.ClusterID {
  219. if c.JsonData == "" {
  220. continue
  221. }
  222. jsonData := entity.JsonData{}
  223. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  224. if err != nil {
  225. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "image")
  226. }
  227. cluster.ImageId = jsonData.Id
  228. }
  229. }
  230. }
  231. }
  232. //if cluster.CodeId == "" {
  233. for _, distribute := range clustersWithDataDistributes.DataDistributes.Code {
  234. for _, c := range distribute.Clusters {
  235. if cluster.ClusterId == c.ClusterID {
  236. cluster.Output = distribute.Output
  237. if cluster.CodeId == "" {
  238. if c.JsonData == "" {
  239. continue
  240. }
  241. jsonData := entity.JsonData{}
  242. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  243. if err != nil {
  244. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "code")
  245. }
  246. cluster.CodeId = jsonData.Id
  247. }
  248. }
  249. }
  250. }
  251. if cluster.ModelId == "" {
  252. for _, distribute := range clustersWithDataDistributes.DataDistributes.Model {
  253. for _, c := range distribute.Clusters {
  254. if cluster.ClusterId == c.ClusterID {
  255. if c.JsonData == "" {
  256. continue
  257. }
  258. jsonData := entity.JsonData{}
  259. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  260. if err != nil {
  261. return nil, fmt.Errorf("jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "model")
  262. }
  263. cluster.ModelId = jsonData.Id
  264. }
  265. }
  266. }
  267. }
  268. }
  269. // check empty data
  270. for _, cluster := range assignedClusters {
  271. if cluster.DatasetId == "" {
  272. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "DatasetId")
  273. }
  274. if cluster.ImageId == "" {
  275. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "ImageId")
  276. }
  277. if cluster.CodeId == "" {
  278. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "CodeId")
  279. }
  280. }
  281. return assignedClusters, nil
  282. }

PCM is positioned as a software stack over the cloud. It aims to build the standards and ecosystem for heterogeneous cloud collaboration within JCC in a non-intrusive, autonomous, peer-to-peer manner.