package cloud import ( "bytes" "context" "github.com/pkg/errors" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/schedulers/option" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service/executor" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/constants" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models/cloud" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil" "io" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" "k8s.io/apimachinery/pkg/util/json" kyaml "k8s.io/apimachinery/pkg/util/yaml" "strconv" "strings" "time" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc" "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types" "github.com/zeromicro/go-zero/core/logx" ) type CommitGeneralTaskLogic struct { logx.Logger ctx context.Context svcCtx *svc.ServiceContext } func NewCommitGeneralTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitGeneralTaskLogic { return &CommitGeneralTaskLogic{ Logger: logx.WithContext(ctx), ctx: ctx, svcCtx: svcCtx, } } func (l *CommitGeneralTaskLogic) CommitGeneralTask(req *types.GeneralTaskReq) error { tx := l.svcCtx.DbEngin.Begin() // 执行回滚或者提交操作 defer func() { if p := recover(); p != nil { tx.Rollback() logx.Error(p) } else if tx.Error != nil { logx.Info("rollback, error", tx.Error) tx.Rollback() } else { tx = tx.Commit() logx.Info("commit success") } }() adapterId, _ := strconv.ParseUint(req.AdapterIds[0], 10, 64) var clusters []*models.CloudModel err := tx.Raw("SELECT * FROM `t_cluster` where adapter_id in ? and id in ?", req.AdapterIds, req.ClusterIds).Scan(&clusters).Error if err != nil { logx.Errorf("CommitGeneralTask() => sql execution error: %v", err) return errors.Errorf("the cluster does not match the drive resources. Check the data") } taskCloud := cloud.TaskCloudModel{} opt := &option.CloudOption{} utils.Convert(&req, &opt) sc, _ := schedulers.NewCloudScheduler(l.ctx, "", l.svcCtx.Scheduler, opt, tx, l.svcCtx.PromClient) results, err := l.svcCtx.Scheduler.AssignAndSchedule(sc, executor.SUBMIT_MODE_JOINT_CLOUD, nil) if err != nil { logx.Errorf("AssignAndSchedule() => execution error: %v", err) return err } rs := (results).([]*schedulers.CloudResult) var synergyStatus int64 if len(rs) > 1 { synergyStatus = 1 } var strategy int64 sqlStr := `select t_dict_item.item_value from t_dict left join t_dict_item on t_dict.id = t_dict_item.dict_id where item_text = ? and t_dict.dict_code = 'schedule_Strategy'` //查询调度策略 err = tx.Raw(sqlStr, req.Strategy).Scan(&strategy).Error taskModel := models.Task{ Id: utils.GenSnowflakeID(), Status: constants.Saved, Name: req.Name, CommitTime: time.Now(), YamlString: strings.Join(req.ReqBody, "\n---\n"), AdapterTypeDict: "0", SynergyStatus: synergyStatus, Strategy: strategy, } var taskClouds []cloud.TaskCloudModel adapterName := "" tx.Table("t_adapter").Select("name").Where("id=?", adapterId).Find(&adapterName) for _, r := range rs { for _, s := range req.ReqBody { sStruct := UnMarshalK8sStruct(s, int64(r.Replica)) unString, _ := sStruct.MarshalJSON() taskCloud.Id = utils.GenSnowflakeIDUint() taskCloud.Name = sStruct.GetName() + "-" + sStruct.GetKind() taskCloud.TaskId = uint(taskModel.Id) clusterId, _ := strconv.ParseUint(r.ClusterId, 10, 64) taskCloud.AdapterId = uint(adapterId) taskCloud.AdapterName = adapterName taskCloud.UserId = req.UserId taskCloud.ClusterId = uint(clusterId) taskCloud.ClusterName = r.ClusterName taskCloud.Status = constants.Saved taskCloud.YamlString = string(unString) taskCloud.Kind = sStruct.GetKind() taskCloud.Namespace = sStruct.GetNamespace() taskClouds = append(taskClouds, taskCloud) } } noticeInfo := clientCore.NoticeInfo{ AdapterId: int64(adapterId), AdapterName: adapterName, NoticeType: "create", TaskName: req.Name, Incident: "任务创建中", CreatedTime: time.Now(), } db := tx.Table("task").Create(&taskModel) db = tx.Table("task_cloud").Create(&taskClouds) db = tx.Table("t_notice").Create(¬iceInfo) // 数据上链 bytes, err := json.Marshal(taskModel) if err != nil { return err } remoteUtil.Evidence(remoteUtil.EvidenceParam{ Url: l.svcCtx.Config.BlockChain.Url, ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress, FunctionName: l.svcCtx.Config.BlockChain.FunctionName, Type: l.svcCtx.Config.BlockChain.Type, Token: req.Token, Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)}, }) if db.Error != nil { logx.Errorf("Task creation failure, err: %v", db.Error) return errors.New("task creation failure") } return nil } func UnMarshalK8sStruct(yamlString string, replica int64) *unstructured.Unstructured { unstructuredObj := &unstructured.Unstructured{} d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) var err error for { var rawObj runtime.RawExtension err = d.Decode(&rawObj) if err == io.EOF { break } obj := &unstructured.Unstructured{} syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj) unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { logx.Errorf("UnMarshalK8sStruct() => Execution failure err:%v", err) } unstructuredObj = &unstructured.Unstructured{Object: unstructuredMap} // 命名空间为空 设置默认值 if len(unstructuredObj.GetNamespace()) == 0 { unstructuredObj.SetNamespace("default") } //设置副本数 if unstructuredObj.GetKind() == "Deployment" || unstructuredObj.GetKind() == "StatefulSet" { unstructured.SetNestedField(unstructuredObj.Object, replica, "spec", "replicas") } } return unstructuredObj }