You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

commitgeneraltasklogic.go 6.3 kB

10 months ago
11 months ago
11 months ago
10 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package cloud
  2. import (
  3. "bytes"
  4. "context"
  5. "github.com/pkg/errors"
  6. clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud"
  13. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  14. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil"
  15. "io"
  16. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  17. "k8s.io/apimachinery/pkg/runtime"
  18. syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
  19. "k8s.io/apimachinery/pkg/util/json"
  20. kyaml "k8s.io/apimachinery/pkg/util/yaml"
  21. "strconv"
  22. "strings"
  23. "time"
  24. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
  25. "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
  26. "github.com/zeromicro/go-zero/core/logx"
  27. )
  28. type CommitGeneralTaskLogic struct {
  29. logx.Logger
  30. ctx context.Context
  31. svcCtx *svc.ServiceContext
  32. }
  33. func NewCommitGeneralTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitGeneralTaskLogic {
  34. return &CommitGeneralTaskLogic{
  35. Logger: logx.WithContext(ctx),
  36. ctx: ctx,
  37. svcCtx: svcCtx,
  38. }
  39. }
  40. func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) error {
  41. tx := l.svcCtx.DbEngin.Begin()
  42. // 执行回滚或者提交操作
  43. defer func() {
  44. if p := recover(); p != nil {
  45. tx.Rollback()
  46. logx.Error(p)
  47. } else if tx.Error != nil {
  48. logx.Info("rollback, error", tx.Error)
  49. tx.Rollback()
  50. } else {
  51. tx = tx.Commit()
  52. logx.Info("commit success")
  53. }
  54. }()
  55. adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64)
  56. var clusters []*models.CloudModel
  57. err := tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error
  58. if err != nil {
  59. logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
  60. return errors.Errorf("the cluster does not match the drive resources. Check the data")
  61. }
  62. taskCloud := cloud.TaskCloudModel{}
  63. opt := &option.CloudOption{}
  64. utils.Convert(&req, &opt)
  65. sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient)
  66. results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, executor.SUBMIT_MODE_JOINT_CLOUD, nil)
  67. if err != nil {
  68. logx.Errorf("AssignAndSchedule() => execution error: %v", err)
  69. return err
  70. }
  71. rs := (results).([]*schedulers.CloudResult)
  72. var synergyStatus int64
  73. if len(rs) > 1 {
  74. synergyStatus = 1
  75. }
  76. var strategy int64
  77. sqlStr := `select t_dict_item.item_value
  78. from t_dict
  79. left join t_dict_item on t_dict.id = t_dict_item.dict_id
  80. where item_text = ?
  81. and t_dict.dict_code = 'schedule_Strategy'`
  82. //查询调度策略
  83. err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error
  84. taskModel := models.Task{
  85. Id: utils.GenSnowflakeID(),
  86. Status: constants.Saved,
  87. Name: req.Name,
  88. CommitTime: time.Now(),
  89. YamlString: strings.Join(req.ReqBody, "\n---\n"),
  90. AdapterTypeDict: "0",
  91. SynergyStatus: synergyStatus,
  92. Strategy: strategy,
  93. }
  94. var taskClouds []cloud.TaskCloudModel
  95. adapterName := ""
  96. tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName)
  97. for _, r := range rs {
  98. for _, s := range req.ReqBody {
  99. sStruct := UnMarshalK8sStruct(s, int64(r.Replica))
  100. unString, _ := sStruct.MarshalJSON()
  101. taskCloud.Id = utils.GenSnowflakeIDUint()
  102. taskCloud.Name = sStruct.GetName() + "-" + sStruct.GetKind()
  103. taskCloud.TaskId = uint(taskModel.Id)
  104. clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
  105. taskCloud.AdapterId = uint(adapterId)
  106. taskCloud.AdapterName = adapterName
  107. taskCloud.UserId = req.UserId
  108. taskCloud.ClusterId = uint(clusterId)
  109. taskCloud.ClusterName = r.ClusterName
  110. taskCloud.Status = constants.Saved
  111. taskCloud.YamlString = string(unString)
  112. taskCloud.Kind = sStruct.GetKind()
  113. taskCloud.Namespace = sStruct.GetNamespace()
  114. taskClouds = append(taskClouds, taskCloud)
  115. }
  116. }
  117. noticeInfo := clientCore.NoticeInfo{
  118. AdapterId: int64(adapterId),
  119. AdapterName: adapterName,
  120. NoticeType: "create",
  121. TaskName: req.Name,
  122. Incident: "任务创建中",
  123. CreatedTime: time.Now(),
  124. }
  125. db := tx.Table("task").Create(&taskModel)
  126. db = tx.Table("task_cloud").Create(&taskClouds)
  127. db = tx.Table("t_notice").Create(&noticeInfo)
  128. if db.Error != nil {
  129. logx.Errorf("Task creation failure, err: %v", db.Error)
  130. return errors.New("task creation failure")
  131. }
  132. // 数据上链
  133. bytes, _ := json.Marshal(taskModel)
  134. if err != nil {
  135. return err
  136. }
  137. remoteUtil.Evidence(remoteUtil.EvidenceParam{
  138. UserIp: req.UserIp,
  139. Url: l.svcCtx.Config.BlockChain.Url,
  140. ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
  141. FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
  142. Type: l.svcCtx.Config.BlockChain.Type,
  143. Token: req.Token,
  144. Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
  145. })
  146. return nil
  147. }
  148. func UnMarshalK8sStruct(yamlString string, replica int64) *unstructured.Unstructured {
  149. unstructuredObj := &unstructured.Unstructured{}
  150. d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
  151. var err error
  152. for {
  153. var rawObj runtime.RawExtension
  154. err = d.Decode(&rawObj)
  155. if err == io.EOF {
  156. break
  157. }
  158. obj := &unstructured.Unstructured{}
  159. syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj)
  160. unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
  161. if err != nil {
  162. logx.Errorf("UnMarshalK8sStruct() => Execution failure err:%v", err)
  163. }
  164. unstructuredObj = &unstructured.Unstructured{Object: unstructuredMap}
  165. // 命名空间为空 设置默认值
  166. if len(unstructuredObj.GetNamespace()) == 0 {
  167. unstructuredObj.SetNamespace("default")
  168. }
  169. //设置副本数
  170. if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" {
  171. unstructured.SetNestedField(unstructuredObj.Object, replica, "spec", "replicas")
  172. }
  173. }
  174. return unstructuredObj
  175. }

PCM is positioned as a software stack over the cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non-intrusive and autonomous peer-to-peer manner.