You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

aiService.go 6.0 kB

11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package service
  2. import (
  3. "github.com/zeromicro/go-zero/zrpc"
  4. "gitlink.org.cn/JointCloud/pcm-ac/hpcacclient"
  5. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/config"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/collector"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/storeLink"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  13. "gitlink.org.cn/JointCloud/pcm-modelarts/client/imagesservice"
  14. "gitlink.org.cn/JointCloud/pcm-modelarts/client/modelartsservice"
  15. "gitlink.org.cn/JointCloud/pcm-octopus/octopusclient"
  16. "strconv"
  17. "sync"
  18. "time"
  19. )
  20. const (
  21. OCTOPUS = "octopus"
  22. MODELARTS = "modelarts"
  23. SHUGUANGAI = "shuguangAi"
  24. OPENI = "openI"
  25. )
  26. type AiService struct {
  27. AiExecutorAdapterMap map[string]map[string]executor.AiExecutor
  28. AiCollectorAdapterMap map[string]map[string]collector.AiCollector
  29. InferenceAdapterMap map[string]map[string]inference.ICluster
  30. Storage *database.AiStorage
  31. LocalCache map[string]interface{}
  32. Conf *config.Config
  33. TaskSyncLock sync.Mutex
  34. }
  35. func NewAiService(conf *config.Config, storages *database.AiStorage, localCache map[string]interface{}) (*AiService, error) {
  36. var aiType = "1"
  37. adapterIds, err := storages.GetAdapterIdsByType(aiType)
  38. if err != nil {
  39. return nil, err
  40. }
  41. aiService := &AiService{
  42. AiExecutorAdapterMap: make(map[string]map[string]executor.AiExecutor),
  43. AiCollectorAdapterMap: make(map[string]map[string]collector.AiCollector),
  44. InferenceAdapterMap: make(map[string]map[string]inference.ICluster),
  45. Storage: storages,
  46. LocalCache: localCache,
  47. Conf: conf,
  48. }
  49. for _, id := range adapterIds {
  50. clusters, err := storages.GetClustersByAdapterId(id)
  51. if err != nil {
  52. return nil, err
  53. }
  54. if len(clusters.List) == 0 {
  55. continue
  56. }
  57. exeClusterMap, colClusterMap, inferMap := InitAiClusterMap(conf, clusters.List)
  58. aiService.AiExecutorAdapterMap[id] = exeClusterMap
  59. aiService.AiCollectorAdapterMap[id] = colClusterMap
  60. aiService.InferenceAdapterMap[id] = inferMap
  61. }
  62. return aiService, nil
  63. }
  64. func InitAiClusterMap(conf *config.Config, clusters []types.ClusterInfo) (map[string]executor.AiExecutor, map[string]collector.AiCollector, map[string]inference.ICluster) {
  65. executorMap := make(map[string]executor.AiExecutor)
  66. collectorMap := make(map[string]collector.AiCollector)
  67. inferenceMap := make(map[string]inference.ICluster)
  68. for _, c := range clusters {
  69. switch c.Name {
  70. case OCTOPUS:
  71. id, _ := strconv.ParseInt(c.Id, 10, 64)
  72. octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
  73. octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
  74. collectorMap[c.Id] = octopus
  75. executorMap[c.Id] = octopus
  76. inferenceMap[c.Id] = octopus
  77. case MODELARTS:
  78. id, _ := strconv.ParseInt(c.Id, 10, 64)
  79. modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
  80. modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
  81. modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
  82. collectorMap[c.Id] = modelarts
  83. executorMap[c.Id] = modelarts
  84. inferenceMap[c.Id] = modelarts
  85. case SHUGUANGAI:
  86. id, _ := strconv.ParseInt(c.Id, 10, 64)
  87. aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
  88. sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
  89. collectorMap[c.Id] = sgai
  90. executorMap[c.Id] = sgai
  91. inferenceMap[c.Id] = sgai
  92. case OPENI:
  93. id, _ := strconv.ParseInt(c.Id, 10, 64)
  94. openi := storeLink.NewOpenI(c.Server, id, c.Username, c.Token, c.Nickname)
  95. collectorMap[c.Id] = openi
  96. executorMap[c.Id] = openi
  97. inferenceMap[c.Id] = openi
  98. }
  99. }
  100. return executorMap, collectorMap, inferenceMap
  101. }
  102. func (as *AiService) UpdateClusterMaps(conf *config.Config, adapterId string, clusters []types.ClusterInfo) {
  103. for _, c := range clusters {
  104. _, ok := as.AiExecutorAdapterMap[adapterId][c.Id]
  105. _, ok2 := as.AiCollectorAdapterMap[adapterId][c.Id]
  106. _, ok3 := as.InferenceAdapterMap[adapterId][c.Id]
  107. if !ok && !ok2 && !ok3 {
  108. switch c.Name {
  109. case OCTOPUS:
  110. id, _ := strconv.ParseInt(c.Id, 10, 64)
  111. octopusRpc := octopusclient.NewOctopus(zrpc.MustNewClient(conf.OctopusRpcConf))
  112. octopus := storeLink.NewOctopusLink(octopusRpc, c.Nickname, id)
  113. as.AiExecutorAdapterMap[adapterId][c.Id] = octopus
  114. as.AiCollectorAdapterMap[adapterId][c.Id] = octopus
  115. as.InferenceAdapterMap[adapterId][c.Id] = octopus
  116. case MODELARTS:
  117. id, _ := strconv.ParseInt(c.Id, 10, 64)
  118. modelArtsRpc := modelartsservice.NewModelArtsService(zrpc.MustNewClient(conf.ModelArtsRpcConf))
  119. modelArtsImgRpc := imagesservice.NewImagesService(zrpc.MustNewClient(conf.ModelArtsImgRpcConf))
  120. modelarts := storeLink.NewModelArtsLink(modelArtsRpc, modelArtsImgRpc, c.Name, id, c.Nickname)
  121. as.AiExecutorAdapterMap[adapterId][c.Id] = modelarts
  122. as.AiCollectorAdapterMap[adapterId][c.Id] = modelarts
  123. as.InferenceAdapterMap[adapterId][c.Id] = modelarts
  124. case SHUGUANGAI:
  125. id, _ := strconv.ParseInt(c.Id, 10, 64)
  126. aCRpc := hpcacclient.NewHpcAC(zrpc.MustNewClient(conf.ACRpcConf))
  127. sgai := storeLink.NewShuguangAi(aCRpc, c.Nickname, id)
  128. as.AiExecutorAdapterMap[adapterId][c.Id] = sgai
  129. as.AiCollectorAdapterMap[adapterId][c.Id] = sgai
  130. as.InferenceAdapterMap[adapterId][c.Id] = sgai
  131. }
  132. } else {
  133. continue
  134. }
  135. }
  136. }
  137. func (as *AiService) HandleDuplicateTaskName(name string) (string, error) {
  138. exist, err := as.Storage.DoesTaskNameExist(name)
  139. if err != nil {
  140. return "", err
  141. }
  142. if exist {
  143. return name + "_" + time.Now().Format(constants.Layout_Time_Suffix), nil
  144. }
  145. return name, nil
  146. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.