You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

schedulecreatetasklogic.go 8.5 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. package schedule
  2. import (
  3. "context"
  4. "fmt"
  5. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  12. "gopkg.in/yaml.v3"
  13. "slices"
  14. "strings"
  15. "time"
  16. "github.com/zeromicro/go-zero/core/logx"
  17. )
  // Tunables used by the task-creation logic in this file.
  const (
  	// TRAINNING_TASK_REPLICA is passed as the first argument to
  	// strategy.NewLeastLoadFirst when scheduling.
  	TRAINNING_TASK_REPLICA = 1

  	// TRAINNING_TASK_SUFFIX_LEN is the length handed to utils.RandomString
  	// when building the suffix of a generated task name.
  	TRAINNING_TASK_SUFFIX_LEN = 10

  	// QUERY_RESOURCE_RETRY bounds how many times cluster resources are
  	// queried before scheduling gives up.
  	QUERY_RESOURCE_RETRY = 3
  )
  23. type ScheduleCreateTaskLogic struct {
  24. logx.Logger
  25. ctx context.Context
  26. svcCtx *svc.ServiceContext
  27. queryResource *QueryResourcesLogic
  28. }
  29. func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCreateTaskLogic {
  30. return &ScheduleCreateTaskLogic{
  31. Logger: logx.WithContext(ctx),
  32. ctx: ctx,
  33. svcCtx: svcCtx,
  34. queryResource: NewQueryResourcesLogic(ctx, svcCtx),
  35. }
  36. }
  37. func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
  38. resp = &types.CreateTaskResp{}
  39. if req.JobResources.ScheduleStrategy == "" {
  40. return nil, fmt.Errorf("must specify ScheduleStrategy")
  41. }
  42. if len(req.JobResources.Clusters) == 0 {
  43. return nil, fmt.Errorf("must specify at least one cluster")
  44. }
  45. var clusters []string
  46. if len(req.JobResources.Clusters) == 1 {
  47. clusters = append(clusters, req.JobResources.Clusters[0].ClusterID)
  48. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  49. if err != nil {
  50. return nil, err
  51. }
  52. assignedClusters := copyParams([]*strategy.AssignedCluster{{
  53. ClusterId: req.JobResources.Clusters[0].ClusterID,
  54. }}, req.JobResources.Clusters)
  55. taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, assignedClusters)
  56. if err != nil {
  57. return nil, err
  58. }
  59. resp.ScheduleDatas = schedatas
  60. resp.TaskID = taskId
  61. return resp, nil
  62. } else {
  63. assignedClusters, err := l.getAssignedClustersByStrategy(&req.JobResources)
  64. if err != nil {
  65. return nil, err
  66. }
  67. if len(assignedClusters) == 0 {
  68. return nil, fmt.Errorf("failed to create task, no scheduled cluster found")
  69. }
  70. for _, c := range assignedClusters {
  71. clusters = append(clusters, c.ClusterId)
  72. }
  73. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  74. if err != nil {
  75. return nil, err
  76. }
  77. taskId, err := l.createTask("SCHEDULE_TASK_"+utils.RandomString(TRAINNING_TASK_SUFFIX_LEN), req.JobResources.ScheduleStrategy, assignedClusters)
  78. if err != nil {
  79. return nil, err
  80. }
  81. resp.ScheduleDatas = schedatas
  82. resp.TaskID = taskId
  83. return resp, nil
  84. }
  85. }
  86. func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types.JobResources) ([]*strategy.AssignedCluster, error) {
  87. var assignedClusters []*strategy.AssignedCluster
  88. switch resources.ScheduleStrategy {
  89. case strategy.LEASTLOADFIRST:
  90. var resSpecs []*collector.ResourceSpec
  91. var resCount int
  92. for i := 0; i < QUERY_RESOURCE_RETRY; i++ {
  93. defer time.Sleep(time.Second)
  94. qResources, err := l.queryResource.queryResources(make([]string, 0))
  95. if err != nil {
  96. continue
  97. }
  98. for _, resource := range qResources {
  99. if resource.Resources != nil {
  100. resCount++
  101. }
  102. }
  103. if resCount >= 1 {
  104. resSpecs = qResources
  105. break
  106. } else {
  107. resCount = 0
  108. continue
  109. }
  110. }
  111. if resCount == 0 {
  112. return nil, fmt.Errorf("failed to create task, resources counting fails")
  113. }
  114. strtg := strategy.NewLeastLoadFirst(TRAINNING_TASK_REPLICA, resSpecs)
  115. clusters, err := strtg.Schedule()
  116. if err != nil {
  117. return nil, err
  118. }
  119. assignedClusters = copyParams(clusters, resources.Clusters)
  120. }
  121. return assignedClusters, nil
  122. }
  123. func copyParams(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobClusterInfo) []*strategy.AssignedCluster {
  124. var result []*strategy.AssignedCluster
  125. for _, c := range clusters {
  126. for _, info := range clusterInfos {
  127. if c.ClusterId == info.ClusterID {
  128. var envs []string
  129. var params []string
  130. for k, v := range info.Runtime.Envs {
  131. val := common.ConvertTypeToString(v)
  132. if val != "" {
  133. env := k + storeLink.COMMA + val
  134. envs = append(envs, env)
  135. }
  136. }
  137. for k, v := range info.Runtime.Params {
  138. val := common.ConvertTypeToString(v)
  139. if val != "" {
  140. p := k + storeLink.COMMA + val
  141. params = append(params, p)
  142. }
  143. }
  144. cluster := &strategy.AssignedCluster{
  145. ClusterId: c.ClusterId,
  146. ClusterName: c.ClusterName,
  147. Replicas: c.Replicas,
  148. Cmd: info.Runtime.Command,
  149. Envs: envs,
  150. Params: params,
  151. }
  152. result = append(result, cluster)
  153. }
  154. }
  155. }
  156. return result
  157. }
  158. func (l *ScheduleCreateTaskLogic) createTask(taskName string, strategyName string, clusters []*strategy.AssignedCluster) (int64, error) {
  159. var synergyStatus int64
  160. if len(clusters) > 1 {
  161. synergyStatus = 1
  162. }
  163. y, err := yaml.Marshal(clusters)
  164. if err != nil {
  165. fmt.Printf("Error while Marshaling. %v", err)
  166. }
  167. taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, synergyStatus, strategyName, string(y))
  168. if err != nil {
  169. return 0, err
  170. }
  171. return taskId, nil
  172. }
  173. func (l *ScheduleCreateTaskLogic) generateScheduleResult(distribute types.DataDistribute, clusters []string) ([]*types.ScheduleData, error) {
  174. var schedatas []*types.ScheduleData
  175. for _, d := range distribute.Dataset {
  176. data := &types.ScheduleData{
  177. DataType: "dataset",
  178. PackageID: d.PackageID,
  179. ClusterIDs: make([]string, 0),
  180. }
  181. var cSlc []string
  182. for _, cluster := range d.Clusters {
  183. cSlc = append(cSlc, cluster.ClusterID)
  184. }
  185. for _, cluster := range clusters {
  186. if !slices.Contains(cSlc, cluster) {
  187. data.ClusterIDs = append(data.ClusterIDs, cluster)
  188. } else {
  189. continue
  190. }
  191. }
  192. if len(data.ClusterIDs) != 0 {
  193. schedatas = append(schedatas, data)
  194. }
  195. }
  196. for _, d := range distribute.Code {
  197. data := &types.ScheduleData{
  198. DataType: "code",
  199. PackageID: d.PackageID,
  200. ClusterIDs: make([]string, 0),
  201. }
  202. var cSlc []string
  203. for _, cluster := range d.Clusters {
  204. cSlc = append(cSlc, cluster.ClusterID)
  205. }
  206. for _, cluster := range clusters {
  207. if !slices.Contains(cSlc, cluster) {
  208. data.ClusterIDs = append(data.ClusterIDs, cluster)
  209. } else {
  210. continue
  211. }
  212. }
  213. if len(data.ClusterIDs) != 0 {
  214. schedatas = append(schedatas, data)
  215. }
  216. }
  217. for _, d := range distribute.Image {
  218. data := &types.ScheduleData{
  219. DataType: "image",
  220. PackageID: d.PackageID,
  221. ClusterIDs: make([]string, 0),
  222. }
  223. var cSlc []string
  224. for _, cluster := range d.Clusters {
  225. cSlc = append(cSlc, cluster.ClusterID)
  226. }
  227. for _, cluster := range clusters {
  228. if !slices.Contains(cSlc, cluster) {
  229. data.ClusterIDs = append(data.ClusterIDs, cluster)
  230. } else {
  231. continue
  232. }
  233. }
  234. if len(data.ClusterIDs) != 0 {
  235. schedatas = append(schedatas, data)
  236. }
  237. }
  238. for _, d := range distribute.Model {
  239. data := &types.ScheduleData{
  240. DataType: "model",
  241. PackageID: d.PackageID,
  242. ClusterIDs: make([]string, 0),
  243. }
  244. var cSlc []string
  245. for _, cluster := range d.Clusters {
  246. cSlc = append(cSlc, cluster.ClusterID)
  247. }
  248. for _, cluster := range clusters {
  249. if !slices.Contains(cSlc, cluster) {
  250. data.ClusterIDs = append(data.ClusterIDs, cluster)
  251. } else {
  252. continue
  253. }
  254. }
  255. if len(data.ClusterIDs) != 0 {
  256. schedatas = append(schedatas, data)
  257. }
  258. }
  259. if len(schedatas) != 0 {
  260. err := l.updateStorageType(&schedatas)
  261. if err != nil {
  262. return nil, err
  263. }
  264. }
  265. return schedatas, nil
  266. }
  267. func (l *ScheduleCreateTaskLogic) updateStorageType(schedatas *[]*types.ScheduleData) error {
  268. for _, s := range *schedatas {
  269. var storageType string
  270. var sTypes []string
  271. for _, id := range s.ClusterIDs {
  272. cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
  273. if err != nil {
  274. return err
  275. }
  276. stype, ok := storeLink.StorageTypeMap[strings.Title(cluster.Name)]
  277. if ok {
  278. sTypes = append(sTypes, stype)
  279. }
  280. }
  281. sTypes = common.Unique(sTypes)
  282. for _, st := range sTypes {
  283. storageType += st + storeLink.COMMA
  284. }
  285. storageType = strings.TrimSuffix(storageType, storeLink.COMMA)
  286. s.StorageType = storageType
  287. }
  288. return nil
  289. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.