You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

resource_specification.go 21 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710
  1. package resource
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "code.gitea.io/gitea/models"
  10. "code.gitea.io/gitea/modules/cloudbrain"
  11. "code.gitea.io/gitea/modules/convert"
  12. "code.gitea.io/gitea/modules/grampus"
  13. "code.gitea.io/gitea/modules/log"
  14. "code.gitea.io/gitea/modules/modelarts"
  15. "code.gitea.io/gitea/modules/setting"
  16. api "code.gitea.io/gitea/modules/structs"
  17. "code.gitea.io/gitea/routers/response"
  18. "code.gitea.io/gitea/services/admin/operate_log"
  19. )
  20. func AddResourceSpecification(doerId int64, req models.ResourceSpecificationReq) error {
  21. if req.Status == 0 {
  22. req.Status = models.SpecNotVerified
  23. }
  24. spec := req.ToDTO()
  25. if _, err := models.InsertResourceSpecification(spec); err != nil {
  26. return err
  27. }
  28. return nil
  29. }
  30. func UpdateSpecUnitPrice(doerId int64, specId int64, unitPrice int) *response.BizError {
  31. oldSpec, err := models.GetResourceSpecification(&models.ResourceSpecification{ID: specId})
  32. if err != nil {
  33. return response.NewBizError(err)
  34. }
  35. if oldSpec == nil {
  36. return response.SPECIFICATION_NOT_EXIST
  37. }
  38. err = models.UpdateSpecUnitPriceById(specId, unitPrice)
  39. if err != nil {
  40. return response.NewBizError(err)
  41. }
  42. if oldSpec.UnitPrice != unitPrice {
  43. AddSpecOperateLog(doerId, "edit", operate_log.NewLogValues().Add("unitPrice", unitPrice), operate_log.NewLogValues().Add("unitPrice", oldSpec.UnitPrice), specId, fmt.Sprintf("修改资源规格单价从%d积分到%d积分", oldSpec.UnitPrice, unitPrice))
  44. }
  45. return nil
  46. }
  47. func SyncGrampusSpecs(doerId int64) error {
  48. r, err := grampus.GetResourceSpecs("")
  49. if err != nil {
  50. return err
  51. }
  52. log.Info("SyncGrampusSpecs result = %+v", r)
  53. specUpdateList := make([]models.ResourceSpecification, 0)
  54. specInsertList := make([]models.ResourceSpecification, 0)
  55. existIds := make([]int64, 0)
  56. for _, spec := range r.Infos {
  57. for _, c := range spec.Centers {
  58. computeResource := models.ParseComputeResourceFormGrampus(spec.SpecInfo.AccDeviceKind)
  59. if computeResource == "" {
  60. continue
  61. }
  62. accCardType := strings.ToUpper(spec.SpecInfo.AccDeviceModel)
  63. memGiB, err := models.ParseMemSizeFromGrampus(spec.SpecInfo.MemorySize)
  64. gpuMemGiB, err := models.ParseMemSizeFromGrampus(spec.SpecInfo.AccDeviceMemory)
  65. if err != nil {
  66. log.Error("ParseMemSizeFromGrampus error. MemorySize=%s AccDeviceMemory=%s", spec.SpecInfo.MemorySize, spec.SpecInfo.AccDeviceMemory)
  67. }
  68. // get resource queue.if queue not exist,skip it
  69. r, err := models.GetResourceQueue(&models.ResourceQueue{
  70. Cluster: models.C2NetCluster,
  71. AiCenterCode: c.ID,
  72. ComputeResource: computeResource,
  73. AccCardType: accCardType,
  74. })
  75. if err != nil || r == nil {
  76. continue
  77. }
  78. //Determine if this specification already exists.if exist,update params
  79. //if not exist,insert a new record and status is SpecNotVerified
  80. oldSpec, err := models.GetResourceSpecification(&models.ResourceSpecification{
  81. QueueId: r.ID,
  82. SourceSpecId: spec.ID,
  83. })
  84. if err != nil {
  85. return err
  86. }
  87. if oldSpec == nil {
  88. specInsertList = append(specInsertList, models.ResourceSpecification{
  89. QueueId: r.ID,
  90. SourceSpecId: spec.ID,
  91. AccCardsNum: spec.SpecInfo.AccDeviceNum,
  92. CpuCores: spec.SpecInfo.CpuCoreNum,
  93. MemGiB: memGiB,
  94. GPUMemGiB: gpuMemGiB,
  95. Status: models.SpecNotVerified,
  96. IsAutomaticSync: true,
  97. IsAvailable: true,
  98. CreatedBy: doerId,
  99. UpdatedBy: doerId,
  100. })
  101. } else {
  102. existIds = append(existIds, oldSpec.ID)
  103. specUpdateList = append(specUpdateList, models.ResourceSpecification{
  104. ID: oldSpec.ID,
  105. AccCardsNum: spec.SpecInfo.AccDeviceNum,
  106. CpuCores: spec.SpecInfo.CpuCoreNum,
  107. MemGiB: memGiB,
  108. GPUMemGiB: gpuMemGiB,
  109. IsAvailable: true,
  110. UpdatedBy: doerId,
  111. })
  112. }
  113. }
  114. }
  115. return models.SyncGrampusSpecs(specUpdateList, specInsertList, existIds)
  116. }
  117. //GetResourceSpecificationList returns specification and queue
  118. func GetResourceSpecificationList(opts models.SearchResourceSpecificationOptions) (*models.ResourceSpecAndQueueListRes, error) {
  119. n, r, err := models.SearchResourceSpecification(opts)
  120. if err != nil {
  121. return nil, err
  122. }
  123. return models.NewResourceSpecAndQueueListRes(n, r), nil
  124. }
  125. //GetAllDistinctResourceSpecification returns specification and queue after distinct
  126. //totalSize is always 0 here
  127. func GetAllDistinctResourceSpecification(opts models.SearchResourceSpecificationOptions) (*models.ResourceSpecAndQueueListRes, error) {
  128. opts.Page = 0
  129. opts.PageSize = 1000
  130. opts.OrderBy = models.SearchSpecOrder4Standard
  131. _, r, err := models.SearchResourceSpecification(opts)
  132. if err != nil {
  133. return nil, err
  134. }
  135. nr := distinctResourceSpecAndQueue(r)
  136. return models.NewResourceSpecAndQueueListRes(0, nr), nil
  137. }
  138. func distinctResourceSpecAndQueue(r []models.ResourceSpecAndQueue) []models.ResourceSpecAndQueue {
  139. specs := make([]models.ResourceSpecAndQueue, 0, len(r))
  140. sourceSpecIdMap := make(map[string]models.ResourceSpecAndQueue, 0)
  141. for i := 0; i < len(r); i++ {
  142. spec := r[i]
  143. if spec.SourceSpecId == "" {
  144. specs = append(specs, spec)
  145. continue
  146. }
  147. if _, has := sourceSpecIdMap[spec.SourceSpecId]; has {
  148. //prefer to use on-shelf spec
  149. if sourceSpecIdMap[spec.SourceSpecId].Status != spec.Status && spec.Status == models.SpecOnShelf {
  150. for k, v := range specs {
  151. if v.ResourceSpecification.ID == sourceSpecIdMap[spec.SourceSpecId].ResourceSpecification.ID {
  152. specs[k] = spec
  153. }
  154. }
  155. }
  156. continue
  157. }
  158. specs = append(specs, spec)
  159. sourceSpecIdMap[spec.SourceSpecId] = spec
  160. }
  161. return specs
  162. }
  163. func GetResourceSpecificationScenes(specId int64) ([]models.ResourceSceneBriefRes, error) {
  164. r, err := models.GetSpecScenes(specId)
  165. if err != nil {
  166. return nil, err
  167. }
  168. return r, nil
  169. }
  170. func ResourceSpecOnShelf(doerId int64, id int64, unitPrice int) *response.BizError {
  171. spec, err := models.GetResourceSpecification(&models.ResourceSpecification{ID: id})
  172. if err != nil {
  173. return response.NewBizError(err)
  174. }
  175. if spec == nil {
  176. return response.SPECIFICATION_NOT_EXIST
  177. }
  178. if q, err := models.GetResourceQueue(&models.ResourceQueue{ID: spec.QueueId}); err != nil || q == nil {
  179. return response.RESOURCE_QUEUE_NOT_AVAILABLE
  180. }
  181. if !spec.IsAvailable {
  182. return response.SPECIFICATION_NOT_AVAILABLE
  183. }
  184. err = models.ResourceSpecOnShelf(id, unitPrice)
  185. if err != nil {
  186. return response.NewBizError(err)
  187. }
  188. if spec.UnitPrice != unitPrice {
  189. AddSpecOperateLog(doerId, "on-shelf", operate_log.NewLogValues().Add("UnitPrice", unitPrice), operate_log.NewLogValues().Add("UnitPrice", spec.UnitPrice), id, fmt.Sprintf("定价上架资源规格,单价为%d", unitPrice))
  190. } else {
  191. AddSpecOperateLog(doerId, "on-shelf", nil, nil, id, "上架资源规格")
  192. }
  193. return nil
  194. }
  195. func ResourceSpecOffShelf(doerId int64, id int64) *response.BizError {
  196. _, err := models.ResourceSpecOffShelf(id)
  197. if err != nil {
  198. return response.NewBizError(err)
  199. }
  200. AddSpecOperateLog(doerId, "off-shelf", nil, nil, id, "下架资源规格")
  201. return nil
  202. }
  203. func AddSpecOperateLog(doerId int64, operateType string, newValue, oldValue *models.LogValues, specId int64, comment string) {
  204. var newString = ""
  205. var oldString = ""
  206. if newValue != nil {
  207. newString = newValue.JsonString()
  208. }
  209. if oldValue != nil {
  210. oldString = oldValue.JsonString()
  211. }
  212. operate_log.Log(models.AdminOperateLog{
  213. BizType: "SpecOperate",
  214. OperateType: operateType,
  215. OldValue: oldString,
  216. NewValue: newString,
  217. RelatedId: fmt.Sprint(specId),
  218. CreatedBy: doerId,
  219. Comment: comment,
  220. })
  221. }
  222. func FindAvailableSpecs(userId int64, opts models.FindSpecsOptions) ([]*models.Specification, error) {
  223. opts.SpecStatus = models.SpecOnShelf
  224. r, err := models.FindSpecs(opts)
  225. if err != nil {
  226. log.Error("FindAvailableSpecs error.%v", err)
  227. return nil, err
  228. }
  229. //filter exclusive specs
  230. specs := filterExclusiveSpecs(r, userId)
  231. //distinct by sourceSpecId
  232. specs = distinctSpecs(specs)
  233. return specs, err
  234. }
  235. func FindAvailableSpecs4Show(userId int64, opts models.FindSpecsOptions) ([]*api.SpecificationShow, error) {
  236. specs, err := FindAvailableSpecs(userId, opts)
  237. if err != nil {
  238. return nil, err
  239. }
  240. result := make([]*api.SpecificationShow, len(specs))
  241. for i, v := range specs {
  242. result[i] = convert.ToSpecification(v)
  243. }
  244. return result, nil
  245. }
  246. func filterExclusiveSpecs(r []*models.Specification, userId int64) []*models.Specification {
  247. specs := make([]*models.Specification, 0, len(r))
  248. specMap := make(map[int64]string, 0)
  249. for i := 0; i < len(r); i++ {
  250. spec := r[i]
  251. if _, has := specMap[spec.ID]; has {
  252. continue
  253. }
  254. if !spec.IsExclusive {
  255. specs = append(specs, spec)
  256. specMap[spec.ID] = ""
  257. continue
  258. }
  259. orgs := strings.Split(spec.ExclusiveOrg, ";")
  260. for _, org := range orgs {
  261. isMember, _ := models.IsOrganizationMemberByOrgName(org, userId)
  262. if isMember {
  263. specs = append(specs, spec)
  264. specMap[spec.ID] = ""
  265. break
  266. }
  267. }
  268. }
  269. return specs
  270. }
  271. func distinctSpecs(r []*models.Specification) []*models.Specification {
  272. specs := make([]*models.Specification, 0, len(r))
  273. sourceSpecIdMap := make(map[string]string, 0)
  274. for i := 0; i < len(r); i++ {
  275. spec := r[i]
  276. if spec.SourceSpecId == "" {
  277. specs = append(specs, spec)
  278. continue
  279. }
  280. if _, has := sourceSpecIdMap[spec.SourceSpecId]; has {
  281. continue
  282. }
  283. specs = append(specs, spec)
  284. sourceSpecIdMap[spec.SourceSpecId] = ""
  285. }
  286. return specs
  287. }
  288. func GetAndCheckSpec(userId int64, specId int64, opts models.FindSpecsOptions) (*models.Specification, error) {
  289. if specId == 0 {
  290. return nil, nil
  291. }
  292. opts.SpecId = specId
  293. r, err := FindAvailableSpecs(userId, opts)
  294. if err != nil {
  295. return nil, err
  296. }
  297. if r == nil || len(r) == 0 {
  298. return nil, nil
  299. }
  300. return r[0], nil
  301. }
  302. func InsertCloudbrainSpec(cloudbrainId int64, s *models.Specification) error {
  303. c := models.CloudbrainSpec{
  304. CloudbrainID: cloudbrainId,
  305. SpecId: s.ID,
  306. SourceSpecId: s.SourceSpecId,
  307. AccCardsNum: s.AccCardsNum,
  308. AccCardType: s.AccCardType,
  309. CpuCores: s.CpuCores,
  310. MemGiB: s.MemGiB,
  311. GPUMemGiB: s.GPUMemGiB,
  312. ShareMemGiB: s.ShareMemGiB,
  313. ComputeResource: s.ComputeResource,
  314. UnitPrice: s.UnitPrice,
  315. QueueId: s.QueueId,
  316. QueueCode: s.QueueCode,
  317. Cluster: s.Cluster,
  318. AiCenterCode: s.AiCenterCode,
  319. AiCenterName: s.AiCenterName,
  320. IsExclusive: s.IsExclusive,
  321. ExclusiveOrg: s.ExclusiveOrg,
  322. }
  323. _, err := models.InsertCloudbrainSpec(c)
  324. if err != nil {
  325. log.Error("InsertCloudbrainSpec error.CloudbrainSpec=%v. err=%v", c, err)
  326. return err
  327. }
  328. return nil
  329. }
  330. func GetCloudbrainSpec(cloudbrainId int64) (*models.Specification, error) {
  331. c, err := models.GetCloudbrainSpecByID(cloudbrainId)
  332. if err != nil {
  333. return nil, err
  334. }
  335. if c == nil {
  336. return nil, nil
  337. }
  338. return c.ConvertToSpecification(), nil
  339. }
  340. func RefreshHistorySpec(scopeAll bool, ids []int64) (int64, int64, error) {
  341. var success int64
  342. var total int64
  343. if !scopeAll {
  344. if ids == nil || len(ids) == 0 {
  345. return 0, 0, nil
  346. }
  347. total = int64(len(ids))
  348. tasks, err := models.GetCloudbrainWithDeletedByIDs(ids)
  349. if err != nil {
  350. return total, 0, err
  351. }
  352. for _, task := range tasks {
  353. err = RefreshOneHistorySpec(task)
  354. if err != nil {
  355. log.Error("RefreshOneHistorySpec error.%v", err)
  356. continue
  357. }
  358. success++
  359. }
  360. } else {
  361. page := 1
  362. pageSize := 100
  363. n, err := models.CountNoSpecHistoricTask()
  364. if err != nil {
  365. log.Error("FindNoSpecHistoricTask CountNoSpecHistoricTask error. e=%v", err)
  366. return 0, 0, err
  367. }
  368. total = n
  369. for i := 0; i < 500; i++ {
  370. list, err := models.FindCloudbrainTask(page, pageSize)
  371. page++
  372. if err != nil {
  373. log.Error("FindCloudbrainTask error.page=%d pageSize=%d e=%v", page, pageSize, err)
  374. return total, success, err
  375. }
  376. if len(list) == 0 {
  377. log.Info("RefreshHistorySpec. list is empty")
  378. break
  379. }
  380. for _, task := range list {
  381. s, err := GetCloudbrainSpec(task.ID)
  382. if err != nil {
  383. log.Error("RefreshHistorySpec GetCloudbrainSpec error.%v", err)
  384. continue
  385. }
  386. if s != nil {
  387. continue
  388. }
  389. err = RefreshOneHistorySpec(task)
  390. if err != nil {
  391. log.Error("RefreshOneHistorySpec error.%v", err)
  392. continue
  393. }
  394. success++
  395. }
  396. if len(list) < pageSize {
  397. log.Info("RefreshHistorySpec. list < pageSize")
  398. break
  399. }
  400. }
  401. }
  402. return total, success, nil
  403. }
  404. func RefreshOneHistorySpec(task *models.Cloudbrain) error {
  405. var spec *models.Specification
  406. var err error
  407. switch task.Type {
  408. case models.TypeCloudBrainOne:
  409. spec, err = getCloudbrainOneSpec(task)
  410. case models.TypeCloudBrainTwo:
  411. spec, err = getCloudbrainTwoSpec(task)
  412. case models.TypeC2Net:
  413. spec, err = getGrampusSpec(task)
  414. }
  415. if err != nil {
  416. log.Error("find spec error,task.ID=%d err=%v", task.ID, err)
  417. return err
  418. }
  419. if spec == nil {
  420. log.Error("find spec failed,task.ID=%d", task.ID)
  421. return errors.New("find spec failed")
  422. }
  423. return InsertCloudbrainSpec(task.ID, spec)
  424. }
  425. func getCloudbrainOneSpec(task *models.Cloudbrain) (*models.Specification, error) {
  426. if task.GpuQueue == "" {
  427. log.Info("gpu queue is empty.task.ID = %d", task.ID)
  428. return nil, nil
  429. }
  430. //find from config
  431. spec, err := findCloudbrainOneSpecFromConfig(task)
  432. if err != nil {
  433. log.Error("getCloudbrainOneSpec findCloudbrainOneSpecFromConfig error.%v", err)
  434. return nil, err
  435. }
  436. if spec != nil {
  437. return spec, nil
  438. }
  439. //find from remote
  440. return findCloudbrainOneSpecFromRemote(task)
  441. }
  442. func findCloudbrainOneSpecFromRemote(task *models.Cloudbrain) (*models.Specification, error) {
  443. time.Sleep(200 * time.Millisecond)
  444. log.Info("start findCloudbrainOneSpecFromRemote")
  445. result, err := cloudbrain.GetJob(task.JobID)
  446. if err != nil {
  447. log.Error("getCloudbrainOneSpec error. %v", err)
  448. return nil, err
  449. }
  450. if result == nil {
  451. log.Info("findCloudbrainOneSpecFromRemote failed,result is empty.task.ID=%d", task.ID)
  452. return nil, nil
  453. }
  454. jobRes, _ := models.ConvertToJobResultPayload(result.Payload)
  455. memSize, _ := models.ParseMemSizeFromGrampus(jobRes.Resource.Memory)
  456. if task.ComputeResource == "CPU/GPU" {
  457. task.ComputeResource = models.GPU
  458. }
  459. var shmMB float32
  460. if jobRes.Config.TaskRoles != nil && len(jobRes.Config.TaskRoles) > 0 {
  461. shmMB = float32(jobRes.Config.TaskRoles[0].ShmMB) / 1024
  462. if jobRes.Config.TaskRoles[0].ShmMB == 103600 {
  463. shmMB = 100
  464. } else if jobRes.Config.TaskRoles[0].ShmMB == 51800 {
  465. shmMB = 50
  466. }
  467. }
  468. opt := models.FindSpecsOptions{
  469. ComputeResource: task.ComputeResource,
  470. Cluster: models.OpenICluster,
  471. AiCenterCode: models.AICenterOfCloudBrainOne,
  472. QueueCode: task.GpuQueue,
  473. AccCardsNum: jobRes.Resource.NvidiaComGpu,
  474. UseAccCardsNum: true,
  475. CpuCores: jobRes.Resource.CPU,
  476. UseCpuCores: true,
  477. MemGiB: memSize,
  478. UseMemGiB: memSize > 0,
  479. ShareMemGiB: shmMB,
  480. UseShareMemGiB: shmMB > 0,
  481. RequestAll: true,
  482. }
  483. specs, err := models.FindSpecs(opt)
  484. if err != nil {
  485. log.Error("getCloudbrainOneSpec from remote error,%v", err)
  486. return nil, err
  487. }
  488. if len(specs) == 1 {
  489. return specs[0], nil
  490. }
  491. if len(specs) == 0 {
  492. s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加")
  493. if err != nil {
  494. log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err)
  495. return nil, nil
  496. }
  497. return s, nil
  498. }
  499. log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt)
  500. return nil, nil
  501. }
  502. func findCloudbrainOneSpecFromConfig(task *models.Cloudbrain) (*models.Specification, error) {
  503. //find from config
  504. var specConfig *models.ResourceSpec
  505. hasSpec := false
  506. if task.JobType == string(models.JobTypeTrain) {
  507. if cloudbrain.TrainResourceSpecs == nil {
  508. json.Unmarshal([]byte(setting.TrainResourceSpecs), &cloudbrain.TrainResourceSpecs)
  509. }
  510. for _, tmp := range cloudbrain.TrainResourceSpecs.ResourceSpec {
  511. if tmp.Id == task.ResourceSpecId {
  512. hasSpec = true
  513. specConfig = tmp
  514. break
  515. }
  516. }
  517. } else if task.JobType == string(models.JobTypeInference) {
  518. if cloudbrain.InferenceResourceSpecs == nil {
  519. json.Unmarshal([]byte(setting.InferenceResourceSpecs), &cloudbrain.InferenceResourceSpecs)
  520. }
  521. for _, tmp := range cloudbrain.InferenceResourceSpecs.ResourceSpec {
  522. if tmp.Id == task.ResourceSpecId {
  523. hasSpec = true
  524. specConfig = tmp
  525. break
  526. }
  527. }
  528. } else {
  529. if cloudbrain.ResourceSpecs == nil {
  530. json.Unmarshal([]byte(setting.ResourceSpecs), &cloudbrain.ResourceSpecs)
  531. }
  532. for _, tmp := range cloudbrain.ResourceSpecs.ResourceSpec {
  533. if tmp.Id == task.ResourceSpecId {
  534. hasSpec = true
  535. specConfig = tmp
  536. break
  537. }
  538. }
  539. }
  540. if !hasSpec && cloudbrain.SpecialPools != nil {
  541. for _, specialPool := range cloudbrain.SpecialPools.Pools {
  542. if specialPool.ResourceSpec != nil {
  543. for _, spec := range specialPool.ResourceSpec {
  544. if task.ResourceSpecId == spec.Id {
  545. hasSpec = true
  546. specConfig = spec
  547. break
  548. }
  549. }
  550. }
  551. }
  552. }
  553. if specConfig == nil {
  554. log.Error("getCloudbrainOneSpec from config failed,task.ResourceSpecId=%d", task.ResourceSpecId)
  555. return nil, nil
  556. }
  557. if task.ComputeResource == "CPU/GPU" {
  558. task.ComputeResource = models.GPU
  559. }
  560. shareMemMiB := float32(specConfig.ShareMemMiB) / 1024
  561. if specConfig.ShareMemMiB == 103600 {
  562. shareMemMiB = 100
  563. } else if specConfig.ShareMemMiB == 51800 {
  564. shareMemMiB = 50
  565. }
  566. opt := models.FindSpecsOptions{
  567. JobType: models.JobType(task.JobType),
  568. ComputeResource: task.ComputeResource,
  569. Cluster: models.OpenICluster,
  570. AiCenterCode: models.AICenterOfCloudBrainOne,
  571. QueueCode: task.GpuQueue,
  572. AccCardsNum: specConfig.GpuNum,
  573. UseAccCardsNum: true,
  574. CpuCores: specConfig.CpuNum,
  575. UseCpuCores: true,
  576. MemGiB: float32(specConfig.MemMiB) / 1024,
  577. UseMemGiB: true,
  578. ShareMemGiB: shareMemMiB,
  579. UseShareMemGiB: true,
  580. RequestAll: true,
  581. }
  582. specs, err := models.FindSpecs(opt)
  583. if err != nil {
  584. log.Error("getCloudbrainOneSpec from config error,%v", err)
  585. return nil, err
  586. }
  587. if len(specs) > 1 {
  588. log.Error("Too many results matched.size=%d opt=%+v", len(specs), opt)
  589. return nil, nil
  590. }
  591. if len(specs) == 0 {
  592. s, err := InitQueueAndSpec(opt, "云脑一", "处理历史云脑任务时自动添加")
  593. if err != nil {
  594. log.Error("getCloudbrainOneSpec InitQueueAndSpec error.err=%v", err)
  595. return nil, nil
  596. }
  597. return s, nil
  598. }
  599. return specs[0], nil
  600. }
  601. func getCloudbrainTwoSpec(task *models.Cloudbrain) (*models.Specification, error) {
  602. specMap, err := models.GetCloudbrainTwoSpecs()
  603. if err != nil {
  604. log.Error("InitCloudbrainTwoSpecs err.%v", err)
  605. return nil, err
  606. }
  607. if task.FlavorCode != "" {
  608. return specMap[task.FlavorCode], nil
  609. }
  610. time.Sleep(200 * time.Millisecond)
  611. log.Info("start getCloudbrainTwoSpec FromRemote")
  612. if task.JobType == string(models.JobTypeDebug) {
  613. result, err := modelarts.GetNotebook2(task.JobID)
  614. if err != nil {
  615. log.Error("getCloudbrainTwoSpec GetNotebook2 error.%v", err)
  616. return nil, err
  617. }
  618. if result != nil {
  619. return specMap[result.Flavor], nil
  620. }
  621. } else if task.JobType == string(models.JobTypeTrain) || task.JobType == string(models.JobTypeInference) {
  622. result, err := modelarts.GetTrainJob(task.JobID, strconv.FormatInt(task.VersionID, 10))
  623. if err != nil {
  624. log.Error("getCloudbrainTwoSpec GetTrainJob error:%v", task.JobName, err)
  625. return nil, err
  626. }
  627. if result != nil {
  628. return specMap[result.Flavor.Code], nil
  629. }
  630. }
  631. return nil, nil
  632. }
  633. func getGrampusSpec(task *models.Cloudbrain) (*models.Specification, error) {
  634. specMap, err := models.GetGrampusSpecs()
  635. if err != nil {
  636. log.Error("GetGrampusSpecs err.%v", err)
  637. return nil, err
  638. }
  639. if task.AiCenter != "" {
  640. c := strings.Split(task.AiCenter, "+")
  641. spec := specMap[task.FlavorCode+"_"+c[0]]
  642. if spec != nil {
  643. return spec, nil
  644. }
  645. }
  646. return specMap[task.FlavorCode], nil
  647. }
  648. func InitQueueAndSpec(opt models.FindSpecsOptions, aiCenterName string, remark string) (*models.Specification, error) {
  649. return models.InitQueueAndSpec(models.ResourceQueue{
  650. QueueCode: opt.QueueCode,
  651. Cluster: opt.Cluster,
  652. AiCenterCode: opt.AiCenterCode,
  653. AiCenterName: aiCenterName,
  654. ComputeResource: opt.ComputeResource,
  655. AccCardType: models.GetCloudbrainOneAccCardType(opt.QueueCode),
  656. Remark: remark,
  657. }, models.ResourceSpecification{
  658. AccCardsNum: opt.AccCardsNum,
  659. CpuCores: opt.CpuCores,
  660. MemGiB: opt.MemGiB,
  661. GPUMemGiB: opt.GPUMemGiB,
  662. ShareMemGiB: opt.ShareMemGiB,
  663. Status: models.SpecOffShelf,
  664. IsAvailable: true,
  665. })
  666. }