You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 12 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago

  1. package grampus
  2. import (
  3. "encoding/json"
  4. "strings"
  5. "code.gitea.io/gitea/models"
  6. "code.gitea.io/gitea/modules/cloudbrain"
  7. "code.gitea.io/gitea/modules/context"
  8. "code.gitea.io/gitea/modules/log"
  9. "code.gitea.io/gitea/modules/notification"
  10. "code.gitea.io/gitea/modules/setting"
  11. "code.gitea.io/gitea/modules/timeutil"
  12. )
  13. const (
  14. JobPath = "job/"
  15. ProcessorTypeNPU = "npu.huawei.com/NPU"
  16. ProcessorTypeGPU = "nvidia.com/gpu"
  17. GpuWorkDir = "/tmp/"
  18. NpuWorkDir = "/cache/"
  19. NpuLocalLogUrl = "/tmp/train.log"
  20. CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
  21. CodeArchiveName = "master.zip"
  22. BucketRemote = "grampus"
  23. RemoteModelPath = "/output/" + models.ModelSuffix
  24. autoStopDurationMs = 4 * 60 * 60 * 1000
  25. )
  26. var (
  27. poolInfos *models.PoolInfos
  28. FlavorInfos *setting.StFlavorInfos
  29. ImageInfos *setting.StImageInfosModelArts
  30. SpecialPools *models.SpecialPools
  31. CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
  32. "echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
  33. )
  34. type GenerateTrainJobReq struct {
  35. JobName string
  36. Command string
  37. ImageUrl string //与image_id二选一,都有的情况下优先image_url
  38. ImageId string
  39. DisplayJobName string
  40. Uuid string
  41. Description string
  42. CodeObsPath string
  43. BootFile string
  44. BootFileUrl string
  45. DataUrl string
  46. TrainUrl string
  47. WorkServerNumber int
  48. EngineID int64
  49. CommitID string
  50. IsLatestVersion string
  51. BranchName string
  52. PreVersionId int64
  53. PreVersionName string
  54. VersionCount int
  55. EngineName string
  56. TotalVersionCount int
  57. ComputeResource string
  58. ProcessType string
  59. DatasetNames string
  60. DatasetInfos map[string]models.DatasetInfo
  61. Params string
  62. ModelName string
  63. LabelName string
  64. CkptName string
  65. ModelVersion string
  66. PreTrainModelPath string
  67. PreTrainModelUrl string
  68. Spec *models.Specification
  69. CodeName string
  70. }
  71. type GenerateNotebookJobReq struct {
  72. JobName string
  73. Command string
  74. ImageUrl string
  75. ImageId string
  76. DisplayJobName string
  77. Uuid string
  78. Description string
  79. CodeStoragePath string
  80. CommitID string
  81. BranchName string
  82. ComputeResource string
  83. ProcessType string
  84. DatasetNames string
  85. DatasetInfos map[string]models.DatasetInfo
  86. Spec *models.Specification
  87. CodeName string
  88. }
  89. func getEndPoint() string {
  90. index := strings.Index(setting.Endpoint, "//")
  91. endpoint := setting.Endpoint[index+2:]
  92. return endpoint
  93. }
  94. func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
  95. var datasetGrampus []models.GrampusDataset
  96. endPoint := getEndPoint()
  97. for _, datasetInfo := range datasetInfos {
  98. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  99. Name: datasetInfo.FullName,
  100. Bucket: setting.Bucket,
  101. EndPoint: endPoint,
  102. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  103. })
  104. }
  105. return datasetGrampus
  106. }
  107. func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
  108. createTime := timeutil.TimeStampNow()
  109. var datasetGrampus []models.GrampusDataset
  110. var codeGrampus models.GrampusDataset
  111. if ProcessorTypeNPU == req.ProcessType {
  112. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  113. codeGrampus = models.GrampusDataset{
  114. Name: req.CodeName,
  115. Bucket: setting.Bucket,
  116. EndPoint: getEndPoint(),
  117. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  118. }
  119. } else {
  120. codeGrampus = models.GrampusDataset{
  121. Name: req.CodeName,
  122. Bucket: setting.Bucket,
  123. EndPoint: getEndPoint(),
  124. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  125. }
  126. }
  127. jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
  128. Name: req.JobName,
  129. Tasks: []models.GrampusNotebookTask{
  130. {
  131. Name: req.JobName,
  132. ResourceSpecId: req.Spec.SourceSpecId,
  133. ImageId: req.ImageId,
  134. ImageUrl: req.ImageUrl,
  135. Datasets: datasetGrampus,
  136. Code: codeGrampus,
  137. AutoStopDuration: autoStopDurationMs,
  138. Capacity: setting.Capacity,
  139. },
  140. },
  141. })
  142. if err != nil {
  143. log.Error("createNotebookJob failed: %v", err.Error())
  144. return "", err
  145. }
  146. jobID := jobResult.JobInfo.JobID
  147. err = models.CreateCloudbrain(&models.Cloudbrain{
  148. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  149. UserID: ctx.User.ID,
  150. RepoID: ctx.Repo.Repository.ID,
  151. JobID: jobID,
  152. JobName: req.JobName,
  153. DisplayJobName: req.DisplayJobName,
  154. JobType: string(models.JobTypeDebug),
  155. Type: models.TypeC2Net,
  156. Uuid: req.Uuid,
  157. DatasetName: req.DatasetNames,
  158. CommitID: req.CommitID,
  159. IsLatestVersion: "1",
  160. ComputeResource: req.ComputeResource,
  161. ImageID: req.ImageId,
  162. BranchName: req.BranchName,
  163. Description: req.Description,
  164. WorkServerNumber: 1,
  165. EngineName: req.ImageUrl,
  166. CreatedUnix: createTime,
  167. UpdatedUnix: createTime,
  168. Spec: req.Spec,
  169. })
  170. if err != nil {
  171. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  172. return "", err
  173. }
  174. var actionType models.ActionType
  175. if req.ComputeResource == models.NPUResource {
  176. actionType = models.ActionCreateGrampusNPUDebugTask
  177. } else if req.ComputeResource == models.GPUResource {
  178. actionType = models.ActionCreateGrampusGPUDebugTask
  179. }
  180. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  181. return jobID, nil
  182. }
  183. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
  184. createTime := timeutil.TimeStampNow()
  185. centerID, centerName := getCentersParamter(ctx, req)
  186. var datasetGrampus, modelGrampus []models.GrampusDataset
  187. var codeGrampus models.GrampusDataset
  188. if ProcessorTypeNPU == req.ProcessType {
  189. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  190. if len(req.ModelName) != 0 {
  191. modelGrampus = []models.GrampusDataset{
  192. {
  193. Name: req.ModelName,
  194. Bucket: setting.Bucket,
  195. EndPoint: getEndPoint(),
  196. ObjectKey: req.PreTrainModelPath,
  197. },
  198. }
  199. }
  200. codeGrampus = models.GrampusDataset{
  201. Name: req.CodeName,
  202. Bucket: setting.Bucket,
  203. EndPoint: getEndPoint(),
  204. ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
  205. }
  206. }
  207. jobResult, err := createJob(models.CreateGrampusJobRequest{
  208. Name: req.JobName,
  209. Tasks: []models.GrampusTasks{
  210. {
  211. Name: req.JobName,
  212. Command: req.Command,
  213. ResourceSpecId: req.Spec.SourceSpecId,
  214. ImageId: req.ImageId,
  215. ImageUrl: req.ImageUrl,
  216. CenterID: centerID,
  217. CenterName: centerName,
  218. ReplicaNum: 1,
  219. Datasets: datasetGrampus,
  220. Models: modelGrampus,
  221. Code: codeGrampus,
  222. BootFile: req.BootFile,
  223. },
  224. },
  225. })
  226. if err != nil {
  227. log.Error("createJob failed: %v", err.Error())
  228. return "", err
  229. }
  230. jobID := jobResult.JobInfo.JobID
  231. err = models.CreateCloudbrain(&models.Cloudbrain{
  232. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  233. UserID: ctx.User.ID,
  234. RepoID: ctx.Repo.Repository.ID,
  235. JobID: jobID,
  236. JobName: req.JobName,
  237. DisplayJobName: req.DisplayJobName,
  238. JobType: string(models.JobTypeTrain),
  239. Type: models.TypeC2Net,
  240. Uuid: req.Uuid,
  241. DatasetName: req.DatasetNames,
  242. CommitID: req.CommitID,
  243. IsLatestVersion: req.IsLatestVersion,
  244. ComputeResource: req.ComputeResource,
  245. ImageID: req.ImageId,
  246. TrainUrl: req.TrainUrl,
  247. BranchName: req.BranchName,
  248. Parameters: req.Params,
  249. BootFile: req.BootFile,
  250. DataUrl: req.DataUrl,
  251. Description: req.Description,
  252. WorkServerNumber: req.WorkServerNumber,
  253. EngineName: req.EngineName,
  254. VersionCount: req.VersionCount,
  255. TotalVersionCount: req.TotalVersionCount,
  256. CreatedUnix: createTime,
  257. UpdatedUnix: createTime,
  258. Spec: req.Spec,
  259. ModelName: req.ModelName,
  260. ModelVersion: req.ModelVersion,
  261. LabelName: req.LabelName,
  262. PreTrainModelUrl: req.PreTrainModelUrl,
  263. CkptName: req.CkptName,
  264. })
  265. if err != nil {
  266. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  267. return "", err
  268. }
  269. var actionType models.ActionType
  270. if req.ComputeResource == models.NPUResource {
  271. actionType = models.ActionCreateGrampusNPUTrainTask
  272. } else if req.ComputeResource == models.GPUResource {
  273. actionType = models.ActionCreateGrampusGPUTrainTask
  274. }
  275. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  276. return jobID, nil
  277. }
  278. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  279. var centerID []string
  280. var centerName []string
  281. includeCenters := make(map[string]string)
  282. excludeCenters := make(map[string]string)
  283. if SpecialPools != nil {
  284. for _, pool := range SpecialPools.Pools {
  285. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  286. org, _ := models.GetOrgByName(pool.Org)
  287. if org != nil {
  288. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  289. if isOrgMember {
  290. for _, info := range pool.Pool {
  291. includeCenters[info.Queue] = info.Value
  292. }
  293. } else {
  294. for _, info := range pool.Pool {
  295. excludeCenters[info.Queue] = info.Value
  296. }
  297. }
  298. }
  299. }
  300. }
  301. }
  302. if len(includeCenters) > 0 {
  303. //如果有专属资源池,根据专属资源池指定智算中心
  304. for k, v := range includeCenters {
  305. centerID = append(centerID, k)
  306. centerName = append(centerName, v)
  307. }
  308. } else if len(excludeCenters) > 0 {
  309. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  310. allCenters := make(map[string]string)
  311. specs, err := GetResourceSpecs(req.ProcessType)
  312. if err == nil {
  313. for _, info := range specs.Infos {
  314. for _, center := range info.Centers {
  315. allCenters[center.ID] = center.Name
  316. }
  317. }
  318. }
  319. for k, _ := range excludeCenters {
  320. delete(allCenters, k)
  321. }
  322. for k, v := range allCenters {
  323. centerID = append(centerID, k)
  324. centerName = append(centerName, v)
  325. }
  326. }
  327. return centerID, centerName
  328. }
  329. func TransTrainJobStatus(status string) string {
  330. if status == models.GrampusStatusPending {
  331. status = models.GrampusStatusWaiting
  332. }
  333. return strings.ToUpper(status)
  334. }
  335. func InitSpecialPool() {
  336. if SpecialPools == nil && setting.Grampus.SpecialPools != "" {
  337. json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
  338. }
  339. }
  340. func GetNpuModelRemoteObsUrl(jobName string) string {
  341. return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
  342. }
  343. func GetNpuModelObjectKey(jobName string) string {
  344. return setting.CodePathPrefix + jobName + RemoteModelPath
  345. }
  346. func GetRemoteEndPoint(aiCenterID string) string {
  347. var endPoint string
  348. for _, info := range setting.CenterInfos.Info {
  349. if info.CenterID == aiCenterID {
  350. endPoint = info.Endpoint
  351. break
  352. }
  353. }
  354. return endPoint
  355. }
  356. func GetCenterProxy(aiCenterID string) string {
  357. var proxy string
  358. for _, info := range setting.CenterInfos.Info {
  359. if info.CenterID == aiCenterID {
  360. proxy = info.StorageProxyServer
  361. break
  362. }
  363. }
  364. return proxy
  365. }