You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 5.6 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package grampus
  2. import (
  3. "encoding/json"
  4. "strings"
  5. "code.gitea.io/gitea/modules/setting"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/timeutil"
  11. )
// Constants shared by the Grampus job code. None of them is referenced in the
// visible part of this file; the descriptions below are derived from the names
// and values only.
const (
	// JobPath is the "job/" path segment (presumably appended to a base path or
	// URL when addressing jobs — TODO confirm against callers).
	JobPath = "job/"

	// ProcessorTypeNPU is the processor type identifier string for NPU resources.
	ProcessorTypeNPU = "npu.huawei.com/NPU"
	// ProcessorTypeGPU is the processor type identifier string for GPU resources.
	ProcessorTypeGPU = "nvidia.com/gpu"

	// GpuWorkDir is the working directory used for GPU jobs.
	GpuWorkDir = "/"
	// NpuWorkDir is the working directory used for NPU jobs.
	NpuWorkDir = "/cache/"

	// CodeArchiveName is the file name of the code archive.
	CodeArchiveName = "master.zip"
)
// Package-level Grampus state.
var (
	// poolInfos is not read or written in the visible part of this file.
	poolInfos *models.PoolInfos
	// FlavorInfos is not read or written in the visible part of this file.
	FlavorInfos *models.FlavorInfos
	// ImageInfos is not read or written in the visible part of this file.
	ImageInfos *models.ImageInfosModelArts
	// SpecialPools holds the special resource pool configuration. It is filled
	// by InitSpecialPool from setting.Grampus.SpecialPools and consulted by
	// getCentersParamter; it stays nil when that setting is empty.
	SpecialPools *models.SpecialPools
)
// GenerateTrainJobReq carries the parameters GenerateTrainJob needs to create
// a Grampus training job and to record it as a models.Cloudbrain entry.
//
// JobName, Command, ResourceSpecId, ImageId and ImageUrl are sent to the job
// creation request; ComputeResource and ProcessType are additionally used to
// select the computing centers. Most of the remaining fields are copied
// verbatim into the stored Cloudbrain record.
type GenerateTrainJobReq struct {
	JobName           string
	Command           string
	ResourceSpecId    string
	ImageUrl          string // Alternative to image_id; when both are set, image_url takes precedence.
	ImageId           string
	DisplayJobName    string
	Uuid              string
	Description       string
	CodeObsPath       string
	BootFile          string
	BootFileUrl       string
	DataUrl           string
	TrainUrl          string
	WorkServerNumber  int
	EngineID          int64
	CommitID          string
	IsLatestVersion   string
	BranchName        string
	PreVersionId      int64
	PreVersionName    string
	FlavorName        string
	VersionCount      int
	EngineName        string
	TotalVersionCount int
	ComputeResource   string
	ProcessType       string
	DatasetName       string
	Params            string
}
  56. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) {
  57. createTime := timeutil.TimeStampNow()
  58. centerID, centerName := getCentersParamter(ctx, req)
  59. jobResult, err := createJob(models.CreateGrampusJobRequest{
  60. Name: req.JobName,
  61. Tasks: []models.GrampusTasks{
  62. {
  63. Name: req.JobName,
  64. Command: req.Command,
  65. ResourceSpecId: req.ResourceSpecId,
  66. ImageId: req.ImageId,
  67. ImageUrl: req.ImageUrl,
  68. CenterID: centerID,
  69. CenterName: centerName,
  70. ReplicaNum: 1,
  71. },
  72. },
  73. })
  74. if err != nil {
  75. log.Error("createJob failed: %v", err.Error())
  76. return err
  77. }
  78. jobID := jobResult.JobInfo.JobID
  79. err = models.CreateCloudbrain(&models.Cloudbrain{
  80. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  81. UserID: ctx.User.ID,
  82. RepoID: ctx.Repo.Repository.ID,
  83. JobID: jobID,
  84. JobName: req.JobName,
  85. DisplayJobName: req.DisplayJobName,
  86. JobType: string(models.JobTypeTrain),
  87. Type: models.TypeC2Net,
  88. Uuid: req.Uuid,
  89. DatasetName: req.DatasetName,
  90. CommitID: req.CommitID,
  91. IsLatestVersion: req.IsLatestVersion,
  92. ComputeResource: req.ComputeResource,
  93. ImageID: req.ImageId,
  94. TrainUrl: req.TrainUrl,
  95. BranchName: req.BranchName,
  96. Parameters: req.Params,
  97. BootFile: req.BootFile,
  98. DataUrl: req.DataUrl,
  99. FlavorCode: req.ResourceSpecId,
  100. Description: req.Description,
  101. WorkServerNumber: req.WorkServerNumber,
  102. FlavorName: req.FlavorName,
  103. EngineName: req.EngineName,
  104. VersionCount: req.VersionCount,
  105. TotalVersionCount: req.TotalVersionCount,
  106. CreatedUnix: createTime,
  107. UpdatedUnix: createTime,
  108. })
  109. if err != nil {
  110. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  111. return err
  112. }
  113. var actionType models.ActionType
  114. if req.ComputeResource == models.NPUResource {
  115. actionType = models.ActionCreateGrampusNPUTrainTask
  116. } else if req.ComputeResource == models.GPUResource {
  117. actionType = models.ActionCreateGrampusGPUTrainTask
  118. }
  119. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  120. return nil
  121. }
  122. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  123. var centerID []string
  124. var centerName []string
  125. includeCenters := make(map[string]string)
  126. excludeCenters := make(map[string]string)
  127. if SpecialPools != nil {
  128. for _, pool := range SpecialPools.Pools {
  129. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  130. org, _ := models.GetOrgByName(pool.Org)
  131. if org != nil {
  132. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  133. if isOrgMember {
  134. for _, info := range pool.Pool {
  135. includeCenters[info.Queue] = info.Value
  136. }
  137. } else {
  138. for _, info := range pool.Pool {
  139. excludeCenters[info.Queue] = info.Value
  140. }
  141. }
  142. }
  143. }
  144. }
  145. }
  146. if len(includeCenters) > 0 {
  147. //如果有专属资源池,根据专属资源池指定智算中心
  148. for k, v := range includeCenters {
  149. centerID = append(centerID, k)
  150. centerName = append(centerName, v)
  151. }
  152. } else if len(excludeCenters) > 0 {
  153. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  154. allCenters := make(map[string]string)
  155. specs, err := GetResourceSpecs(req.ProcessType)
  156. if err == nil {
  157. for _, info := range specs.Infos {
  158. for _, center := range info.Centers {
  159. allCenters[center.ID] = center.Name
  160. }
  161. }
  162. }
  163. for k, _ := range excludeCenters {
  164. delete(allCenters, k)
  165. }
  166. for k, v := range allCenters {
  167. centerID = append(centerID, k)
  168. centerName = append(centerName, v)
  169. }
  170. }
  171. return centerID, centerName
  172. }
  173. func TransTrainJobStatus(status string) string {
  174. if status == models.GrampusStatusPending {
  175. status = models.GrampusStatusWaiting
  176. }
  177. return strings.ToUpper(status)
  178. }
  179. func InitSpecialPool() {
  180. if SpecialPools == nil && setting.Grampus.SpecialPools != "" {
  181. json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
  182. }
  183. }