You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiService.go 6.6 kB

4 months ago
4 months ago
4 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package service
import (
	"fmt"
	"strconv"
	"sync"
	"time"

	"github.com/zeromicro/go-zero/zrpc"
	"gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/task/tasksync"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink/octopusHttp"
	"gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
	"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
	"gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
	"gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
	"gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
)
  22. const (
  23. OCTOPUS = "octopus"
  24. MODELARTS = "modelarts"
  25. SHUGUANGAI = "shuguangAi"
  26. OPENI = "openI"
  27. )
  28. type AiService struct {
  29. AiExecutorAdapterMap map[string]map[string]executor.AiExecutor
  30. AiCollectorAdapterMap map[string]map[string]collector.AiCollector
  31. InferenceAdapterMap map[string]map[string]inference.ICluster
  32. Storage *database.AiStorage
  33. LocalCache map[string]interface{}
  34. Conf *config.Config
  35. TaskSyncLock sync.Mutex
  36. St *tasksync.SyncTrain
  37. Si *tasksync.SyncInfer
  38. }
  39. func NewAiService(conf *config.Config, storages *database.AiStorage, localCache map[string]interface{}) (*AiService, error) {
  40. //var aiType = "1"
  41. var tempAdapterId = "1777144940459986944"
  42. adapterIds := []string{tempAdapterId}
  43. //adapterIds, err := storages.GetAdapterIdsByType(aiType)
  44. //if err != nil {
  45. // return nil, err
  46. //}
  47. aiService := &AiService{
  48. AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor),
  49. AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector),
  50. InferenceAdapterMap: make(map[string]map[string]inference.ICluster),
  51. Storage: storages,
  52. LocalCache: localCache,
  53. Conf: conf,
  54. }
  55. for _, id := range adapterIds {
  56. clusters, err := storages.GetClustersByAdapterId(id)
  57. if err != nil {
  58. return nil, err
  59. }
  60. if len(clusters.List) == 0 {
  61. continue
  62. }
  63. exeClusterMap, colClusterMap, inferMap := InitAiClusterMap(conf, clusters.List)
  64. aiService.AiExecutorAdapterMap[id] = exeClusterMap
  65. aiService.AiCollectorAdapterMap[id] = colClusterMap
  66. aiService.InferenceAdapterMap[id] = inferMap
  67. }
  68. st := tasksync.NewTrainTask(storages, aiService.AiCollectorAdapterMap, conf)
  69. si := tasksync.NewInferTask(storages, aiService.InferenceAdapterMap, conf)
  70. aiService.St = st
  71. aiService.Si = si
  72. return aiService, nil
  73. }
  74. func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector, map[string]inference.ICluster) {
  75. executorMap := make(map[string]executor.AiExecutor)
  76. collectorMap := make(map[string]collector.AiCollector)
  77. inferenceMap := make(map[string]inference.ICluster)
  78. for _, c := range clusters {
  79. switch c.Driver {
  80. case OCTOPUS:
  81. id, _ := strconv.ParseInt(c.Id, 10, 64)
  82. octopus := octopusHttp.NewOctopusHttp(id, c.Nickname, c.Server, c.Address, c.Username, c.Password)
  83. collectorMap[c.Id] = octopus
  84. executorMap[c.Id] = octopus
  85. inferenceMap[c.Id] = octopus
  86. case MODELARTS:
  87. id, _ := strconv.ParseInt(c.Id, 10, 64)
  88. modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
  89. modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
  90. modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
  91. collectorMap[c.Id] = modelarts
  92. executorMap[c.Id] = modelarts
  93. inferenceMap[c.Id] = modelarts
  94. case SHUGUANGAI:
  95. id, _ := strconv.ParseInt(c.Id, 10, 64)
  96. aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
  97. sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
  98. collectorMap[c.Id] = sgai
  99. executorMap[c.Id] = sgai
  100. inferenceMap[c.Id] = sgai
  101. case OPENI:
  102. id, _ := strconv.ParseInt(c.Id, 10, 64)
  103. openi := storeLink.NewOpenI("http://localhost:2024", id, c.Username, c.Token, c.Nickname)
  104. collectorMap[c.Id] = openi
  105. executorMap[c.Id] = openi
  106. inferenceMap[c.Id] = openi
  107. }
  108. }
  109. return executorMap, collectorMap, inferenceMap
  110. }
  111. func (as *AiService) UpdateClusterMaps(conf *config.Config, adapterId string, clusters []types.ClusterInfo) {
  112. for _, c := range clusters {
  113. _, ok := as.AiExecutorAdapterMap[adapterId][c.Id]
  114. _, ok2 := as.AiCollectorAdapterMap[adapterId][c.Id]
  115. _, ok3 := as.InferenceAdapterMap[adapterId][c.Id]
  116. if !ok && !ok2 && !ok3 {
  117. switch c.Name {
  118. case OCTOPUS:
  119. id, _ := strconv.ParseInt(c.Id, 10, 64)
  120. octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
  121. octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
  122. as.AiExecutorAdapterMap[adapterId][c.Id] = octopus
  123. as.AiCollectorAdapterMap[adapterId][c.Id] = octopus
  124. as.InferenceAdapterMap[adapterId][c.Id] = octopus
  125. case MODELARTS:
  126. id, _ := strconv.ParseInt(c.Id, 10, 64)
  127. modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
  128. modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
  129. modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
  130. as.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
  131. as.AiCollectorAdapterMap[adapterId][c.Id] = modelarts
  132. as.InferenceAdapterMap[adapterId][c.Id] = modelarts
  133. case SHUGUANGAI:
  134. id, _ := strconv.ParseInt(c.Id, 10, 64)
  135. aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
  136. sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
  137. as.AiExecutorAdapterMap[adapterId][c.Id] = sgai
  138. as.AiCollectorAdapterMap[adapterId][c.Id] = sgai
  139. as.InferenceAdapterMap[adapterId][c.Id] = sgai
  140. }
  141. } else {
  142. continue
  143. }
  144. }
  145. }
  146. func (as *AiService) HandleDuplicateTaskName(name string, taskType string) (string, error) {
  147. exist, err := as.Storage.DoesTaskNameExist(name, taskType)
  148. if err != nil {
  149. return "", err
  150. }
  151. if exist {
  152. return name + "_" + time.Now().Format(constants.Layout_Time_Suffix), nil
  153. }
  154. return name, nil
  155. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem for heterogeneous cloud collaboration in JCC in a non-intrusive, autonomous, peer-to-peer manner.