You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-'), and can be up to 35 characters long.

schedulecreatetasklogic.go 14 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
8 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. package schedule
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/pkg/errors"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  13. "gopkg.in/yaml.v3"
  14. "slices"
  15. "strings"
  16. "time"
  17. "github.com/zeromicro/go-zero/core/logx"
  18. )
  19. const (
  20. TRAINNING_TASK_REPLICA = 1
  21. TRAINNING_TASK_SUFFIX_LEN = 10
  22. QUERY_RESOURCE_RETRY = 3
  23. )
  24. type ClustersWithDataDistributes struct {
  25. Clusters []*strategy.AssignedCluster
  26. DataDistributes *types.DataDistribute
  27. }
  28. type ScheduleCreateTaskLogic struct {
  29. logx.Logger
  30. ctx context.Context
  31. svcCtx *svc.ServiceContext
  32. queryResource *QueryResourcesLogic
  33. }
  34. func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCreateTaskLogic {
  35. return &ScheduleCreateTaskLogic{
  36. Logger: logx.WithContext(ctx),
  37. ctx: ctx,
  38. svcCtx: svcCtx,
  39. queryResource: NewQueryResourcesLogic(ctx, svcCtx),
  40. }
  41. }
  42. func generateFilteredDataDistributes(clusters []*strategy.AssignedCluster, distribute types.DataDistribute) *ClustersWithDataDistributes {
  43. var clusterIds []string
  44. for _, c := range clusters {
  45. clusterIds = append(clusterIds, c.ClusterId)
  46. }
  47. clustersWithDataDistributes := &ClustersWithDataDistributes{
  48. Clusters: clusters,
  49. DataDistributes: &types.DataDistribute{
  50. Dataset: make([]*types.DatasetDistribute, 0),
  51. Image: make([]*types.ImageDistribute, 0),
  52. Model: make([]*types.ModelDistribute, 0),
  53. Code: make([]*types.CodeDistribute, 0),
  54. },
  55. }
  56. for _, datasetDistribute := range distribute.Dataset {
  57. dataset := &types.DatasetDistribute{}
  58. dataset.DataName = datasetDistribute.DataName
  59. dataset.PackageID = datasetDistribute.PackageID
  60. clusterScheduledList := make([]*types.ClusterScheduled, 0)
  61. if len(datasetDistribute.Clusters) != 0 {
  62. for _, cluster := range datasetDistribute.Clusters {
  63. if slices.Contains(clusterIds, cluster.ClusterID) {
  64. clusterScheduledList = append(clusterScheduledList, cluster)
  65. }
  66. }
  67. }
  68. dataset.Clusters = clusterScheduledList
  69. clustersWithDataDistributes.DataDistributes.Dataset = append(clustersWithDataDistributes.DataDistributes.Dataset, dataset)
  70. }
  71. for _, imageDistribute := range distribute.Image {
  72. image := &types.ImageDistribute{}
  73. image.DataName = imageDistribute.DataName
  74. image.PackageID = imageDistribute.PackageID
  75. clusterScheduledList := make([]*types.ClusterScheduled, 0)
  76. if len(imageDistribute.Clusters) != 0 {
  77. for _, cluster := range imageDistribute.Clusters {
  78. if slices.Contains(clusterIds, cluster.ClusterID) {
  79. clusterScheduledList = append(clusterScheduledList, cluster)
  80. }
  81. }
  82. }
  83. image.Clusters = clusterScheduledList
  84. clustersWithDataDistributes.DataDistributes.Image = append(clustersWithDataDistributes.DataDistributes.Image, image)
  85. }
  86. for _, codeDistribute := range distribute.Code {
  87. code := &types.CodeDistribute{}
  88. code.DataName = codeDistribute.DataName
  89. code.PackageID = codeDistribute.PackageID
  90. code.Output = codeDistribute.Output
  91. clusterScheduledList := make([]*types.ClusterScheduled, 0)
  92. if len(codeDistribute.Clusters) != 0 {
  93. for _, cluster := range codeDistribute.Clusters {
  94. if slices.Contains(clusterIds, cluster.ClusterID) {
  95. clusterScheduledList = append(clusterScheduledList, cluster)
  96. }
  97. }
  98. }
  99. code.Clusters = clusterScheduledList
  100. clustersWithDataDistributes.DataDistributes.Code = append(clustersWithDataDistributes.DataDistributes.Code, code)
  101. }
  102. for _, modelDistribute := range distribute.Model {
  103. model := &types.ModelDistribute{}
  104. model.DataName = modelDistribute.DataName
  105. model.PackageID = modelDistribute.PackageID
  106. clusterScheduledList := make([]*types.ClusterScheduled, 0)
  107. if len(modelDistribute.Clusters) != 0 {
  108. for _, cluster := range modelDistribute.Clusters {
  109. if slices.Contains(clusterIds, cluster.ClusterID) {
  110. clusterScheduledList = append(clusterScheduledList, cluster)
  111. }
  112. }
  113. }
  114. model.Clusters = clusterScheduledList
  115. clustersWithDataDistributes.DataDistributes.Model = append(clustersWithDataDistributes.DataDistributes.Model, model)
  116. }
  117. return clustersWithDataDistributes
  118. }
  119. func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
  120. resp = &types.CreateTaskResp{}
  121. err = validateJobResources(req.JobResources)
  122. if err != nil {
  123. return nil, err
  124. }
  125. taskName, err := l.handleDuplicateTaskName(req.Name)
  126. if err != nil {
  127. return nil, err
  128. }
  129. var clusters []string
  130. if len(req.JobResources.Clusters) == 1 {
  131. clusters = append(clusters, req.JobResources.Clusters[0].ClusterID)
  132. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  133. if err != nil {
  134. return nil, err
  135. }
  136. assignedClusters := copyParams([]*strategy.AssignedCluster{{
  137. ClusterId: req.JobResources.Clusters[0].ClusterID, Replicas: 1,
  138. }}, req.JobResources.Clusters)
  139. // filter data distribution
  140. clustersWithDataDistributes := generateFilteredDataDistributes(assignedClusters, req.DataDistributes)
  141. taskId, err := l.createTask(taskName, req.Description, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp)
  142. if err != nil {
  143. return nil, err
  144. }
  145. resp.ScheduleDatas = schedatas
  146. resp.TaskID = taskId
  147. return resp, nil
  148. } else {
  149. assignedClusters, err := l.getAssignedClustersByStrategy(&req.JobResources, &req.DataDistributes)
  150. if err != nil {
  151. return nil, err
  152. }
  153. if len(assignedClusters) == 0 {
  154. return nil, fmt.Errorf("failed to create task, no scheduled cluster found")
  155. }
  156. for _, c := range assignedClusters {
  157. clusters = append(clusters, c.ClusterId)
  158. }
  159. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  160. if err != nil {
  161. return nil, err
  162. }
  163. // filter data distribution
  164. clustersWithDataDistributes := generateFilteredDataDistributes(assignedClusters, req.DataDistributes)
  165. taskId, err := l.createTask(taskName, req.Description, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp)
  166. if err != nil {
  167. return nil, err
  168. }
  169. resp.ScheduleDatas = schedatas
  170. resp.TaskID = taskId
  171. return resp, nil
  172. }
  173. }
  174. func validateJobResources(resources types.JobResources) error {
  175. if resources.ScheduleStrategy == "" {
  176. return fmt.Errorf("must specify ScheduleStrategy")
  177. }
  178. if len(resources.Clusters) == 0 {
  179. return fmt.Errorf("must specify at least one cluster")
  180. }
  181. for _, c := range resources.Clusters {
  182. if c.ClusterID == "" {
  183. return fmt.Errorf("must specify clusterID")
  184. }
  185. if len(c.Resources) == 0 {
  186. return fmt.Errorf("cluster: %s must specify at least one compute resource", c.ClusterID)
  187. //return errors.Wrapf(xerr.NewErrCodeMsg(1234, fmt.Sprintf("cluster: %s must specify at least one compute resource", c.ClusterID)), "")
  188. }
  189. }
  190. return nil
  191. }
  192. func (l *ScheduleCreateTaskLogic) handleDuplicateTaskName(name string) (string, error) {
  193. exist, err := l.svcCtx.Scheduler.AiStorages.DoesTaskNameExist(name)
  194. if err != nil {
  195. return "", err
  196. }
  197. if exist {
  198. return name + "_" + time.Now().Format(constants.Layout_Time_Suffix), nil
  199. }
  200. return name, nil
  201. }
  202. func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types.JobResources, dataDistribute *types.DataDistribute) ([]*strategy.AssignedCluster, error) {
  203. var assignedClusters []*strategy.AssignedCluster
  204. switch resources.ScheduleStrategy {
  205. case strategy.LEASTLOADFIRST:
  206. var resSpecs []*collector.ResourceSpec
  207. var resCount int
  208. for i := 0; i < QUERY_RESOURCE_RETRY; i++ {
  209. defer time.Sleep(time.Second)
  210. qResources, err := l.queryResource.QueryResourcesByClusterId(nil)
  211. if err != nil {
  212. continue
  213. }
  214. for _, resource := range qResources {
  215. if resource.Resources != nil {
  216. resCount++
  217. }
  218. }
  219. if resCount >= 1 {
  220. resSpecs = qResources
  221. break
  222. } else {
  223. resCount = 0
  224. continue
  225. }
  226. }
  227. if resCount == 0 {
  228. return nil, fmt.Errorf("failed to create task, resources counting fails")
  229. }
  230. strtg := strategy.NewLeastLoadFirst(TRAINNING_TASK_REPLICA, resSpecs)
  231. clusters, err := strtg.Schedule()
  232. if err != nil {
  233. return nil, err
  234. }
  235. assignedClusters = copyParams(clusters, resources.Clusters)
  236. case strategy.DATA_LOCALITY:
  237. strtg := strategy.NewDataLocality(TRAINNING_TASK_REPLICA, dataDistribute)
  238. clusters, err := strtg.Schedule()
  239. if err != nil {
  240. return nil, err
  241. }
  242. assignedClusters = copyParams(clusters, resources.Clusters)
  243. default:
  244. return nil, errors.New("no strategy has been chosen")
  245. }
  246. return assignedClusters, nil
  247. }
  248. func copyParams(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobClusterInfo) []*strategy.AssignedCluster {
  249. var result []*strategy.AssignedCluster
  250. for _, c := range clusters {
  251. for _, info := range clusterInfos {
  252. if c.ClusterId == info.ClusterID {
  253. var envs []string
  254. var params []string
  255. for k, v := range info.Runtime.Envs {
  256. val := common.ConvertTypeToString(v)
  257. if val != "" {
  258. env := k + storeLink.COMMA + val
  259. envs = append(envs, env)
  260. }
  261. }
  262. for k, v := range info.Runtime.Params {
  263. val := common.ConvertTypeToString(v)
  264. if val != "" {
  265. p := k + storeLink.COMMA + val
  266. params = append(params, p)
  267. }
  268. }
  269. cluster := &strategy.AssignedCluster{
  270. ClusterId: c.ClusterId,
  271. ClusterName: c.ClusterName,
  272. Replicas: c.Replicas,
  273. ResourcesRequired: info.Resources,
  274. Cmd: info.Runtime.Command,
  275. Envs: envs,
  276. Params: params,
  277. }
  278. result = append(result, cluster)
  279. }
  280. }
  281. }
  282. return result
  283. }
  284. func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, strategyName string, clustersWithDataDistributes *ClustersWithDataDistributes, token string, userIp string) (int64, error) {
  285. var synergyStatus int64
  286. if len(clustersWithDataDistributes.Clusters) > 1 {
  287. synergyStatus = 1
  288. }
  289. y, err := yaml.Marshal(clustersWithDataDistributes)
  290. if err != nil {
  291. fmt.Printf("Error while Marshaling. %v", err)
  292. }
  293. taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, desc, synergyStatus, strategyName, string(y), token, userIp, &l.svcCtx.Config)
  294. if err != nil {
  295. return 0, err
  296. }
  297. return taskId, nil
  298. }
  299. func (l *ScheduleCreateTaskLogic) generateScheduleResult(distribute types.DataDistribute, clusters []string) ([]*types.ScheduleData, error) {
  300. var schedatas []*types.ScheduleData
  301. for _, d := range distribute.Dataset {
  302. data := &types.ScheduleData{
  303. DataType: "dataset",
  304. PackageID: d.PackageID,
  305. ClusterIDs: make([]string, 0),
  306. }
  307. var cSlc []string
  308. for _, cluster := range d.Clusters {
  309. cSlc = append(cSlc, cluster.ClusterID)
  310. }
  311. for _, cluster := range clusters {
  312. if !slices.Contains(cSlc, cluster) {
  313. data.ClusterIDs = append(data.ClusterIDs, cluster)
  314. } else {
  315. continue
  316. }
  317. }
  318. if len(data.ClusterIDs) != 0 {
  319. schedatas = append(schedatas, data)
  320. }
  321. }
  322. for _, d := range distribute.Code {
  323. data := &types.ScheduleData{
  324. DataType: "code",
  325. PackageID: d.PackageID,
  326. ClusterIDs: make([]string, 0),
  327. }
  328. var cSlc []string
  329. for _, cluster := range d.Clusters {
  330. cSlc = append(cSlc, cluster.ClusterID)
  331. }
  332. for _, cluster := range clusters {
  333. if !slices.Contains(cSlc, cluster) {
  334. data.ClusterIDs = append(data.ClusterIDs, cluster)
  335. } else {
  336. continue
  337. }
  338. }
  339. if len(data.ClusterIDs) != 0 {
  340. schedatas = append(schedatas, data)
  341. }
  342. }
  343. for _, d := range distribute.Image {
  344. data := &types.ScheduleData{
  345. DataType: "image",
  346. PackageID: d.PackageID,
  347. ClusterIDs: make([]string, 0),
  348. }
  349. var cSlc []string
  350. for _, cluster := range d.Clusters {
  351. cSlc = append(cSlc, cluster.ClusterID)
  352. }
  353. for _, cluster := range clusters {
  354. if !slices.Contains(cSlc, cluster) {
  355. data.ClusterIDs = append(data.ClusterIDs, cluster)
  356. } else {
  357. continue
  358. }
  359. }
  360. if len(data.ClusterIDs) != 0 {
  361. schedatas = append(schedatas, data)
  362. }
  363. }
  364. for _, d := range distribute.Model {
  365. data := &types.ScheduleData{
  366. DataType: "model",
  367. PackageID: d.PackageID,
  368. ClusterIDs: make([]string, 0),
  369. }
  370. var cSlc []string
  371. for _, cluster := range d.Clusters {
  372. cSlc = append(cSlc, cluster.ClusterID)
  373. }
  374. for _, cluster := range clusters {
  375. if !slices.Contains(cSlc, cluster) {
  376. data.ClusterIDs = append(data.ClusterIDs, cluster)
  377. } else {
  378. continue
  379. }
  380. }
  381. if len(data.ClusterIDs) != 0 {
  382. schedatas = append(schedatas, data)
  383. }
  384. }
  385. if len(schedatas) != 0 {
  386. err := l.updateStorageType(&schedatas)
  387. if err != nil {
  388. return nil, err
  389. }
  390. }
  391. return schedatas, nil
  392. }
  393. func (l *ScheduleCreateTaskLogic) updateStorageType(schedatas *[]*types.ScheduleData) error {
  394. for _, s := range *schedatas {
  395. var storageType string
  396. var sTypes []string
  397. for _, id := range s.ClusterIDs {
  398. cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
  399. if err != nil {
  400. return err
  401. }
  402. stype, ok := storeLink.StorageTypeMap[strings.Title(cluster.Name)]
  403. if ok {
  404. sTypes = append(sTypes, stype)
  405. }
  406. }
  407. sTypes = common.Unique(sTypes)
  408. for _, st := range sTypes {
  409. storageType += st + storeLink.COMMA
  410. }
  411. storageType = strings.TrimSuffix(storageType, storeLink.COMMA)
  412. s.StorageType = storageType
  413. }
  414. return nil
  415. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.