You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

commitgeneraltasklogic.go 6.8 kB

10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package cloud
  2. import (
  3. "bytes"
  4. "context"
  5. "github.com/pkg/errors"
  6. clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil"
  15. "io"
  16. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  17. "k8s.io/apimachinery/pkg/runtime"
  18. syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
  19. "k8s.io/apimachinery/pkg/util/json"
  20. kyaml "k8s.io/apimachinery/pkg/util/yaml"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  26. "github.com/zeromicro/go-zero/core/logx"
  27. )
  28. type CommitGeneralTaskLogic struct {
  29. logx.Logger
  30. ctx context.Context
  31. svcCtx *svc.ServiceContext
  32. }
  33. func NewCommitGeneralTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitGeneralTaskLogic {
  34. return &CommitGeneralTaskLogic{
  35. Logger: logx.WithContext(ctx),
  36. ctx: ctx,
  37. svcCtx: svcCtx,
  38. }
  39. }
  40. func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) (resp *types.GeneralTaskResp, err error) {
  41. // todo: add your logic here and delete this line
  42. resp = &types.GeneralTaskResp{}
  43. tx := l.svcCtx.DbEngin.Begin()
  44. // 执行回滚或者提交操作
  45. defer func() {
  46. if p := recover(); p != nil {
  47. tx.Rollback()
  48. logx.Error(p)
  49. } else if tx.Error != nil {
  50. logx.Info("rollback, error", tx.Error)
  51. tx.Rollback()
  52. } else {
  53. tx = tx.Commit()
  54. logx.Info("commit success")
  55. }
  56. }()
  57. adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64)
  58. var clusters []*models.CloudModel
  59. err = tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error
  60. if err != nil {
  61. logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
  62. return nil, errors.Errorf("the cluster does not match the drive resources. Check the data")
  63. }
  64. taskCloud := cloud.TaskCloudModel{}
  65. opt := &option.CloudOption{}
  66. utils.Convert(&req, &opt)
  67. sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient)
  68. results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, executor.SUBMIT_MODE_JOINT_CLOUD, nil)
  69. if err != nil {
  70. logx.Errorf("AssignAndSchedule() => execution error: %v", err)
  71. return nil, err
  72. }
  73. rs := (results).([]*schedulers.CloudResult)
  74. var synergyStatus int64
  75. if len(rs) > 1 {
  76. synergyStatus = 1
  77. }
  78. var strategy int64
  79. sqlStr := `select t_dict_item.item_value
  80. from t_dict
  81. left join t_dict_item on t_dict.id = t_dict_item.dict_id
  82. where item_text = ?
  83. and t_dict.dict_code = 'schedule_Strategy'`
  84. //查询调度策略
  85. err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error
  86. taskModel := models.Task{
  87. Id: utils.GenSnowflakeID(),
  88. Status: constants.Saved,
  89. Name: req.Name,
  90. CommitTime: time.Now(),
  91. YamlString: strings.Join(req.ReqBody, "\n---\n"),
  92. AdapterTypeDict: "0",
  93. SynergyStatus: synergyStatus,
  94. Strategy: strategy,
  95. UserId: req.UserId,
  96. }
  97. resp.TaskId = taskModel.Id
  98. var taskClouds []cloud.TaskCloudModel
  99. adapterName := ""
  100. tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName)
  101. for _, r := range rs {
  102. for _, s := range req.ReqBody {
  103. sStruct := UnMarshalK8sStruct(s, int64(r.Replica))
  104. unString, _ := sStruct.MarshalJSON()
  105. taskCloud.Id = utils.GenSnowflakeIDUint()
  106. taskCloud.Name = sStruct.GetName() + "-" + sStruct.GetKind()
  107. taskCloud.TaskId = uint(taskModel.Id)
  108. clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
  109. taskCloud.AdapterId = uint(adapterId)
  110. taskCloud.AdapterName = adapterName
  111. taskCloud.UserId = req.UserId
  112. taskCloud.ClusterId = uint(clusterId)
  113. taskCloud.ClusterName = r.ClusterName
  114. taskCloud.Status = constants.Saved
  115. taskCloud.YamlString = string(unString)
  116. taskCloud.Kind = sStruct.GetKind()
  117. taskCloud.Namespace = sStruct.GetNamespace()
  118. taskClouds = append(taskClouds, taskCloud)
  119. }
  120. }
  121. noticeInfo := clientCore.NoticeInfo{
  122. AdapterId: int64(adapterId),
  123. AdapterName: adapterName,
  124. NoticeType: "create",
  125. TaskName: req.Name,
  126. Incident: "任务创建中",
  127. CreatedTime: time.Now(),
  128. }
  129. db := tx.Table("task").Create(&taskModel)
  130. db = tx.Table("task_cloud").Create(&taskClouds)
  131. db = tx.Table("t_notice").Create(&noticeInfo)
  132. if db.Error != nil {
  133. logx.Errorf("Task creation failure, err: %v", db.Error)
  134. return nil, errors.New("task creation failure")
  135. }
  136. // 数据上链
  137. bytes, _ := json.Marshal(taskModel)
  138. if err != nil {
  139. return nil, err
  140. }
  141. // 查询资源价格
  142. var price int64
  143. for _, clusterId := range req.ClusterIds {
  144. var clusterPrice int64
  145. l.svcCtx.DbEngin.Raw("select price from resource_cost where resource_id = ?", clusterId).Scan(&clusterPrice)
  146. price = price + clusterPrice
  147. }
  148. remoteUtil.Evidence(remoteUtil.EvidenceParam{
  149. UserIp: req.UserIp,
  150. Url: l.svcCtx.Config.BlockChain.Url,
  151. ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
  152. FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
  153. Type: l.svcCtx.Config.BlockChain.Type,
  154. Token: req.Token,
  155. Amount: price,
  156. Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
  157. })
  158. return resp, nil
  159. }
  160. func UnMarshalK8sStruct(yamlString string, replica int64) *unstructured.Unstructured {
  161. unstructuredObj := &unstructured.Unstructured{}
  162. d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
  163. var err error
  164. for {
  165. var rawObj runtime.RawExtension
  166. err = d.Decode(&rawObj)
  167. if err == io.EOF {
  168. break
  169. }
  170. obj := &unstructured.Unstructured{}
  171. syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj)
  172. unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
  173. if err != nil {
  174. logx.Errorf("UnMarshalK8sStruct() => Execution failure err:%v", err)
  175. }
  176. unstructuredObj = &unstructured.Unstructured{Object: unstructuredMap}
  177. // 命名空间为空 设置默认值
  178. if len(unstructuredObj.GetNamespace()) == 0 {
  179. unstructuredObj.SetNamespace("default")
  180. }
  181. //设置副本数
  182. if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" {
  183. unstructured.SetNestedField(unstructuredObj.Object, replica, "spec", "replicas")
  184. }
  185. }
  186. return unstructuredObj
  187. }

PCM is positioned as Software stack over Cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non intrusive and autonomous peer-to-peer manner.