package kq import ( "bytes" "context" "encoding/json" "github.com/zeromicro/go-zero/core/logx" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc" "gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types" "gitlink.org.cn/jcce-pcm/pcm-coordinator/model" "io" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml" kyaml "k8s.io/apimachinery/pkg/util/yaml" ) /* * Listening to the payment flow status change notification message queue */ type ScheduleCloudMq struct { ctx context.Context svcCtx *svc.ServiceContext } func NewScheduleCloudMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleCloudMq { return &ScheduleCloudMq{ ctx: ctx, svcCtx: svcCtx, } } func UnMarshalK8sStruct(yamlString string, taskId int64) model.Cloud { var cloud model.Cloud d := kyaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(yamlString), 4096) var err error for { var rawObj runtime.RawExtension err = d.Decode(&rawObj) if err == io.EOF { break } if err != nil { } obj := &unstructured.Unstructured{} syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj) if err != nil { } unstructuredMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { } unstructureObj := &unstructured.Unstructured{Object: unstructuredMap} cloud = model.Cloud{ TaskId: taskId, ApiVersion: unstructureObj.GetAPIVersion(), Name: unstructureObj.GetName(), Kind: unstructureObj.GetKind(), Namespace: unstructureObj.GetNamespace(), Status: "Saved", ServiceName: "kubeNative", } } return cloud } func (l *ScheduleCloudMq) Consume(_, val string) error { var task *types.TaskInfo json.Unmarshal([]byte(val), &task) // 构建提交作业到云算的结构体 bytes, err := json.Marshal(task.Metadata) if err != nil { return err } cloud := UnMarshalK8sStruct(string(bytes), task.TaskId) cloud.YamlString = string(bytes) // 存储数据 tx := l.svcCtx.DbEngin.Create(&cloud) if tx.Error != nil { logx.Error(tx.Error) return tx.Error } return nil }