You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

commitgeneraltasklogic.go 5.5 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package cloud
  2. import (
  3. "bytes"
  4. "context"
  5. "github.com/pkg/errors"
  6. clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/api/client"
  7. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers"
  8. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/scheduler/schedulers/option"
  9. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
  10. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
  11. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud"
  12. "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
  13. "io"
  14. "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
  15. "k8s.io/apimachinery/pkg/runtime"
  16. syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
  17. kyaml "k8s.io/apimachinery/pkg/util/yaml"
  18. "strconv"
  19. "strings"
  20. "time"
  21. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/svc"
  22. "gitlink.org.cn/JointCloud/pcm-coordinator/api/internal/types"
  23. "github.com/zeromicro/go-zero/core/logx"
  24. )
  25. type CommitGeneralTaskLogic struct {
  26. logx.Logger
  27. ctx context.Context
  28. svcCtx *svc.ServiceContext
  29. }
  30. func NewCommitGeneralTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitGeneralTaskLogic {
  31. return &CommitGeneralTaskLogic{
  32. Logger: logx.WithContext(ctx),
  33. ctx: ctx,
  34. svcCtx: svcCtx,
  35. }
  36. }
  37. func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) error {
  38. tx := l.svcCtx.DbEngin.Begin()
  39. // 执行回滚或者提交操作
  40. defer func() {
  41. if p := recover(); p != nil {
  42. tx.Rollback()
  43. logx.Error(p)
  44. } else if tx.Error != nil {
  45. logx.Info("rollback, error", tx.Error)
  46. tx.Rollback()
  47. } else {
  48. tx = tx.Commit()
  49. logx.Info("commit success")
  50. }
  51. }()
  52. //TODO adapter
  53. adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64)
  54. var clusters []*models.CloudModel
  55. err := tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error
  56. if err != nil {
  57. logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
  58. return errors.Errorf("the cluster does not match the drive resources. Check the data")
  59. }
  60. taskCloud := cloud.TaskCloudModel{}
  61. opt := &option.CloudOption{}
  62. utils.Convert(&req, &opt)
  63. sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient)
  64. results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc)
  65. if err != nil {
  66. logx.Errorf("AssignAndSchedule() => execution error: %v", err)
  67. return err
  68. }
  69. rs := (results).([]*schedulers.CloudResult)
  70. var synergyStatus int64
  71. if len(rs) > 1 {
  72. synergyStatus = 1
  73. }
  74. var strategy int64
  75. sqlStr := `select t_dict_item.item_value
  76. from t_dict
  77. left join t_dict_item on t_dict.id = t_dict_item.dict_id
  78. where item_text = ?
  79. and t_dict.dict_code = 'schedule_Strategy'`
  80. //查询调度策略
  81. err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error
  82. taskModel := models.Task{
  83. Id: utils.GenSnowflakeID(),
  84. Status: constants.Saved,
  85. Name: req.Name,
  86. CommitTime: time.Now(),
  87. YamlString: strings.Join(req.ReqBody, "\n---\n"),
  88. AdapterTypeDict: 0,
  89. SynergyStatus: synergyStatus,
  90. Strategy: strategy,
  91. }
  92. var taskClouds []cloud.TaskCloudModel
  93. for _, r := range rs {
  94. for _, s := range req.ReqBody {
  95. sStruct := UnMarshalK8sStruct(s, int64(r.Replica))
  96. unString, _ := sStruct.MarshalJSON()
  97. taskCloud.Id = utils.GenSnowflakeIDUint()
  98. taskCloud.TaskId = uint(taskModel.Id)
  99. clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
  100. taskCloud.AdapterId = uint(adapterId)
  101. taskCloud.ClusterId = uint(clusterId)
  102. taskCloud.ClusterName = r.ClusterName
  103. taskCloud.Status = constants.Saved
  104. taskCloud.YamlString = string(unString)
  105. taskCloud.Kind = sStruct.GetKind()
  106. taskCloud.Namespace = sStruct.GetNamespace()
  107. taskClouds = append(taskClouds, taskCloud)
  108. }
  109. }
  110. adapterName := ""
  111. tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName)
  112. noticeInfo := clientCore.NoticeInfo{
  113. AdapterId: int64(adapterId),
  114. AdapterName: adapterName,
  115. NoticeType: "create",
  116. TaskName: req.Name,
  117. Incident: "任务创建中",
  118. CreatedTime: time.Now(),
  119. }
  120. db := tx.Table("task").Create(&taskModel)
  121. db = tx.Table("task_cloud").Create(&taskClouds)
  122. db = tx.Table("t_notice").Create(&noticeInfo)
  123. if db.Error != nil {
  124. logx.Errorf("Task creation failure, err: %v", db.Error)
  125. return errors.New("task creation failure")
  126. }
  127. return nil
  128. }
  129. func UnMarshalK8sStruct(yamlString string, replica int64) *unstructured.Unstructured {
  130. unstructuredObj := &unstructured.Unstructured{}
  131. d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
  132. var err error
  133. for {
  134. var rawObj runtime.RawExtension
  135. err = d.Decode(&rawObj)
  136. if err == io.EOF {
  137. break
  138. }
  139. obj := &unstructured.Unstructured{}
  140. syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj)
  141. unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
  142. if err != nil {
  143. logx.Errorf("UnMarshalK8sStruct() => Execution failure err:%v", err)
  144. }
  145. unstructuredObj = &unstructured.Unstructured{Object: unstructuredMap}
  146. // 命名空间为空 设置默认值
  147. if len(unstructuredObj.GetNamespace()) == 0 {
  148. unstructuredObj.SetNamespace("default")
  149. }
  150. //设置副本数
  151. if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" {
  152. unstructured.SetNestedField(unstructuredObj.Object, replica, "spec", "replicas")
  153. }
  154. }
  155. return unstructuredObj
  156. }

PCM is positioned as Software stack over Cloud, aiming to build the standards and ecology of heterogeneous cloud collaboration for JCC in a non intrusive and autonomous peer-to-peer manner.