|
- package cloud
-
- import (
- "bytes"
- "context"
- "github.com/pkg/errors"
- clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil"
- "io"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
- "k8s.io/apimachinery/pkg/runtime"
- syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
- "k8s.io/apimachinery/pkg/util/json"
- kyaml "k8s.io/apimachinery/pkg/util/yaml"
- "strconv"
- "strings"
- "time"
-
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
-
- "github.com/zeromicro/go-zero/core/logx"
- )
-
- type CommitGeneralTaskLogic struct {
- logx.Logger
- ctx context.Context
- svcCtx *svc.ServiceContext
- }
-
- func NewCommitGeneralTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitGeneralTaskLogic {
- return &CommitGeneralTaskLogic{
- Logger: logx.WithContext(ctx),
- ctx: ctx,
- svcCtx: svcCtx,
- }
- }
-
- func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) (resp *types.GeneralTaskResp, err error) {
- // todo: add your logic here and delete this line
- resp = &types.GeneralTaskResp{}
- tx := l.svcCtx.DbEngin.Begin()
- // 执行回滚或者提交操作
- defer func() {
- if p := recover(); p != nil {
- tx.Rollback()
- logx.Error(p)
- } else if tx.Error != nil {
- logx.Info("rollback, error", tx.Error)
- tx.Rollback()
- } else {
- tx = tx.Commit()
- logx.Info("commit success")
- }
- }()
- adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64)
- var clusters []*models.CloudModel
- err = tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error
- if err != nil {
- logx.Errorf("CommitGeneralTask() => sql execution error: %v", err)
- return nil, errors.Errorf("the cluster does not match the drive resources. Check the data")
- }
- taskCloud := cloud.TaskCloudModel{}
- opt := &option.CloudOption{}
- utils.Convert(&req, &opt)
- sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient)
-
- results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, executor.SUBMIT_MODE_JOINT_CLOUD, nil)
- if err != nil {
- logx.Errorf("AssignAndSchedule() => execution error: %v", err)
- return nil, err
- }
-
- rs := (results).([]*schedulers.CloudResult)
-
- var synergyStatus int64
- if len(rs) > 1 {
- synergyStatus = 1
- }
- var strategy int64
- sqlStr := `select t_dict_item.item_value
- from t_dict
- left join t_dict_item on t_dict.id = t_dict_item.dict_id
- where item_text = ?
- and t_dict.dict_code = 'schedule_Strategy'`
- //查询调度策略
- err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error
- taskModel := models.Task{
- Id: utils.GenSnowflakeID(),
- Status: constants.Saved,
- Name: req.Name,
- CommitTime: time.Now(),
- YamlString: strings.Join(req.ReqBody, "\n---\n"),
- AdapterTypeDict: "0",
- SynergyStatus: synergyStatus,
- Strategy: strategy,
- UserId: req.UserId,
- }
- resp.TaskId = taskModel.Id
- var taskClouds []cloud.TaskCloudModel
- adapterName := ""
- tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName)
- for _, r := range rs {
- for _, s := range req.ReqBody {
- sStruct := UnMarshalK8sStruct(s, int64(r.Replica))
- unString, _ := sStruct.MarshalJSON()
- taskCloud.Id = utils.GenSnowflakeIDUint()
- taskCloud.Name = sStruct.GetName() + "-" + sStruct.GetKind()
- taskCloud.TaskId = uint(taskModel.Id)
- clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64)
- taskCloud.AdapterId = uint(adapterId)
- taskCloud.AdapterName = adapterName
- taskCloud.UserId = req.UserId
- taskCloud.ClusterId = uint(clusterId)
- taskCloud.ClusterName = r.ClusterName
- taskCloud.Status = constants.Saved
- taskCloud.YamlString = string(unString)
- taskCloud.Kind = sStruct.GetKind()
- taskCloud.Namespace = sStruct.GetNamespace()
- taskClouds = append(taskClouds, taskCloud)
- }
- }
- noticeInfo := clientCore.NoticeInfo{
- AdapterId: int64(adapterId),
- AdapterName: adapterName,
- NoticeType: "create",
- TaskName: req.Name,
- Incident: "任务创建中",
- CreatedTime: time.Now(),
- }
- db := tx.Table("task").Create(&taskModel)
- db = tx.Table("task_cloud").Create(&taskClouds)
- db = tx.Table("t_notice").Create(¬iceInfo)
-
- if db.Error != nil {
- logx.Errorf("Task creation failure, err: %v", db.Error)
- return nil, errors.New("task creation failure")
- }
- // 数据上链
- bytes, _ := json.Marshal(taskModel)
- if err != nil {
- return nil, err
- }
- // 查询资源价格
- var price int64
- for _, clusterId := range req.ClusterIds {
- var clusterPrice int64
- l.svcCtx.DbEngin.Raw("select price from resource_cost where resource_id = ?", clusterId).Scan(&clusterPrice)
- price = price + clusterPrice
- }
-
- remoteUtil.Evidence(remoteUtil.EvidenceParam{
- UserIp: req.UserIp,
- Url: l.svcCtx.Config.BlockChain.Url,
- ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
- FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
- Type: l.svcCtx.Config.BlockChain.Type,
- Token: req.Token,
- Amount: price,
- Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
- })
- return resp, nil
- }
-
- func UnMarshalK8sStruct(yamlString string, replica int64) *unstructured.Unstructured {
- unstructuredObj := &unstructured.Unstructured{}
- d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096)
- var err error
- for {
- var rawObj runtime.RawExtension
- err = d.Decode(&rawObj)
- if err == io.EOF {
- break
- }
- obj := &unstructured.Unstructured{}
- syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj)
- unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
- if err != nil {
- logx.Errorf("UnMarshalK8sStruct() => Execution failure err:%v", err)
- }
- unstructuredObj = &unstructured.Unstructured{Object: unstructuredMap}
- // 命名空间为空 设置默认值
- if len(unstructuredObj.GetNamespace()) == 0 {
- unstructuredObj.SetNamespace("default")
- }
- //设置副本数
- if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" {
- unstructured.SetNestedField(unstructuredObj.Object, replica, "spec", "replicas")
- }
- }
- return unstructuredObj
- }
|