You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 5.9 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. package grampus
  2. import (
  3. "encoding/json"
  4. "strings"
  5. "code.gitea.io/gitea/modules/setting"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/timeutil"
  11. )
// Constants shared by the Grampus job helpers in this package.
const (
	// JobPath is the "job/" path segment.
	// NOTE(review): presumably appended to the Grampus API base URL; its use is not visible here — confirm.
	JobPath = "job/"
	// ProcessorTypeNPU is the processor-type identifier string for NPU resources.
	ProcessorTypeNPU = "npu.huawei.com/NPU"
	// ProcessorTypeGPU is the processor-type identifier string for GPU resources.
	ProcessorTypeGPU = "nvidia.com/gpu"
	// GpuWorkDir is "/tmp/".
	// NOTE(review): presumably the working directory used inside GPU job containers — confirm.
	GpuWorkDir = "/tmp/"
	// NpuWorkDir is "/cache/".
	// NOTE(review): presumably the working directory used inside NPU job containers — confirm.
	NpuWorkDir = "/cache/"
	// CommandPrepareScript is a shell fragment that creates the output, code and
	// dataset directories, downloads master.zip of the script_for_grampus
	// repository with wget, unzips it, changes into script_for_grampus and makes
	// the four downloader/uploader helpers executable (chmod 777).
	// It starts with ";", so it is meant to follow an earlier command in the
	// same command line.
	CommandPrepareScript = ";mkdir -p output;mkdir -p code;mkdir -p dataset;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/script_for_grampus/archive/master.zip;" +
		"echo \"finish loading script\";unzip -q master.zip;cd script_for_grampus;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
	// CodeArchiveName is the archive file name "master.zip".
	// NOTE(review): presumably the name of the packed repository code archive — confirm against callers.
	CodeArchiveName = "master.zip"
)
// Package-level state for the Grampus helpers.
// NOTE(review): these are plain mutable globals with no visible synchronization.
var (
	// poolInfos is not read or written in the visible part of this file.
	poolInfos *models.PoolInfos
	// FlavorInfos holds flavor information; it is not populated in the visible part of this file.
	FlavorInfos *setting.StFlavorInfos
	// ImageInfos holds image information; it is not populated in the visible part of this file.
	ImageInfos *setting.StImageInfosModelArts
	// SpecialPools is the special-pool configuration. InitSpecialPool decodes it
	// from setting.Grampus.SpecialPools and getCentersParamter reads it; it stays
	// nil until it has been decoded.
	SpecialPools *models.SpecialPools
)
// GenerateTrainJobReq carries the parameters GenerateTrainJob uses to create a
// Grampus job and to persist the matching Cloudbrain record.
//
// JobName is used both as the job name and as the name of its single task.
// Spec must be non-nil: GenerateTrainJob reads Spec.SourceSpecId.
// ComputeResource selects the notification action type and is matched against
// special-pool types; ProcessType is passed to GetResourceSpecs when centers
// have to be excluded.
type GenerateTrainJobReq struct {
	JobName           string
	Command           string
	ImageUrl          string // Alternative to ImageId; when both are set, ImageUrl takes precedence.
	ImageId           string
	DisplayJobName    string
	Uuid              string
	Description       string
	CodeObsPath       string
	BootFile          string
	BootFileUrl       string
	DataUrl           string
	TrainUrl          string
	WorkServerNumber  int
	EngineID          int64
	CommitID          string
	IsLatestVersion   string
	BranchName        string
	PreVersionId      int64
	PreVersionName    string
	VersionCount      int
	EngineName        string
	TotalVersionCount int
	ComputeResource   string
	ProcessType       string
	DatasetName       string
	Params            string
	Spec              *models.Specification
}
  57. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (err error) {
  58. createTime := timeutil.TimeStampNow()
  59. centerID, centerName := getCentersParamter(ctx, req)
  60. jobResult, err := createJob(models.CreateGrampusJobRequest{
  61. Name: req.JobName,
  62. Tasks: []models.GrampusTasks{
  63. {
  64. Name: req.JobName,
  65. Command: req.Command,
  66. ResourceSpecId: req.Spec.SourceSpecId,
  67. ImageId: req.ImageId,
  68. ImageUrl: req.ImageUrl,
  69. CenterID: centerID,
  70. CenterName: centerName,
  71. ReplicaNum: 1,
  72. },
  73. },
  74. })
  75. if err != nil {
  76. log.Error("createJob failed: %v", err.Error())
  77. return err
  78. }
  79. jobID := jobResult.JobInfo.JobID
  80. err = models.CreateCloudbrain(&models.Cloudbrain{
  81. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  82. UserID: ctx.User.ID,
  83. RepoID: ctx.Repo.Repository.ID,
  84. JobID: jobID,
  85. JobName: req.JobName,
  86. DisplayJobName: req.DisplayJobName,
  87. JobType: string(models.JobTypeTrain),
  88. Type: models.TypeC2Net,
  89. Uuid: req.Uuid,
  90. DatasetName: req.DatasetName,
  91. CommitID: req.CommitID,
  92. IsLatestVersion: req.IsLatestVersion,
  93. ComputeResource: req.ComputeResource,
  94. ImageID: req.ImageId,
  95. TrainUrl: req.TrainUrl,
  96. BranchName: req.BranchName,
  97. Parameters: req.Params,
  98. BootFile: req.BootFile,
  99. DataUrl: req.DataUrl,
  100. Description: req.Description,
  101. WorkServerNumber: req.WorkServerNumber,
  102. EngineName: req.EngineName,
  103. VersionCount: req.VersionCount,
  104. TotalVersionCount: req.TotalVersionCount,
  105. CreatedUnix: createTime,
  106. UpdatedUnix: createTime,
  107. Spec: req.Spec,
  108. })
  109. if err != nil {
  110. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  111. return err
  112. }
  113. var actionType models.ActionType
  114. if req.ComputeResource == models.NPUResource {
  115. actionType = models.ActionCreateGrampusNPUTrainTask
  116. } else if req.ComputeResource == models.GPUResource {
  117. actionType = models.ActionCreateGrampusGPUTrainTask
  118. }
  119. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  120. return nil
  121. }
  122. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  123. var centerID []string
  124. var centerName []string
  125. includeCenters := make(map[string]string)
  126. excludeCenters := make(map[string]string)
  127. if SpecialPools != nil {
  128. for _, pool := range SpecialPools.Pools {
  129. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  130. org, _ := models.GetOrgByName(pool.Org)
  131. if org != nil {
  132. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  133. if isOrgMember {
  134. for _, info := range pool.Pool {
  135. includeCenters[info.Queue] = info.Value
  136. }
  137. } else {
  138. for _, info := range pool.Pool {
  139. excludeCenters[info.Queue] = info.Value
  140. }
  141. }
  142. }
  143. }
  144. }
  145. }
  146. if len(includeCenters) > 0 {
  147. //如果有专属资源池,根据专属资源池指定智算中心
  148. for k, v := range includeCenters {
  149. centerID = append(centerID, k)
  150. centerName = append(centerName, v)
  151. }
  152. } else if len(excludeCenters) > 0 {
  153. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  154. allCenters := make(map[string]string)
  155. specs, err := GetResourceSpecs(req.ProcessType)
  156. if err == nil {
  157. for _, info := range specs.Infos {
  158. for _, center := range info.Centers {
  159. allCenters[center.ID] = center.Name
  160. }
  161. }
  162. }
  163. for k, _ := range excludeCenters {
  164. delete(allCenters, k)
  165. }
  166. for k, v := range allCenters {
  167. centerID = append(centerID, k)
  168. centerName = append(centerName, v)
  169. }
  170. }
  171. return centerID, centerName
  172. }
  173. func TransTrainJobStatus(status string) string {
  174. if status == models.GrampusStatusPending {
  175. status = models.GrampusStatusWaiting
  176. }
  177. return strings.ToUpper(status)
  178. }
  179. func InitSpecialPool() {
  180. if SpecialPools == nil && setting.Grampus.SpecialPools != "" {
  181. json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
  182. }
  183. }