You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

scheduleruntasklogic.go 10 kB

11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package schedule
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "github.com/zeromicro/go-zero/core/logx"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  15. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  16. "gopkg.in/yaml.v2"
  17. "strings"
  18. )
  19. type ScheduleRunTaskLogic struct {
  20. logx.Logger
  21. ctx context.Context
  22. svcCtx *svc.ServiceContext
  23. }
  24. func NewScheduleRunTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleRunTaskLogic {
  25. return &ScheduleRunTaskLogic{
  26. Logger: logx.WithContext(ctx),
  27. ctx: ctx,
  28. svcCtx: svcCtx,
  29. }
  30. }
  31. func (l *ScheduleRunTaskLogic) ScheduleRunTask(req *types.RunTaskReq) (resp *types.RunTaskResp, err error) {
  32. // find task
  33. task, err := l.svcCtx.Scheduler.AiStorages.GetTaskById(req.TaskID)
  34. if err != nil {
  35. return nil, err
  36. }
  37. if task == nil {
  38. return nil, errors.New("task not found ")
  39. }
  40. if task.Status != constants.Saved {
  41. switch task.Status {
  42. case constants.Cancelled:
  43. return nil, errors.New("task has been cancelled ")
  44. case constants.Failed:
  45. return nil, errors.New("task was already failed ")
  46. case constants.Running:
  47. return nil, errors.New("task is running ")
  48. case constants.Succeeded:
  49. return nil, errors.New("task is completed ")
  50. default:
  51. return nil, fmt.Errorf("task is being: %s", task.Status)
  52. }
  53. }
  54. var clustersWithDataDistributes ClustersWithDataDistributes
  55. err = yaml.Unmarshal([]byte(task.YamlString), &clustersWithDataDistributes)
  56. if err != nil {
  57. return nil, err
  58. }
  59. opt := &option.AiOption{
  60. AdapterId: ADAPTERID,
  61. TaskName: task.Name,
  62. TaskId: task.Id,
  63. StrategyName: "",
  64. }
  65. // update assignedClusters
  66. assignedClusters, err := updateClustersByScheduledDatas(task.Id, &clustersWithDataDistributes, req.ScheduledDatas)
  67. if err != nil {
  68. return nil, err
  69. }
  70. aiSchdl, err := schedulers.NewAiScheduler(l.ctx, "", l.svcCtx.Scheduler, opt)
  71. if err != nil {
  72. return nil, err
  73. }
  74. results, err := l.svcCtx.Scheduler.AssignAndSchedule(aiSchdl, executor.SUBMIT_MODE_STORAGE_SCHEDULE, assignedClusters)
  75. if err != nil {
  76. return nil, err
  77. }
  78. rs := (results).([]*schedulers.AiResult)
  79. err = l.SaveResult(task, rs, opt)
  80. if err != nil {
  81. return nil, err
  82. }
  83. return
  84. }
  85. func (l *ScheduleRunTaskLogic) SaveResult(task *models.Task, results []*schedulers.AiResult, opt *option.AiOption) error {
  86. for _, r := range results {
  87. opt.ComputeCard = strings.ToUpper(r.Card)
  88. opt.Replica = r.Replica
  89. opt.Output = r.Output
  90. adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(r.AdapterId)
  91. if err != nil {
  92. return err
  93. }
  94. clusterName, _ := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(r.ClusterId)
  95. err = l.svcCtx.Scheduler.AiStorages.SaveAiTask(task.Id, opt, adapterName, r.ClusterId, clusterName, r.JobId, constants.Saved, r.Msg)
  96. if err != nil {
  97. return err
  98. }
  99. l.svcCtx.Scheduler.AiStorages.AddNoticeInfo(r.AdapterId, adapterName, r.ClusterId, clusterName, r.TaskName, "create", "任务创建中")
  100. }
  101. return nil
  102. }
  103. func updateClustersByScheduledDatas(taskId int64, clustersWithDataDistributes *ClustersWithDataDistributes, scheduledDatas []*types.DataScheduleResults) ([]*strategy.AssignedCluster, error) {
  104. assignedClusters := make([]*strategy.AssignedCluster, 0)
  105. if len(scheduledDatas) == 0 {
  106. for _, cluster := range clustersWithDataDistributes.Clusters {
  107. assignedClusters = append(assignedClusters, cluster)
  108. }
  109. } else {
  110. // handle pass-in scheduledDatas
  111. for _, cluster := range clustersWithDataDistributes.Clusters {
  112. for _, data := range scheduledDatas {
  113. switch data.DataType {
  114. case "dataset":
  115. for _, result := range data.Results {
  116. if !result.Status {
  117. continue
  118. }
  119. for _, c := range result.Clusters {
  120. if cluster.ClusterId == c.ClusterID {
  121. if c.JsonData == "" {
  122. continue
  123. }
  124. jsonData := struct {
  125. Name string `json:"name"`
  126. Id string `json:"id"`
  127. }{}
  128. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  129. if err != nil {
  130. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "dataset")
  131. }
  132. cluster.DatasetId = jsonData.Id
  133. }
  134. }
  135. }
  136. case "image":
  137. for _, result := range data.Results {
  138. if !result.Status {
  139. continue
  140. }
  141. for _, c := range result.Clusters {
  142. if cluster.ClusterId == c.ClusterID {
  143. if c.JsonData == "" {
  144. continue
  145. }
  146. jsonData := struct {
  147. Name string `json:"name"`
  148. Id string `json:"id"`
  149. }{}
  150. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  151. if err != nil {
  152. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "image")
  153. }
  154. cluster.ImageId = jsonData.Id
  155. }
  156. }
  157. }
  158. case "code":
  159. for _, result := range data.Results {
  160. if !result.Status {
  161. continue
  162. }
  163. for _, c := range result.Clusters {
  164. if cluster.ClusterId == c.ClusterID {
  165. if c.JsonData == "" {
  166. continue
  167. }
  168. jsonData := struct {
  169. Name string `json:"name"`
  170. Id string `json:"id"`
  171. }{}
  172. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  173. if err != nil {
  174. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "code")
  175. }
  176. cluster.CodeId = jsonData.Id
  177. }
  178. }
  179. }
  180. case "model":
  181. for _, result := range data.Results {
  182. if !result.Status {
  183. continue
  184. }
  185. for _, c := range result.Clusters {
  186. if cluster.ClusterId == c.ClusterID {
  187. if c.JsonData == "" {
  188. continue
  189. }
  190. jsonData := struct {
  191. Name string `json:"name"`
  192. Id string `json:"id"`
  193. }{}
  194. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  195. if err != nil {
  196. return nil, fmt.Errorf("pass-in jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "model")
  197. }
  198. cluster.ModelId = jsonData.Id
  199. }
  200. }
  201. }
  202. }
  203. }
  204. assignedClusters = append(assignedClusters, cluster)
  205. }
  206. }
  207. // handle db yaml clustersWithDataDistributes
  208. for _, cluster := range assignedClusters {
  209. if cluster.DatasetId == "" {
  210. for _, distribute := range clustersWithDataDistributes.DataDistributes.Dataset {
  211. for _, c := range distribute.Clusters {
  212. if cluster.ClusterId == c.ClusterID {
  213. if c.JsonData == "" {
  214. continue
  215. }
  216. jsonData := struct {
  217. Name string `json:"name"`
  218. Id string `json:"id"`
  219. }{}
  220. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  221. if err != nil {
  222. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "dataset")
  223. }
  224. cluster.DatasetId = jsonData.Id
  225. }
  226. }
  227. }
  228. }
  229. if cluster.ImageId == "" {
  230. for _, distribute := range clustersWithDataDistributes.DataDistributes.Image {
  231. for _, c := range distribute.Clusters {
  232. if cluster.ClusterId == c.ClusterID {
  233. if c.JsonData == "" {
  234. continue
  235. }
  236. jsonData := struct {
  237. Name string `json:"name"`
  238. Id string `json:"id"`
  239. }{}
  240. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  241. if err != nil {
  242. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "image")
  243. }
  244. cluster.ImageId = jsonData.Id
  245. }
  246. }
  247. }
  248. }
  249. //if cluster.CodeId == "" {
  250. for _, distribute := range clustersWithDataDistributes.DataDistributes.Code {
  251. for _, c := range distribute.Clusters {
  252. if cluster.ClusterId == c.ClusterID {
  253. cluster.Output = distribute.Output
  254. if cluster.CodeId == "" {
  255. if c.JsonData == "" {
  256. continue
  257. }
  258. jsonData := struct {
  259. Name string `json:"name"`
  260. Id string `json:"id"`
  261. }{}
  262. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  263. if err != nil {
  264. return nil, fmt.Errorf("db yaml jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "code")
  265. }
  266. cluster.CodeId = jsonData.Id
  267. }
  268. }
  269. }
  270. }
  271. if cluster.ModelId == "" {
  272. for _, distribute := range clustersWithDataDistributes.DataDistributes.Model {
  273. for _, c := range distribute.Clusters {
  274. if cluster.ClusterId == c.ClusterID {
  275. if c.JsonData == "" {
  276. continue
  277. }
  278. jsonData := struct {
  279. Name string `json:"name"`
  280. Id string `json:"id"`
  281. }{}
  282. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  283. if err != nil {
  284. return nil, fmt.Errorf("jsonData convert failed, task %d, cluster %s, datatype: %s", taskId, cluster.ClusterId, "model")
  285. }
  286. cluster.ModelId = jsonData.Id
  287. }
  288. }
  289. }
  290. }
  291. }
  292. // check empty data
  293. for _, cluster := range assignedClusters {
  294. if cluster.DatasetId == "" {
  295. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "DatasetId")
  296. }
  297. if cluster.ImageId == "" {
  298. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "ImageId")
  299. }
  300. if cluster.CodeId == "" {
  301. return nil, fmt.Errorf("failed to run task %d, cluster %s cannot find %s", taskId, cluster.ClusterId, "CodeId")
  302. }
  303. }
  304. return assignedClusters, nil
  305. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.