You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

createinferencetasklogic.go 8.8 kB

6 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. package inference
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/entity"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/utils/task"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/strategy"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  15. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  16. "github.com/zeromicro/go-zero/core/logx"
  17. )
  18. type CreateInferenceTaskLogic struct {
  19. logx.Logger
  20. ctx context.Context
  21. svcCtx *svc.ServiceContext
  22. }
  23. func NewCreateInferenceTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateInferenceTaskLogic {
  24. return &CreateInferenceTaskLogic{
  25. Logger: logx.WithContext(ctx),
  26. ctx: ctx,
  27. svcCtx: svcCtx,
  28. }
  29. }
  // AdapterId is the adapter identifier under which CreateInferenceTask
  // registers every assigned cluster before deploying instances.
  const (
  	AdapterId = "1777144940459986944"
  )
  31. func (l *CreateInferenceTaskLogic) CreateInferenceTask(req *types.CreateInferenceTaskReq) (resp *types.CreateInferenceTaskResp, err error) {
  32. resp = &types.CreateInferenceTaskResp{}
  33. err = task.ValidateJobResources(req.JobResources, "inference")
  34. if err != nil {
  35. return nil, err
  36. }
  37. clusters, err := generateClustersForTaskCreation(req.DataDistributes, req.Name)
  38. if err != nil {
  39. return nil, err
  40. }
  41. modelName, err := generateModelName(clusters)
  42. if err != nil {
  43. return nil, err
  44. }
  45. taskName, err := l.svcCtx.Scheduler.AiService.HandleDuplicateTaskName(req.Name, "inference")
  46. if err != nil {
  47. return nil, err
  48. }
  49. assignedClusters := task.CopyParams(clusters, req.JobResources.Clusters, "inference")
  50. opt := &option.InferOption{
  51. TaskName: taskName,
  52. TaskDesc: req.Description,
  53. ModelType: "",
  54. ModelName: modelName,
  55. Cmd: "",
  56. }
  57. taskId, err := l.svcCtx.Scheduler.AiStorages.SaveInferDeployTask(taskName, modelName, "", req.Description)
  58. if err != nil {
  59. return nil, err
  60. }
  61. adapterClusterMap := make(map[string][]*strategy.AssignedCluster)
  62. adapterClusterMap[AdapterId] = assignedClusters
  63. err = l.createInferenceTask(taskId, adapterClusterMap, opt)
  64. if err != nil {
  65. return nil, err
  66. }
  67. resp.TaskId = strconv.FormatInt(taskId, 10)
  68. resp.TaskName = taskName
  69. return
  70. }
  71. func (l *CreateInferenceTaskLogic) createInferenceTask(taskId int64, adapterClusterMap map[string][]*strategy.AssignedCluster, option *option.InferOption) error {
  72. var clusterlen int
  73. for _, c := range adapterClusterMap {
  74. clusterlen += len(c)
  75. }
  76. var errCh = make(chan interface{}, clusterlen)
  77. var errs []interface{}
  78. buf := make(chan bool, 2)
  79. var wg sync.WaitGroup
  80. for aid, v := range adapterClusterMap {
  81. for _, c := range v {
  82. wg.Add(1)
  83. cluster := c
  84. buf <- true
  85. go func() {
  86. opt, _ := cloneOption(option)
  87. updateInferOption(cluster, opt)
  88. err := l.createDeployInstance(taskId, aid, cluster.ClusterId, opt)
  89. if err != nil {
  90. e := struct {
  91. err error
  92. clusterId string
  93. }{
  94. err: err,
  95. clusterId: cluster.ClusterId,
  96. }
  97. errCh <- e
  98. wg.Done()
  99. <-buf
  100. return
  101. }
  102. wg.Done()
  103. <-buf
  104. }()
  105. }
  106. }
  107. wg.Wait()
  108. close(errCh)
  109. for e := range errCh {
  110. errs = append(errs, e)
  111. }
  112. if len(errs) != 0 {
  113. var msg string
  114. for _, err := range errs {
  115. e := (err).(struct {
  116. err error
  117. clusterId string
  118. })
  119. clusterName, err := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(e.clusterId)
  120. if err != nil {
  121. clusterName = e.clusterId
  122. }
  123. msg += fmt.Sprintf("CreateInstance Failed # clusterName: %v, error: %v \n", clusterName, e.err.Error())
  124. }
  125. return errors.New(msg)
  126. }
  127. return nil
  128. }
  129. func updateInferOption(cluster *strategy.AssignedCluster, opt *option.InferOption) {
  130. opt.Cmd = cluster.Cmd
  131. opt.Envs = cluster.Envs
  132. opt.Params = cluster.Params
  133. opt.ImageId = cluster.ImageId
  134. opt.AlgorithmId = cluster.CodeId
  135. opt.ModelID = cluster.ModelId
  136. opt.ResourcesRequired = cluster.ResourcesRequired
  137. opt.Output = cluster.Output
  138. }
  139. func generateClustersForTaskCreation(distributes types.DataDistribute, taskName string) ([]*strategy.AssignedCluster, error) {
  140. var assignedClusters []*strategy.AssignedCluster
  141. clusterMap := make(map[string]*strategy.AssignedCluster)
  142. for _, distribute := range distributes.Model {
  143. if len(distribute.Clusters) == 0 {
  144. return nil, fmt.Errorf("Model distribute: must specify at least one cluster")
  145. }
  146. for _, c := range distribute.Clusters {
  147. if c.ClusterID == "" {
  148. return nil, fmt.Errorf("Model distribute: clusterId can not be empty")
  149. }
  150. cluster := &strategy.AssignedCluster{}
  151. cluster.ClusterId = c.ClusterID
  152. jsonData := entity.JsonData{}
  153. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  154. if err != nil {
  155. return nil, fmt.Errorf("jsonData convert failed, task %d, cluster %s, datatype: %s", taskName, c.ClusterID, "Model")
  156. }
  157. if jsonData.Id == "" {
  158. continue
  159. }
  160. cluster.ModelId = jsonData.Id
  161. cluster.ModelName = jsonData.Name
  162. clusterMap[c.ClusterID] = cluster
  163. }
  164. }
  165. for _, distribute := range distributes.Code {
  166. if len(distribute.Clusters) == 0 {
  167. return nil, fmt.Errorf("Code distribute: must specify at least one cluster")
  168. }
  169. for _, c := range distribute.Clusters {
  170. if c.ClusterID == "" {
  171. return nil, fmt.Errorf("Code distribute: clusterId can not be empty")
  172. }
  173. jsonData := entity.JsonData{}
  174. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  175. if err != nil {
  176. return nil, fmt.Errorf("jsonData convert failed, task %d, cluster %s, datatype: %s", taskName, c.ClusterID, "Code")
  177. }
  178. if jsonData.Id == "" {
  179. continue
  180. }
  181. cluster, ok := clusterMap[c.ClusterID]
  182. if ok {
  183. cluster.CodeId = jsonData.Id
  184. }
  185. }
  186. }
  187. for _, distribute := range distributes.Image {
  188. if len(distribute.Clusters) == 0 {
  189. return nil, fmt.Errorf("Image distribute: must specify at least one cluster")
  190. }
  191. for _, c := range distribute.Clusters {
  192. if c.ClusterID == "" {
  193. return nil, fmt.Errorf("Image distribute: clusterId can not be empty")
  194. }
  195. jsonData := entity.JsonData{}
  196. err := json.Unmarshal([]byte(c.JsonData), &jsonData)
  197. if err != nil {
  198. return nil, fmt.Errorf("jsonData convert failed, task %d, cluster %s, datatype: %s", taskName, c.ClusterID, "Image")
  199. }
  200. cluster, ok := clusterMap[c.ClusterID]
  201. if ok {
  202. cluster.ImageId = jsonData.Id
  203. }
  204. }
  205. }
  206. for _, c := range clusterMap {
  207. if c.ModelId == "" {
  208. return nil, fmt.Errorf("create inference task failed, cluster %s, empty data : %s", c.ClusterId, "ModelId")
  209. }
  210. if c.CodeId == "" {
  211. return nil, fmt.Errorf("create inference task failed, cluster %s, empty data : %s", c.ClusterId, "CodeId")
  212. }
  213. if c.ImageId == "" {
  214. return nil, fmt.Errorf("create inference task failed, cluster %s, empty data : %s", c.ClusterId, "ImageId")
  215. }
  216. assignedClusters = append(assignedClusters, c)
  217. }
  218. if len(assignedClusters) == 0 {
  219. return nil, fmt.Errorf("no model provided")
  220. }
  221. return assignedClusters, nil
  222. }
  223. func generateModelName(clusters []*strategy.AssignedCluster) (string, error) {
  224. if len(clusters) == 1 {
  225. return clusters[0].ModelName, nil
  226. }
  227. var modelName string
  228. for _, c := range clusters {
  229. modelName += c.ModelName + ","
  230. }
  231. modelName = strings.TrimSuffix(modelName, ",")
  232. return modelName, nil
  233. }
  234. func (l *CreateInferenceTaskLogic) createDeployInstance(taskId int64, adapterId string, clusterId string, opt *option.InferOption) error {
  235. cmap, found := l.svcCtx.Scheduler.AiService.InferenceAdapterMap[adapterId]
  236. if !found {
  237. return errors.New("adapterId not exist: " + adapterId)
  238. }
  239. iCluster, found := cmap[clusterId]
  240. if !found {
  241. return errors.New("clusterId not exist: " + clusterId)
  242. }
  243. insId, err := iCluster.CreateInferDeployInstance(l.ctx, opt)
  244. if err != nil {
  245. return err
  246. }
  247. aid, err := strconv.ParseInt(adapterId, 10, 64)
  248. if err != nil {
  249. return err
  250. }
  251. cid, err := strconv.ParseInt(clusterId, 10, 64)
  252. if err != nil {
  253. return err
  254. }
  255. adapterName, err := l.svcCtx.Scheduler.AiStorages.GetAdapterNameById(adapterId)
  256. if err != nil {
  257. return err
  258. }
  259. clusterName, err := l.svcCtx.Scheduler.AiStorages.GetClusterNameById(clusterId)
  260. if err != nil {
  261. return err
  262. }
  263. ins, err := iCluster.GetInferDeployInstance(l.ctx, insId)
  264. if err != nil {
  265. return err
  266. }
  267. _, err = l.svcCtx.Scheduler.AiStorages.SaveInferDeployInstance(taskId, ins.InstanceId, ins.InstanceName, aid, adapterName, cid, clusterName, ins.ModelName, ins.ModelType, ins.InferCard, ins.ClusterType)
  268. if err != nil {
  269. return err
  270. }
  271. return nil
  272. }
  273. func cloneOption(opt *option.InferOption) (*option.InferOption, error) {
  274. origJSON, err := json.Marshal(opt)
  275. if err != nil {
  276. return nil, err
  277. }
  278. clone := option.InferOption{}
  279. if err = json.Unmarshal(origJSON, &clone); err != nil {
  280. return nil, err
  281. }
  282. return &clone, nil
  283. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecosystem of heterogeneous cloud collaboration for JCC in a non-intrusive, autonomous, peer-to-peer manner.