You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

schedulecreatetasklogic.go 9.9 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
10 months ago
10 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
10 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
10 months ago
10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. package schedule
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/pkg/errors"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/common"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  13. "gopkg.in/yaml.v3"
  14. "slices"
  15. "strings"
  16. "time"
  17. "github.com/zeromicro/go-zero/core/logx"
  18. )
  // Tunables used by the task scheduling logic in this file.
  19. const (
  // TRAINNING_TASK_REPLICA is passed as the first constructor argument to the
  // least-load-first and data-locality scheduling strategies.
  20. TRAINNING_TASK_REPLICA = 1
  // TRAINNING_TASK_SUFFIX_LEN is not referenced in this file — presumably the
  // length of a generated task-name suffix; TODO confirm against its users.
  21. TRAINNING_TASK_SUFFIX_LEN = 10
  // QUERY_RESOURCE_RETRY bounds the number of resource-query attempts made
  // when scheduling with the least-load-first strategy.
  22. QUERY_RESOURCE_RETRY = 3
  23. )
  // ScheduleCreateTaskLogic holds the per-request state used to pick target
  // clusters for a task and to create that task.
  24. type ScheduleCreateTaskLogic struct {
  // Embedded logger; the constructor derives it from the request context.
  25. logx.Logger
  // ctx is the context handed to the constructor.
  26. ctx context.Context
  // svcCtx is the service context; this file reaches the scheduler, its AI
  // storages and the configuration through it.
  27. svcCtx *svc.ServiceContext
  // queryResource runs the cluster resource queries used by the
  // least-load-first strategy.
  28. queryResource *QueryResourcesLogic
  29. }
  30. func NewScheduleCreateTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCreateTaskLogic {
  31. return &ScheduleCreateTaskLogic{
  32. Logger: logx.WithContext(ctx),
  33. ctx: ctx,
  34. svcCtx: svcCtx,
  35. queryResource: NewQueryResourcesLogic(ctx, svcCtx),
  36. }
  37. }
  38. func (l *ScheduleCreateTaskLogic) ScheduleCreateTask(req *types.CreateTaskReq) (resp *types.CreateTaskResp, err error) {
  39. resp = &types.CreateTaskResp{}
  40. err = validateJobResources(req.JobResources)
  41. if err != nil {
  42. return nil, err
  43. }
  44. taskName, err := l.handleDuplicateTaskName(req.Name)
  45. if err != nil {
  46. return nil, err
  47. }
  48. var clusters []string
  49. if len(req.JobResources.Clusters) == 1 {
  50. clusters = append(clusters, req.JobResources.Clusters[0].ClusterID)
  51. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  52. if err != nil {
  53. return nil, err
  54. }
  55. assignedClusters := copyParams([]*strategy.AssignedCluster{{
  56. ClusterId: req.JobResources.Clusters[0].ClusterID,
  57. }}, req.JobResources.Clusters)
  58. taskId, err := l.createTask(taskName, req.Description, req.JobResources.ScheduleStrategy, assignedClusters, req.Token)
  59. if err != nil {
  60. return nil, err
  61. }
  62. resp.ScheduleDatas = schedatas
  63. resp.TaskID = taskId
  64. return resp, nil
  65. } else {
  66. assignedClusters, err := l.getAssignedClustersByStrategy(&req.JobResources, &req.DataDistributes)
  67. if err != nil {
  68. return nil, err
  69. }
  70. if len(assignedClusters) == 0 {
  71. return nil, fmt.Errorf("failed to create task, no scheduled cluster found")
  72. }
  73. for _, c := range assignedClusters {
  74. clusters = append(clusters, c.ClusterId)
  75. }
  76. schedatas, err := l.generateScheduleResult(req.DataDistributes, clusters)
  77. if err != nil {
  78. return nil, err
  79. }
  80. taskId, err := l.createTask(taskName, req.Description, req.JobResources.ScheduleStrategy, assignedClusters, req.Token)
  81. if err != nil {
  82. return nil, err
  83. }
  84. resp.ScheduleDatas = schedatas
  85. resp.TaskID = taskId
  86. return resp, nil
  87. }
  88. }
  89. func validateJobResources(resources types.JobResources) error {
  90. if resources.ScheduleStrategy == "" {
  91. return fmt.Errorf("must specify ScheduleStrategy")
  92. }
  93. if len(resources.Clusters) == 0 {
  94. return fmt.Errorf("must specify at least one cluster")
  95. }
  96. for _, c := range resources.Clusters {
  97. if c.ClusterID == "" {
  98. return fmt.Errorf("must specify clusterID")
  99. }
  100. if len(c.Resources) == 0 {
  101. return fmt.Errorf("cluster: %s must specify at least one compute resource", c.ClusterID)
  102. //return errors.Wrapf(xerr.NewErrCodeMsg(1234, fmt.Sprintf("cluster: %s must specify at least one compute resource", c.ClusterID)), "")
  103. }
  104. }
  105. return nil
  106. }
  107. func (l *ScheduleCreateTaskLogic) handleDuplicateTaskName(name string) (string, error) {
  108. exist, err := l.svcCtx.Scheduler.AiStorages.DoesTaskNameExist(name)
  109. if err != nil {
  110. return "", err
  111. }
  112. if exist {
  113. return name + "_" + time.Now().Format(constants.Layout_Time_Suffix), nil
  114. }
  115. return name, nil
  116. }
  117. func (l *ScheduleCreateTaskLogic) getAssignedClustersByStrategy(resources *types.JobResources, dataDistribute *types.DataDistribute) ([]*strategy.AssignedCluster, error) {
  118. var assignedClusters []*strategy.AssignedCluster
  119. switch resources.ScheduleStrategy {
  120. case strategy.LEASTLOADFIRST:
  121. var resSpecs []*collector.ResourceSpec
  122. var resCount int
  123. for i := 0; i < QUERY_RESOURCE_RETRY; i++ {
  124. defer time.Sleep(time.Second)
  125. qResources, err := l.queryResource.QueryResourcesByClusterId(nil)
  126. if err != nil {
  127. continue
  128. }
  129. for _, resource := range qResources {
  130. if resource.Resources != nil {
  131. resCount++
  132. }
  133. }
  134. if resCount >= 1 {
  135. resSpecs = qResources
  136. break
  137. } else {
  138. resCount = 0
  139. continue
  140. }
  141. }
  142. if resCount == 0 {
  143. return nil, fmt.Errorf("failed to create task, resources counting fails")
  144. }
  145. strtg := strategy.NewLeastLoadFirst(TRAINNING_TASK_REPLICA, resSpecs)
  146. clusters, err := strtg.Schedule()
  147. if err != nil {
  148. return nil, err
  149. }
  150. assignedClusters = copyParams(clusters, resources.Clusters)
  151. case strategy.DATA_LOCALITY:
  152. strtg := strategy.NewDataLocality(TRAINNING_TASK_REPLICA, dataDistribute)
  153. clusters, err := strtg.Schedule()
  154. if err != nil {
  155. return nil, err
  156. }
  157. assignedClusters = copyParams(clusters, resources.Clusters)
  158. default:
  159. return nil, errors.New("no strategy has been chosen")
  160. }
  161. return assignedClusters, nil
  162. }
  163. func copyParams(clusters []*strategy.AssignedCluster, clusterInfos []*types.JobClusterInfo) []*strategy.AssignedCluster {
  164. var result []*strategy.AssignedCluster
  165. for _, c := range clusters {
  166. for _, info := range clusterInfos {
  167. if c.ClusterId == info.ClusterID {
  168. var envs []string
  169. var params []string
  170. for k, v := range info.Runtime.Envs {
  171. val := common.ConvertTypeToString(v)
  172. if val != "" {
  173. env := k + storeLink.COMMA + val
  174. envs = append(envs, env)
  175. }
  176. }
  177. for k, v := range info.Runtime.Params {
  178. val := common.ConvertTypeToString(v)
  179. if val != "" {
  180. p := k + storeLink.COMMA + val
  181. params = append(params, p)
  182. }
  183. }
  184. cluster := &strategy.AssignedCluster{
  185. ClusterId: c.ClusterId,
  186. ClusterName: c.ClusterName,
  187. Replicas: c.Replicas,
  188. ResourcesRequired: info.Resources,
  189. Cmd: info.Runtime.Command,
  190. Envs: envs,
  191. Params: params,
  192. }
  193. result = append(result, cluster)
  194. }
  195. }
  196. }
  197. return result
  198. }
  199. func (l *ScheduleCreateTaskLogic) createTask(taskName string, desc string, strategyName string, clusters []*strategy.AssignedCluster, token string) (int64, error) {
  200. var synergyStatus int64
  201. if len(clusters) > 1 {
  202. synergyStatus = 1
  203. }
  204. y, err := yaml.Marshal(clusters)
  205. if err != nil {
  206. fmt.Printf("Error while Marshaling. %v", err)
  207. }
  208. taskId, err := l.svcCtx.Scheduler.CreateTask(taskName, desc, synergyStatus, strategyName, string(y), token, &l.svcCtx.Config)
  209. if err != nil {
  210. return 0, err
  211. }
  212. return taskId, nil
  213. }
  214. func (l *ScheduleCreateTaskLogic) generateScheduleResult(distribute types.DataDistribute, clusters []string) ([]*types.ScheduleData, error) {
  215. var schedatas []*types.ScheduleData
  216. for _, d := range distribute.Dataset {
  217. data := &types.ScheduleData{
  218. DataType: "dataset",
  219. PackageID: d.PackageID,
  220. ClusterIDs: make([]string, 0),
  221. }
  222. var cSlc []string
  223. for _, cluster := range d.Clusters {
  224. cSlc = append(cSlc, cluster.ClusterID)
  225. }
  226. for _, cluster := range clusters {
  227. if !slices.Contains(cSlc, cluster) {
  228. data.ClusterIDs = append(data.ClusterIDs, cluster)
  229. } else {
  230. continue
  231. }
  232. }
  233. if len(data.ClusterIDs) != 0 {
  234. schedatas = append(schedatas, data)
  235. }
  236. }
  237. for _, d := range distribute.Code {
  238. data := &types.ScheduleData{
  239. DataType: "code",
  240. PackageID: d.PackageID,
  241. ClusterIDs: make([]string, 0),
  242. }
  243. var cSlc []string
  244. for _, cluster := range d.Clusters {
  245. cSlc = append(cSlc, cluster.ClusterID)
  246. }
  247. for _, cluster := range clusters {
  248. if !slices.Contains(cSlc, cluster) {
  249. data.ClusterIDs = append(data.ClusterIDs, cluster)
  250. } else {
  251. continue
  252. }
  253. }
  254. if len(data.ClusterIDs) != 0 {
  255. schedatas = append(schedatas, data)
  256. }
  257. }
  258. for _, d := range distribute.Image {
  259. data := &types.ScheduleData{
  260. DataType: "image",
  261. PackageID: d.PackageID,
  262. ClusterIDs: make([]string, 0),
  263. }
  264. var cSlc []string
  265. for _, cluster := range d.Clusters {
  266. cSlc = append(cSlc, cluster.ClusterID)
  267. }
  268. for _, cluster := range clusters {
  269. if !slices.Contains(cSlc, cluster) {
  270. data.ClusterIDs = append(data.ClusterIDs, cluster)
  271. } else {
  272. continue
  273. }
  274. }
  275. if len(data.ClusterIDs) != 0 {
  276. schedatas = append(schedatas, data)
  277. }
  278. }
  279. for _, d := range distribute.Model {
  280. data := &types.ScheduleData{
  281. DataType: "model",
  282. PackageID: d.PackageID,
  283. ClusterIDs: make([]string, 0),
  284. }
  285. var cSlc []string
  286. for _, cluster := range d.Clusters {
  287. cSlc = append(cSlc, cluster.ClusterID)
  288. }
  289. for _, cluster := range clusters {
  290. if !slices.Contains(cSlc, cluster) {
  291. data.ClusterIDs = append(data.ClusterIDs, cluster)
  292. } else {
  293. continue
  294. }
  295. }
  296. if len(data.ClusterIDs) != 0 {
  297. schedatas = append(schedatas, data)
  298. }
  299. }
  300. if len(schedatas) != 0 {
  301. err := l.updateStorageType(&schedatas)
  302. if err != nil {
  303. return nil, err
  304. }
  305. }
  306. return schedatas, nil
  307. }
  308. func (l *ScheduleCreateTaskLogic) updateStorageType(schedatas *[]*types.ScheduleData) error {
  309. for _, s := range *schedatas {
  310. var storageType string
  311. var sTypes []string
  312. for _, id := range s.ClusterIDs {
  313. cluster, err := l.svcCtx.Scheduler.AiStorages.GetClustersById(id)
  314. if err != nil {
  315. return err
  316. }
  317. stype, ok := storeLink.StorageTypeMap[strings.Title(cluster.Name)]
  318. if ok {
  319. sTypes = append(sTypes, stype)
  320. }
  321. }
  322. sTypes = common.Unique(sTypes)
  323. for _, st := range sTypes {
  324. storageType += st + storeLink.COMMA
  325. }
  326. storageType = strings.TrimSuffix(storageType, storeLink.COMMA)
  327. s.StorageType = storageType
  328. }
  329. return nil
  330. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.