You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

grampus.go 13 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. package grampus
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "strings"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/cloudbrain"
  8. "code.gitea.io/gitea/modules/context"
  9. "code.gitea.io/gitea/modules/log"
  10. "code.gitea.io/gitea/modules/notification"
  11. "code.gitea.io/gitea/modules/setting"
  12. "code.gitea.io/gitea/modules/timeutil"
  13. )
  14. const (
  15. JobPath = "job/"
  16. ProcessorTypeNPU = "npu.huawei.com/NPU"
  17. ProcessorTypeGPU = "nvidia.com/gpu"
  18. GpuWorkDir = "/tmp/"
  19. NpuWorkDir = "/cache/"
  20. NpuLocalLogUrl = "/tmp/train.log"
  21. CommandPrepareScriptNpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;"
  22. CodeArchiveName = "master.zip"
  23. BucketRemote = "grampus"
  24. RemoteModelPath = "/output/" + models.ModelSuffix
  25. autoStopDurationMs = 4 * 60 * 60 * 1000
  26. CommandGpuDebug = "%s! [ -x \"$(command -v jupyter)\" ] && pip install jupyterlab==3 -i https://pypi.tuna.tsinghua.edu.cn/simple;jupyter lab --no-browser --ip=0.0.0.0 --allow-root --notebook-dir='/code' --port=$OCTOPUS_NOTEBOOK_PORT --LabApp.token='' --LabApp.allow_origin='*' --LabApp.base_url=$OCTOPUS_NOTEBOOK_BASE_URL;"
  27. )
  28. var (
  29. poolInfos *models.PoolInfos
  30. FlavorInfos *setting.StFlavorInfos
  31. ImageInfos *setting.StImageInfosModelArts
  32. SpecialPools *models.SpecialPools
  33. CommandPrepareScriptGpu = ";mkdir -p output;mkdir -p code;mkdir -p dataset;mkdir -p pretrainmodel;echo \"start loading script\";wget -q https://git.openi.org.cn/OpenIOSSG/%s/archive/master.zip;" +
  34. "echo \"finish loading script\";unzip -q master.zip;cd %s;chmod 777 downloader_for_obs uploader_for_npu downloader_for_minio uploader_for_gpu;"
  35. )
  36. type GenerateTrainJobReq struct {
  37. JobName string
  38. Command string
  39. ImageUrl string //与image_id二选一,都有的情况下优先image_url
  40. ImageId string
  41. DisplayJobName string
  42. Uuid string
  43. Description string
  44. CodeObsPath string
  45. BootFile string
  46. BootFileUrl string
  47. DataUrl string
  48. TrainUrl string
  49. WorkServerNumber int
  50. EngineID int64
  51. CommitID string
  52. IsLatestVersion string
  53. BranchName string
  54. PreVersionId int64
  55. PreVersionName string
  56. VersionCount int
  57. EngineName string
  58. TotalVersionCount int
  59. ComputeResource string
  60. ProcessType string
  61. DatasetNames string
  62. DatasetInfos map[string]models.DatasetInfo
  63. Params string
  64. ModelName string
  65. LabelName string
  66. CkptName string
  67. ModelVersion string
  68. PreTrainModelPath string
  69. PreTrainModelUrl string
  70. Spec *models.Specification
  71. CodeName string
  72. }
  73. type GenerateNotebookJobReq struct {
  74. JobName string
  75. Command string
  76. ImageUrl string
  77. ImageId string
  78. DisplayJobName string
  79. Uuid string
  80. Description string
  81. CodeStoragePath string
  82. CommitID string
  83. BranchName string
  84. ComputeResource string
  85. ProcessType string
  86. DatasetNames string
  87. DatasetInfos map[string]models.DatasetInfo
  88. Spec *models.Specification
  89. CodeName string
  90. }
  91. func getEndPoint() string {
  92. index := strings.Index(setting.Endpoint, "//")
  93. endpoint := setting.Endpoint[index+2:]
  94. return endpoint
  95. }
  96. func getDatasetGrampus(datasetInfos map[string]models.DatasetInfo) []models.GrampusDataset {
  97. var datasetGrampus []models.GrampusDataset
  98. endPoint := getEndPoint()
  99. for _, datasetInfo := range datasetInfos {
  100. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  101. Name: datasetInfo.FullName,
  102. Bucket: setting.Bucket,
  103. EndPoint: endPoint,
  104. ObjectKey: datasetInfo.DataLocalPath + datasetInfo.FullName,
  105. })
  106. }
  107. return datasetGrampus
  108. }
  109. func getDatasetGPUGrampus(datasetInfos map[string]models.DatasetInfo) ([]models.GrampusDataset, string) {
  110. var datasetGrampus []models.GrampusDataset
  111. var command = ""
  112. for uuid, datasetInfo := range datasetInfos {
  113. datasetGrampus = append(datasetGrampus, models.GrampusDataset{
  114. Name: datasetInfo.FullName,
  115. Bucket: setting.Attachment.Minio.Bucket,
  116. EndPoint: setting.Attachment.Minio.Endpoint,
  117. ObjectKey: datasetInfo.DataLocalPath,
  118. ReadOnly: true,
  119. ContainerPath: "/dataset1/" + datasetInfo.Name,
  120. })
  121. command += "cp /dataset1/'" + datasetInfo.Name + "'/" + uuid + " /dataset/'" + datasetInfo.FullName + "';"
  122. }
  123. return datasetGrampus, command
  124. }
  125. func GenerateNotebookJob(ctx *context.Context, req *GenerateNotebookJobReq) (jobId string, err error) {
  126. createTime := timeutil.TimeStampNow()
  127. var datasetGrampus []models.GrampusDataset
  128. var codeGrampus models.GrampusDataset
  129. var cpCommand string
  130. imageUrl := req.ImageUrl
  131. if ProcessorTypeNPU == req.ProcessType {
  132. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  133. codeGrampus = models.GrampusDataset{
  134. Name: req.CodeName,
  135. Bucket: setting.Bucket,
  136. EndPoint: getEndPoint(),
  137. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  138. }
  139. imageUrl = ""
  140. req.Command = ""
  141. } else {
  142. datasetGrampus, cpCommand = getDatasetGPUGrampus(req.DatasetInfos)
  143. codeGrampus = models.GrampusDataset{
  144. Name: req.CodeName,
  145. Bucket: setting.Attachment.Minio.Bucket,
  146. EndPoint: setting.Attachment.Minio.Endpoint,
  147. ObjectKey: req.CodeStoragePath + cloudbrain.DefaultBranchName + ".zip",
  148. ReadOnly: false,
  149. ContainerPath: "/code",
  150. }
  151. req.Command = fmt.Sprintf(CommandGpuDebug, cpCommand)
  152. log.Info("debug command:" + req.Command)
  153. }
  154. jobResult, err := createNotebookJob(models.CreateGrampusNotebookRequest{
  155. Name: req.JobName,
  156. Tasks: []models.GrampusNotebookTask{
  157. {
  158. Name: req.JobName,
  159. ResourceSpecId: req.Spec.SourceSpecId,
  160. ImageId: req.ImageId,
  161. ImageUrl: imageUrl,
  162. Datasets: datasetGrampus,
  163. Code: codeGrampus,
  164. AutoStopDuration: autoStopDurationMs,
  165. Capacity: setting.Capacity,
  166. Command: req.Command,
  167. },
  168. },
  169. })
  170. if err != nil {
  171. log.Error("createNotebookJob failed: %v", err.Error())
  172. return "", err
  173. }
  174. jobID := jobResult.JobInfo.JobID
  175. err = models.CreateCloudbrain(&models.Cloudbrain{
  176. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  177. UserID: ctx.User.ID,
  178. RepoID: ctx.Repo.Repository.ID,
  179. JobID: jobID,
  180. JobName: req.JobName,
  181. DisplayJobName: req.DisplayJobName,
  182. JobType: string(models.JobTypeDebug),
  183. Type: models.TypeC2Net,
  184. Uuid: req.Uuid,
  185. DatasetName: req.DatasetNames,
  186. CommitID: req.CommitID,
  187. IsLatestVersion: "1",
  188. ComputeResource: req.ComputeResource,
  189. ImageID: req.ImageId,
  190. BranchName: req.BranchName,
  191. Description: req.Description,
  192. WorkServerNumber: 1,
  193. EngineName: req.ImageUrl,
  194. CreatedUnix: createTime,
  195. UpdatedUnix: createTime,
  196. Spec: req.Spec,
  197. })
  198. if err != nil {
  199. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  200. return "", err
  201. }
  202. var actionType models.ActionType
  203. if req.ComputeResource == models.NPUResource {
  204. actionType = models.ActionCreateGrampusNPUDebugTask
  205. } else if req.ComputeResource == models.GPUResource {
  206. actionType = models.ActionCreateGrampusGPUDebugTask
  207. }
  208. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  209. return jobID, nil
  210. }
  211. func GenerateTrainJob(ctx *context.Context, req *GenerateTrainJobReq) (jobId string, err error) {
  212. createTime := timeutil.TimeStampNow()
  213. centerID, centerName := getCentersParamter(ctx, req)
  214. var datasetGrampus, modelGrampus []models.GrampusDataset
  215. var codeGrampus models.GrampusDataset
  216. if ProcessorTypeNPU == req.ProcessType {
  217. datasetGrampus = getDatasetGrampus(req.DatasetInfos)
  218. if len(req.ModelName) != 0 {
  219. modelGrampus = []models.GrampusDataset{
  220. {
  221. Name: req.ModelName,
  222. Bucket: setting.Bucket,
  223. EndPoint: getEndPoint(),
  224. ObjectKey: req.PreTrainModelPath,
  225. },
  226. }
  227. }
  228. codeGrampus = models.GrampusDataset{
  229. Name: req.CodeName,
  230. Bucket: setting.Bucket,
  231. EndPoint: getEndPoint(),
  232. ObjectKey: req.CodeObsPath + cloudbrain.DefaultBranchName + ".zip",
  233. }
  234. }
  235. jobResult, err := createJob(models.CreateGrampusJobRequest{
  236. Name: req.JobName,
  237. Tasks: []models.GrampusTasks{
  238. {
  239. Name: req.JobName,
  240. Command: req.Command,
  241. ResourceSpecId: req.Spec.SourceSpecId,
  242. ImageId: req.ImageId,
  243. ImageUrl: req.ImageUrl,
  244. CenterID: centerID,
  245. CenterName: centerName,
  246. ReplicaNum: 1,
  247. Datasets: datasetGrampus,
  248. Models: modelGrampus,
  249. Code: codeGrampus,
  250. BootFile: req.BootFile,
  251. },
  252. },
  253. })
  254. if err != nil {
  255. log.Error("createJob failed: %v", err.Error())
  256. return "", err
  257. }
  258. jobID := jobResult.JobInfo.JobID
  259. err = models.CreateCloudbrain(&models.Cloudbrain{
  260. Status: TransTrainJobStatus(jobResult.JobInfo.Status),
  261. UserID: ctx.User.ID,
  262. RepoID: ctx.Repo.Repository.ID,
  263. JobID: jobID,
  264. JobName: req.JobName,
  265. DisplayJobName: req.DisplayJobName,
  266. JobType: string(models.JobTypeTrain),
  267. Type: models.TypeC2Net,
  268. Uuid: req.Uuid,
  269. DatasetName: req.DatasetNames,
  270. CommitID: req.CommitID,
  271. IsLatestVersion: req.IsLatestVersion,
  272. ComputeResource: req.ComputeResource,
  273. ImageID: req.ImageId,
  274. TrainUrl: req.TrainUrl,
  275. BranchName: req.BranchName,
  276. Parameters: req.Params,
  277. BootFile: req.BootFile,
  278. DataUrl: req.DataUrl,
  279. Description: req.Description,
  280. WorkServerNumber: req.WorkServerNumber,
  281. EngineName: req.EngineName,
  282. VersionCount: req.VersionCount,
  283. TotalVersionCount: req.TotalVersionCount,
  284. CreatedUnix: createTime,
  285. UpdatedUnix: createTime,
  286. Spec: req.Spec,
  287. ModelName: req.ModelName,
  288. ModelVersion: req.ModelVersion,
  289. LabelName: req.LabelName,
  290. PreTrainModelUrl: req.PreTrainModelUrl,
  291. CkptName: req.CkptName,
  292. })
  293. if err != nil {
  294. log.Error("CreateCloudbrain(%s) failed:%v", req.DisplayJobName, err.Error())
  295. return "", err
  296. }
  297. var actionType models.ActionType
  298. if req.ComputeResource == models.NPUResource {
  299. actionType = models.ActionCreateGrampusNPUTrainTask
  300. } else if req.ComputeResource == models.GPUResource {
  301. actionType = models.ActionCreateGrampusGPUTrainTask
  302. }
  303. notification.NotifyOtherTask(ctx.User, ctx.Repo.Repository, jobID, req.DisplayJobName, actionType)
  304. return jobID, nil
  305. }
  306. func getCentersParamter(ctx *context.Context, req *GenerateTrainJobReq) ([]string, []string) {
  307. var centerID []string
  308. var centerName []string
  309. includeCenters := make(map[string]string)
  310. excludeCenters := make(map[string]string)
  311. if SpecialPools != nil {
  312. for _, pool := range SpecialPools.Pools {
  313. if !pool.IsExclusive && strings.Contains(req.ComputeResource, pool.Type) {
  314. org, _ := models.GetOrgByName(pool.Org)
  315. if org != nil {
  316. isOrgMember, _ := models.IsOrganizationMember(org.ID, ctx.User.ID)
  317. if isOrgMember {
  318. for _, info := range pool.Pool {
  319. includeCenters[info.Queue] = info.Value
  320. }
  321. } else {
  322. for _, info := range pool.Pool {
  323. excludeCenters[info.Queue] = info.Value
  324. }
  325. }
  326. }
  327. }
  328. }
  329. }
  330. if len(includeCenters) > 0 {
  331. //如果有专属资源池,根据专属资源池指定智算中心
  332. for k, v := range includeCenters {
  333. centerID = append(centerID, k)
  334. centerName = append(centerName, v)
  335. }
  336. } else if len(excludeCenters) > 0 {
  337. //否则,有要排除的中心,先获取所有中心,删除其中的排除中心,得到指定的智算中心
  338. allCenters := make(map[string]string)
  339. specs, err := GetResourceSpecs(req.ProcessType)
  340. if err == nil {
  341. for _, info := range specs.Infos {
  342. for _, center := range info.Centers {
  343. allCenters[center.ID] = center.Name
  344. }
  345. }
  346. }
  347. for k, _ := range excludeCenters {
  348. delete(allCenters, k)
  349. }
  350. for k, v := range allCenters {
  351. centerID = append(centerID, k)
  352. centerName = append(centerName, v)
  353. }
  354. }
  355. return centerID, centerName
  356. }
  357. func TransTrainJobStatus(status string) string {
  358. if status == models.GrampusStatusPending {
  359. status = models.GrampusStatusWaiting
  360. }
  361. return strings.ToUpper(status)
  362. }
  363. func InitSpecialPool() {
  364. if SpecialPools == nil && setting.Grampus.SpecialPools != "" {
  365. json.Unmarshal([]byte(setting.Grampus.SpecialPools), &SpecialPools)
  366. }
  367. }
  368. func GetNpuModelRemoteObsUrl(jobName string) string {
  369. return "s3:///" + BucketRemote + "/" + GetNpuModelObjectKey(jobName)
  370. }
  371. func GetNpuModelObjectKey(jobName string) string {
  372. return setting.CodePathPrefix + jobName + RemoteModelPath
  373. }
  374. func GetRemoteEndPoint(aiCenterID string) string {
  375. var endPoint string
  376. for _, info := range setting.CenterInfos.Info {
  377. if info.CenterID == aiCenterID {
  378. endPoint = info.Endpoint
  379. break
  380. }
  381. }
  382. return endPoint
  383. }
  384. func GetCenterProxy(aiCenterID string) string {
  385. var proxy string
  386. for _, info := range setting.CenterInfos.Info {
  387. if info.CenterID == aiCenterID {
  388. proxy = info.StorageProxyServer
  389. break
  390. }
  391. }
  392. return proxy
  393. }