
schedulecreatetasklogic.go

package schedule

import (
	"context"
	"fmt"
	"slices"
	"strings"
	"time"

	"github.com/pkg/errors"
	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/task"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gopkg.in/yaml.v3"
)

const (
	TRAINNING_TASK_REPLICA    = 1
	TRAINNING_TASK_SUFFIX_LEN = 10
	QUERY_RESOURCE_RETRY      = 3
)

type ScheduleCreateTaskLogic struct {
	logx.Logger
	ctx           context.Context
	svcCtx        *svc.ServiceContext
	queryResource *QueryResourcesLogic
}

func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCreateTaskLogic {
	return &ScheduleCreateTaskLogic{
		Logger:        logx.WithContext(ctx),
		ctx:           ctx,
		svcCtx:        svcCtx,
		queryResource: NewQueryResourcesLogic(ctx, svcCtx),
	}
}
// generateFilteredDataDistributes keeps, for each dataset/image/code/model entry,
// only the distribution records that belong to one of the assigned clusters.
func generateFilteredDataDistributes(clusters []*strategy.AssignedCluster, distribute types.DataDistribute) *entity.ClustersWithDataDistributes {
	var clusterIds []string
	for _, c := range clusters {
		clusterIds = append(clusterIds, c.ClusterId)
	}
	clustersWithDataDistributes := &entity.ClustersWithDataDistributes{
		Clusters: clusters,
		DataDistributes: &types.DataDistribute{
			Dataset: make([]*types.DatasetDistribute, 0),
			Image:   make([]*types.ImageDistribute, 0),
			Model:   make([]*types.ModelDistribute, 0),
			Code:    make([]*types.CodeDistribute, 0),
		},
	}
	for _, datasetDistribute := range distribute.Dataset {
		dataset := &types.DatasetDistribute{}
		dataset.DataName = datasetDistribute.DataName
		dataset.PackageID = datasetDistribute.PackageID
		clusterScheduledList := make([]*types.ClusterScheduled, 0)
		if len(datasetDistribute.Clusters) != 0 {
			for _, cluster := range datasetDistribute.Clusters {
				if slices.Contains(clusterIds, cluster.ClusterID) {
					clusterScheduledList = append(clusterScheduledList, cluster)
				}
			}
		}
		dataset.Clusters = clusterScheduledList
		clustersWithDataDistributes.DataDistributes.Dataset = append(clustersWithDataDistributes.DataDistributes.Dataset, dataset)
	}
	for _, imageDistribute := range distribute.Image {
		image := &types.ImageDistribute{}
		image.DataName = imageDistribute.DataName
		image.PackageID = imageDistribute.PackageID
		clusterScheduledList := make([]*types.ClusterScheduled, 0)
		if len(imageDistribute.Clusters) != 0 {
			for _, cluster := range imageDistribute.Clusters {
				if slices.Contains(clusterIds, cluster.ClusterID) {
					clusterScheduledList = append(clusterScheduledList, cluster)
				}
			}
		}
		image.Clusters = clusterScheduledList
		clustersWithDataDistributes.DataDistributes.Image = append(clustersWithDataDistributes.DataDistributes.Image, image)
	}
	for _, codeDistribute := range distribute.Code {
		code := &types.CodeDistribute{}
		code.DataName = codeDistribute.DataName
		code.PackageID = codeDistribute.PackageID
		code.Output = codeDistribute.Output
		clusterScheduledList := make([]*types.ClusterScheduled, 0)
		if len(codeDistribute.Clusters) != 0 {
			for _, cluster := range codeDistribute.Clusters {
				if slices.Contains(clusterIds, cluster.ClusterID) {
					clusterScheduledList = append(clusterScheduledList, cluster)
				}
			}
		}
		code.Clusters = clusterScheduledList
		clustersWithDataDistributes.DataDistributes.Code = append(clustersWithDataDistributes.DataDistributes.Code, code)
	}
	for _, modelDistribute := range distribute.Model {
		model := &types.ModelDistribute{}
		model.DataName = modelDistribute.DataName
		model.PackageID = modelDistribute.PackageID
		clusterScheduledList := make([]*types.ClusterScheduled, 0)
		if len(modelDistribute.Clusters) != 0 {
			for _, cluster := range modelDistribute.Clusters {
				if slices.Contains(clusterIds, cluster.ClusterID) {
					clusterScheduledList = append(clusterScheduledList, cluster)
				}
			}
		}
		model.Clusters = clusterScheduledList
		clustersWithDataDistributes.DataDistributes.Model = append(clustersWithDataDistributes.DataDistributes.Model, model)
	}
	return clustersWithDataDistributes
}
// ScheduleCreateTask validates the request, resolves a unique task name, selects
// target clusters (directly when a single cluster is given, via the schedule
// strategy otherwise), and persists the task with its filtered data distribution.
func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
	resp = &types.CreateTaskResp{}
	err = task.ValidateJobResources(req.JobResources, "training")
	if err != nil {
		return nil, err
	}
	taskName, err := l.svcCtx.Scheduler.AiService.HandleDuplicateTaskName(req.Name, "training")
	if err != nil {
		return nil, err
	}
	var clusters []string
	if len(req.JobResources.Clusters) == 1 {
		clusters = append(clusters, req.JobResources.Clusters[0].ClusterID)
		schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
		if err != nil {
			return nil, err
		}
		assignedClusters := task.CopyParams([]*strategy.AssignedCluster{{
			ClusterId: req.JobResources.Clusters[0].ClusterID, Replicas: 1,
		}}, req.JobResources.Clusters, "")
		// filter data distribution
		clustersWithDataDistributes := generateFilteredDataDistributes(assignedClusters, req.DataDistributes)
		taskId, err := l.createTask(taskName, req.Description, req.UserId, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp, req.UserName)
		if err != nil {
			return nil, err
		}
		resp.ScheduleDatas = schedatas
		resp.TaskID = taskId
		resp.TaskName = taskName
		return resp, nil
	} else {
		assignedClusters, err := l.getAssignedClustersByStrategy(&req.JobResources, &req.DataDistributes)
		if err != nil {
			return nil, err
		}
		if len(assignedClusters) == 0 {
			return nil, fmt.Errorf("failed to create task, no scheduled cluster found")
		}
		for _, c := range assignedClusters {
			clusters = append(clusters, c.ClusterId)
		}
		schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
		if err != nil {
			return nil, err
		}
		// filter data distribution
		clustersWithDataDistributes := generateFilteredDataDistributes(assignedClusters, req.DataDistributes)
		taskId, err := l.createTask(taskName, req.Description, req.UserId, req.JobResources.ScheduleStrategy, clustersWithDataDistributes, req.Token, req.UserIp, req.UserName)
		if err != nil {
			return nil, err
		}
		resp.ScheduleDatas = schedatas
		resp.TaskID = taskId
		resp.TaskName = taskName
		return resp, nil
	}
}
// getAssignedClustersByStrategy resolves the target clusters according to the
// requested schedule strategy (least-load-first or data-locality).
func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types.JobResources, dataDistribute *types.DataDistribute) ([]*strategy.AssignedCluster, error) {
	var assignedClusters []*strategy.AssignedCluster
	switch resources.ScheduleStrategy {
	case strategy.LEASTLOADFIRST:
		var resSpecs []*collector.ResourceSpec
		var resCount int
		// retry querying cluster resources up to QUERY_RESOURCE_RETRY times
		for i := 0; i < QUERY_RESOURCE_RETRY; i++ {
			defer time.Sleep(time.Second)
			qResources, err := l.queryResource.QueryResourcesByClusterId(nil, "Train")
			if err != nil {
				continue
			}
			for _, resource := range qResources {
				if resource.Resources != nil {
					resCount++
				}
			}
			if resCount >= 1 {
				resSpecs = qResources
				break
			} else {
				resCount = 0
				continue
			}
		}
		if resCount == 0 {
			return nil, fmt.Errorf("failed to create task, resources counting fails")
		}
		strtg := strategy.NewLeastLoadFirst(TRAINNING_TASK_REPLICA, resSpecs)
		clusters, err := strtg.Schedule()
		if err != nil {
			return nil, err
		}
		assignedClusters = task.CopyParams(clusters, resources.Clusters, "")
	case strategy.DATA_LOCALITY:
		strtg := strategy.NewDataLocality(TRAINNING_TASK_REPLICA, dataDistribute)
		clusters, err := strtg.Schedule()
		if err != nil {
			return nil, err
		}
		assignedClusters = task.CopyParams(clusters, resources.Clusters, "")
	default:
		return nil, errors.New("no strategy has been chosen")
	}
	return assignedClusters, nil
}
// createTask serialises the cluster/data-distribution assignment to YAML and
// stores the new task through the scheduler.
func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, userId int64, strategyName string, clustersWithDataDistributes *entity.ClustersWithDataDistributes, token string, userIp string, userName string) (int64, error) {
	var synergyStatus int64
	if len(clustersWithDataDistributes.Clusters) > 1 {
		synergyStatus = 1
	}
	y, err := yaml.Marshal(clustersWithDataDistributes)
	if err != nil {
		fmt.Printf("Error while Marshaling. %v", err)
	}
	taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, desc, userId, synergyStatus, strategyName, string(y), token, userIp, &l.svcCtx.Config, userName)
	if err != nil {
		return 0, err
	}
	return taskId, nil
}
// generateScheduleResult lists, per piece of data, the scheduled clusters that
// do not yet hold a copy of it and therefore need the data transferred.
func (l *ScheduleCreateTaskLogic) generateScheduleResult(distribute types.DataDistribute, clusters []string) ([]*types.ScheduleData, error) {
	var schedatas []*types.ScheduleData
	for _, d := range distribute.Dataset {
		data := &types.ScheduleData{
			DataType:   "dataset",
			PackageID:  d.PackageID,
			ClusterIDs: make([]string, 0),
		}
		var cSlc []string
		for _, cluster := range d.Clusters {
			cSlc = append(cSlc, cluster.ClusterID)
		}
		for _, cluster := range clusters {
			if !slices.Contains(cSlc, cluster) {
				data.ClusterIDs = append(data.ClusterIDs, cluster)
			} else {
				continue
			}
		}
		if len(data.ClusterIDs) != 0 {
			schedatas = append(schedatas, data)
		}
	}
	for _, d := range distribute.Code {
		data := &types.ScheduleData{
			DataType:   "code",
			PackageID:  d.PackageID,
			ClusterIDs: make([]string, 0),
		}
		var cSlc []string
		for _, cluster := range d.Clusters {
			cSlc = append(cSlc, cluster.ClusterID)
		}
		for _, cluster := range clusters {
			if !slices.Contains(cSlc, cluster) {
				data.ClusterIDs = append(data.ClusterIDs, cluster)
			} else {
				continue
			}
		}
		if len(data.ClusterIDs) != 0 {
			schedatas = append(schedatas, data)
		}
	}
	for _, d := range distribute.Image {
		data := &types.ScheduleData{
			DataType:   "image",
			PackageID:  d.PackageID,
			ClusterIDs: make([]string, 0),
		}
		var cSlc []string
		for _, cluster := range d.Clusters {
			cSlc = append(cSlc, cluster.ClusterID)
		}
		for _, cluster := range clusters {
			if !slices.Contains(cSlc, cluster) {
				data.ClusterIDs = append(data.ClusterIDs, cluster)
			} else {
				continue
			}
		}
		if len(data.ClusterIDs) != 0 {
			schedatas = append(schedatas, data)
		}
	}
	for _, d := range distribute.Model {
		data := &types.ScheduleData{
			DataType:   "model",
			PackageID:  d.PackageID,
			ClusterIDs: make([]string, 0),
		}
		var cSlc []string
		for _, cluster := range d.Clusters {
			cSlc = append(cSlc, cluster.ClusterID)
		}
		for _, cluster := range clusters {
			if !slices.Contains(cSlc, cluster) {
				data.ClusterIDs = append(data.ClusterIDs, cluster)
			} else {
				continue
			}
		}
		if len(data.ClusterIDs) != 0 {
			schedatas = append(schedatas, data)
		}
	}
	if len(schedatas) != 0 {
		err := l.updateStorageType(&schedatas)
		if err != nil {
			return nil, err
		}
	}
	return schedatas, nil
}
// updateStorageType annotates each ScheduleData entry with the comma-separated,
// de-duplicated storage types of its target clusters.
func (l *ScheduleCreateTaskLogic) updateStorageType(schedatas *[]*types.ScheduleData) error {
	for _, s := range *schedatas {
		var storageType string
		var sTypes []string
		for _, id := range s.ClusterIDs {
			cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
			if err != nil {
				return err
			}
			stype, ok := storeLink.StorageTypeMap[strings.Title(cluster.Name)]
			if ok {
				sTypes = append(sTypes, stype)
			}
		}
		sTypes = common.Unique(sTypes)
		for _, st := range sTypes {
			storageType += st + storeLink.COMMA
		}
		storageType = strings.TrimSuffix(storageType, storeLink.COMMA)
		s.StorageType = storageType
	}
	return nil
}
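
For orientation, here is a minimal sketch of how a caller might drive this logic. It assumes a *svc.ServiceContext has already been initialized elsewhere (for example during service bootstrap); the import path used for the schedule package is a guess, and the strategy defaulting is illustrative only, not part of the file above.

package example

import (
	"context"

	// NOTE: the import path of the schedule package above is assumed here.
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/logic/schedule"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
)

// submitTrainingTask shows one possible call sequence: default the strategy if
// none was chosen, build the logic, and let ScheduleCreateTask pick clusters.
func submitTrainingTask(ctx context.Context, svcCtx *svc.ServiceContext, req *types.CreateTaskReq) (*types.CreateTaskResp, error) {
	if len(req.JobResources.Clusters) != 1 && req.JobResources.ScheduleStrategy == "" {
		// With several candidate clusters and no explicit strategy the request
		// fails with "no strategy has been chosen", so pick least-load-first.
		req.JobResources.ScheduleStrategy = strategy.LEASTLOADFIRST
	}
	l := schedule.NewScheduleCreateTaskLogic(ctx, svcCtx)
	return l.ScheduleCreateTask(req)
}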

PCM is positioned as a software stack over cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.