You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

textToText.go 4.4 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package textInference
  2. import (
  3. "github.com/zeromicro/go-zero/core/logx"
  4. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/database"
  5. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  6. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/inference"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  9. "net/http"
  10. "strconv"
  11. "sync"
  12. "time"
  13. )
  14. const (
  15. CHAT = "chat"
  16. TEXTTOTEXT_AITYPE = "12"
  17. )
  18. type TextToText struct {
  19. opt *option.InferOption
  20. storage *database.AiStorage
  21. inferAdapter map[string]map[string]inference.ICluster
  22. instance *models.AiInferDeployInstance
  23. cs []*FilteredCluster
  24. }
  25. func NewTextToText(opt *option.InferOption, storage *database.AiStorage, inferAdapter map[string]map[string]inference.ICluster, instance *models.AiInferDeployInstance) (*TextToText, error) {
  26. cs, err := filterClusters(inferAdapter, instance)
  27. if err != nil {
  28. return nil, err
  29. }
  30. return &TextToText{
  31. opt: opt,
  32. storage: storage,
  33. inferAdapter: inferAdapter,
  34. cs: cs,
  35. }, nil
  36. }
  37. func (tt *TextToText) GetAiType() string {
  38. return TEXTTOTEXT_AITYPE
  39. }
  40. func (tt *TextToText) SaveAiTask(id int64, adapterName string) error {
  41. if len(tt.cs) == 0 {
  42. clusterId := tt.opt.AiClusterIds[0]
  43. clusterName, _ := tt.storage.GetClusterNameById(tt.opt.AiClusterIds[0])
  44. err := tt.storage.SaveAiTask(id, tt.opt, adapterName, clusterId, clusterName, "", constants.Failed, "")
  45. if err != nil {
  46. return err
  47. }
  48. tt.storage.AddNoticeInfo(tt.opt.AdapterId, adapterName, "", "", tt.opt.TaskName, "failed", "任务失败")
  49. }
  50. for _, c := range tt.cs {
  51. clusterName, _ := tt.storage.GetClusterNameById(c.clusterId)
  52. err := tt.storage.SaveAiTask(id, tt.opt, adapterName, c.clusterId, clusterName, "", constants.Saved, "")
  53. if err != nil {
  54. return err
  55. }
  56. }
  57. return nil
  58. }
  59. func filterClusters(inferAdapter map[string]map[string]inference.ICluster, instance *models.AiInferDeployInstance) ([]*FilteredCluster, error) {
  60. var cs []*FilteredCluster
  61. var inferurls []*inference.InferUrl
  62. clusterId := strconv.FormatInt(instance.ClusterId, 10)
  63. adapterId := strconv.FormatInt(instance.AdapterId, 10)
  64. r := http.Request{}
  65. deployInstance, err := inferAdapter[adapterId][clusterId].GetInferDeployInstance(r.Context(), instance.InstanceId)
  66. if err != nil {
  67. return nil, err
  68. }
  69. var url inference.InferUrl
  70. url.Url = deployInstance.InferUrl + inference.FORWARD_SLASH + CHAT
  71. url.Card = deployInstance.InferCard
  72. inferurls = append(inferurls, &url)
  73. clusterType := deployInstance.ClusterType
  74. clusterName := deployInstance.ClusterName
  75. var f FilteredCluster
  76. f.urls = inferurls
  77. f.clusterId = clusterId
  78. f.clusterName = clusterName
  79. f.clusterType = clusterType
  80. cs = append(cs, &f)
  81. return cs, nil
  82. }
  83. func filterClustersTemp(opt *option.InferOption, storage *database.AiStorage, inferAdapter map[string]map[string]inference.ICluster) ([]*FilteredCluster, error) {
  84. var wg sync.WaitGroup
  85. var ch = make(chan *FilteredCluster, len(opt.AiClusterIds))
  86. var cs []*FilteredCluster
  87. inferMap := inferAdapter[opt.AdapterId]
  88. for _, clusterId := range opt.AiClusterIds {
  89. wg.Add(1)
  90. go func(cId string) {
  91. r := http.Request{}
  92. clusterInferUrl, err := inferMap[cId].GetClusterInferUrl(r.Context(), opt)
  93. if err != nil {
  94. wg.Done()
  95. return
  96. }
  97. for i, _ := range clusterInferUrl.InferUrls {
  98. clusterInferUrl.InferUrls[i].Url = clusterInferUrl.InferUrls[i].Url + inference.FORWARD_SLASH + CHAT
  99. }
  100. clusterName, _ := storage.GetClusterNameById(cId)
  101. var f FilteredCluster
  102. f.urls = clusterInferUrl.InferUrls
  103. f.clusterId = cId
  104. f.clusterName = clusterName
  105. ch <- &f
  106. wg.Done()
  107. return
  108. }(clusterId)
  109. }
  110. wg.Wait()
  111. close(ch)
  112. for s := range ch {
  113. cs = append(cs, s)
  114. }
  115. return cs, nil
  116. }
  117. func (tt *TextToText) UpdateStatus(aiTaskList []*models.TaskAi, adapterName string) error {
  118. for i, t := range aiTaskList {
  119. if strconv.Itoa(int(t.ClusterId)) == tt.cs[i].clusterId {
  120. t.Status = constants.Completed
  121. t.EndTime = time.Now().Format(time.RFC3339)
  122. url := tt.cs[i].urls[0].Url
  123. t.InferUrl = url
  124. err := tt.storage.UpdateAiTask(t)
  125. if err != nil {
  126. logx.Errorf(err.Error())
  127. return err
  128. }
  129. }
  130. }
  131. tt.storage.AddNoticeInfo(tt.opt.AdapterId, adapterName, "", "", tt.opt.TaskName, "completed", "任务完成")
  132. return nil
  133. }

PCM is positioned as Software stack over Cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non intrusive and autonomous peer-to-peer manner.