|
- package hpc
-
- import (
- "context"
- "fmt"
- "github.com/zeromicro/go-zero/core/logx"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/scheduler/service"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/svc"
- "gitlink.org.cn/JointCloud/pcm-coordinator/internal/types"
- "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
- )
-
- type CancelJobLogic struct {
- logx.Logger
- ctx context.Context
- svcCtx *svc.ServiceContext
- hpcService *service.HpcService
- }
-
// TaskHPCResult is the scan target for the query in CancelJob that joins the
// task table (alias t) with the task_hpc table (alias hpc).
type TaskHPCResult struct {
	// ID is read from column id (t.id).
	ID uint `gorm:"column:id"`
	// JobID is read from column job_id (hpc.job_id).
	JobID string `gorm:"column:job_id"`
	// AdapterId is read from column adapter_id (hpc.adapter_id).
	AdapterId string `gorm:"column:adapter_id"`
}
-
- func NewCancelJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CancelJobLogic {
- cache := make(map[string]interface{}, 10)
- hpcService, err := service.NewHpcService(&svcCtx.Config, svcCtx.Scheduler.HpcStorages, cache)
- if err != nil {
- return nil
- }
- return &CancelJobLogic{
- Logger: logx.WithContext(ctx),
- ctx: ctx,
- svcCtx: svcCtx,
- hpcService: hpcService,
- }
- }
-
- func (l *CancelJobLogic) CancelJob(req *types.CancelJobReq) error {
- //var clusterInfo *types.ClusterInfo
- //tx := l.svcCtx.DbEngin.Raw("select * from t_cluster where id = ?", req.ClusterId).Scan(&clusterInfo)
- //if tx.Error != nil {
- // return tx.Error
- //}
- //// 查询p端调用地址
- //var adapterAddress string
- //l.svcCtx.DbEngin.Raw("SELECT server FROM `t_adapter` where id = ?", clusterInfo.AdapterId).Scan(&adapterAddress)
- //var jobResp slurm.GetJobResp
- //httpClient := resty.New().R()
- //_, err := httpClient.SetHeader("Content-Type", "application/json").
- // SetQueryParams(map[string]string{
- // "jobId": req.JobId,
- // "server": clusterInfo.Server,
- // "version": clusterInfo.Version,
- // "token": clusterInfo.Token,
- // "username": clusterInfo.Username,
- // }).
- // SetResult(&jobResp).
- // Delete(adapterAddress + "/api/v1/job/cancel")
- //if err != nil {
- // return err
- //}
- //if len(jobResp.Errors) != 0 {
- // return errors.Errorf(jobResp.Errors[0].Description)
- //}
- //return nil
- var hpcR TaskHPCResult
- tx := l.svcCtx.DbEngin.Raw(
- "SELECT t.id, hpc.job_id ,hpc.adapter_id FROM task t "+
- "INNER JOIN task_hpc hpc ON t.id = hpc.task_id "+
- "WHERE adapter_type_dict = 2 AND t.id = ?",
- req.TaskId,
- ).Scan(&hpcR).Error
- if tx != nil {
- return fmt.Errorf("数据库查询失败: %v", tx.Error)
- }
- if hpcR.ID == 0 || hpcR.JobID == "" {
- return fmt.Errorf("作业不存在")
- }
- var adapterInfo types.AdapterInfo
- l.svcCtx.DbEngin.Raw("SELECT * FROM `t_adapter` where id = ?", hpcR.AdapterId).Scan(&adapterInfo)
- if adapterInfo.Id == "" {
- return fmt.Errorf("adapter not found")
- }
- // 取消作业
- err := l.hpcService.HpcExecutorAdapterMap[adapterInfo.Id].CancelTask(l.ctx, hpcR.JobID)
- if err != nil {
- return err
- }
- // 更新数据库状态
- tx = l.svcCtx.DbEngin.Model(&types.Task{}).Where("id = ?", hpcR.ID).Update("status", "Canceled").Error
- if tx != nil {
- return fmt.Errorf("数据库更新失败: %v", tx.Error)
- }
- // 更新数据库状态
- tx = l.svcCtx.DbEngin.Model(&models.TaskHpc{}).Where("task_id = ?", hpcR.ID).Update("status", "Canceled").Error
- if tx != nil {
- return fmt.Errorf("数据库更新失败: %v", tx.Error)
- }
- return nil
- }
|