package kq

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/pkg/scheduler"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/model"
	"gitlink.org.cn/jcce-pcm/utils/tool"
)

// ScheduleHpcMq consumes task messages from a queue, picks a participant for
// each task via scheduler.MatchLabels, and persists the task as a model.Hpc
// record.
type ScheduleHpcMq struct {
	ctx    context.Context
	svcCtx *svc.ServiceContext
}

// NewScheduleHpcMq returns a ScheduleHpcMq bound to the given context and
// service context.
func NewScheduleHpcMq(ctx context.Context, svcCtx *svc.ServiceContext) *ScheduleHpcMq {
	return &ScheduleHpcMq{
		ctx:    ctx,
		svcCtx: svcCtx,
	}
}

// Consume handles a single queue message. The message key is ignored; val is
// expected to be a JSON-encoded types.TaskInfo.
//
// It returns an error when the payload cannot be decoded (or decodes to
// null), when label matching fails or yields no participant, or when the
// database insert fails. On success it returns nil.
func (l *ScheduleHpcMq) Consume(_, val string) error {
	// Decode the incoming message.
	var task *types.TaskInfo
	if err := json.Unmarshal([]byte(val), &task); err != nil {
		return fmt.Errorf("decoding hpc task message: %w", err)
	}
	// A literal JSON "null" decodes without error but leaves the pointer nil.
	if task == nil {
		return fmt.Errorf("decoding hpc task message: empty task payload")
	}

	participantIDs, err := scheduler.MatchLabels(l.svcCtx.DbEngin, task)
	if err != nil {
		return fmt.Errorf("matching labels for hpc task: %w", err)
	}
	// Guard the index below: an empty match would otherwise panic.
	if len(participantIDs) == 0 {
		return fmt.Errorf("matching labels for hpc task: no participant matched")
	}

	hpc := model.Hpc{
		TaskId:        task.TaskId,
		Status:        "Saved",
		ParticipantId: participantIDs[0],
		// The raw message is stored verbatim alongside the parsed fields.
		YamlString: val,
	}
	// Copy the task metadata onto the record before it is saved.
	tool.Convert(task.Metadata, &hpc)

	// Persist the record.
	tx := l.svcCtx.DbEngin.Create(&hpc)
	if tx.Error != nil {
		logx.Error(tx.Error)
		return tx.Error
	}
	return nil
}