tzwang 8 months ago
parent
commit
08fc74bbaa
12 changed files with 145 additions and 206 deletions
  1. +0
    -65
      .devops/阿里云.yml
  2. +19
    -24
      desc/hpc/pcm-hpc.api
  3. +1
    -1
      go.mod
  4. +10
    -0
      go.sum
  5. +4
    -3
      internal/handler/hpc/commithpctaskhandler.go
  6. +1
    -1
      internal/logic/adapters/updateclusterlogic.go
  7. +69
    -63
      internal/logic/hpc/commithpctasklogic.go
  8. +1
    -1
      internal/scheduler/service/inference/imageInference/imageInference.go
  9. +19
    -18
      internal/storeLink/modelarts.go
  10. +1
    -5
      internal/svc/servicecontext.go
  11. +17
    -25
      internal/types/types.go
  12. +3
    -0
      pkg/models/taskhpcmodel_gen.go

+ 0
- 65
.devops/阿里云.yml View File

@@ -1,65 +0,0 @@
version: 2
name: 阿里云
description: ""
global:
concurrent: 1
param:
- ref: ssh_host
name: ""
value: '"47.92.39.128"'
required: false
type: STRING
hidden: true
- ref: ssh_user
name: ""
value: '"root"'
required: false
type: STRING
hidden: true
workflow:
- ref: start
name: 开始
task: start
- ref: end
name: 结束
task: end
needs:
- ssh_cmd_0
- ref: git_clone_0
name: git clone
task: git_clone@1.2.9
input:
remote_url: '"https://gitlink.org.cn/JointCloud/pcm-coordinator.git"'
ref: '"refs/heads/master"'
commit_id: '""'
depth: 1
needs:
- start
- ref: docker_image_build_0
name: docker镜像构建
task: docker_image_build@1.6.0
input:
docker_username: ((aly.docker_user))
docker_password: ((aly.docker_password))
image_name: '"registry.cn-hangzhou.aliyuncs.com/jcce/pcm-core-api"'
image_tag: '"latest"'
registry_address: '"registry.cn-hangzhou.aliyuncs.com"'
docker_file: '"Dockerfile"'
docker_build_path: git_clone_0.git_path
workspace: git_clone_0.git_path
image_push: true
build_args: '""'
needs:
- git_clone_0
- ref: ssh_cmd_0
name: ssh执行命令
task: ssh_cmd@1.1.1
input:
ssh_private_key: ((aly.ssh_private_key))
ssh_ip: global.ssh_host
ssh_port: '"22"'
ssh_user: global.ssh_user
ssh_cmd: '"kubectl rollout restart deployment pcm-core-api -n ns-admin"'
needs:
- docker_image_build_0


+ 19
- 24
desc/hpc/pcm-hpc.api View File

@@ -10,37 +10,33 @@ info(


type ( type (
commitHpcTaskReq { commitHpcTaskReq {
ClusterId string `json:"clusterId,optional"`
Name string `json:"name"` Name string `json:"name"`
Account string `json:"account,optional"`
Backend string `json:"backend"`
ClusterId string `json:"clusterId"`
App string `json:"app"`
Description string `json:"description,optional"` Description string `json:"description,optional"`
TenantId int64 `json:"tenantId,optional"`
TaskId int64 `json:"taskId,optional"`
AdapterIds []string `json:"adapterIds,optional"`
MatchLabels map[string]string `json:"matchLabels,optional"`
CardCount int64 `json:"cardCount,optional"`
WorkDir string `json:"workDir,optional"` //paratera:workingDir
WallTime string `json:"wallTime,optional"`
CmdScript string `json:"cmdScript,optional"` // paratera:bootScript
AppType string `json:"appType,optional"`
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
Queue string `json:"queue,optional"`
NNode string `json:"nNode,optional"`
SubmitType string `json:"submitType,optional"`
StdInput string `json:"stdInput,optional"`
ClusterType string `json:"clusterType,optional"`
Partition string `json:"partition"`
UserId int64 `json:"userId,optional"`
Token string `json:"token,optional"`
UserIp string `json:"userIp,optional"`
OperateType string `json:"operateType,optional"`
Parameters map[string]string `json:"parameters"`
CustomParams map[string]string `json:"customParams"`
} }
)



type (
commitHpcTaskResp { commitHpcTaskResp {
ClusterId int64 `json:"clusterId"`
JobId string `json:"jobId"`
Code int `json:"code"`
Data Data `json:"data"`
Msg string `json:"msg"`
TraceId string `json:"trace_id"`
} }
Data {
Backend string `json:"backend"`
JobInfo map[string]string `json:"jobInfo"`
}

) )



type ( type (
hpcOverViewReq { hpcOverViewReq {
} }
@@ -160,7 +156,6 @@ type (
InstanceType int32 `form:"instanceType,optional"` InstanceType int32 `form:"instanceType,optional"`
InstanceClass string `form:"instanceClass,optional"` InstanceClass string `form:"instanceClass,optional"`
InstanceName string `form:"instanceName,optional"` InstanceName string `form:"instanceName,optional"`
PageInfo
} }
HpcInstanceCenterResp { HpcInstanceCenterResp {
InstanceCenterList []HpcInstanceCenterList `json:"instanceCenterList" copier:"InstanceCenterList"` InstanceCenterList []HpcInstanceCenterList `json:"instanceCenterList" copier:"InstanceCenterList"`


+ 1
- 1
go.mod View File

@@ -20,7 +20,7 @@ require (
github.com/zeromicro/go-zero v1.7.4 github.com/zeromicro/go-zero v1.7.4
gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20250107025835-8fc888b1d170 gitlink.org.cn/JointCloud/pcm-ac v0.0.0-20250107025835-8fc888b1d170
gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250313064001-91fb558cfdb6
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250102093846-164b4884c9ec gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250102093846-164b4884c9ec
gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203 gitlink.org.cn/JointCloud/pcm-openstack v0.0.0-20240403033338-e7edabad4203


+ 10
- 0
go.sum View File

@@ -298,6 +298,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM= github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM=
github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0= github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.61 h1:b203Ob+V22EyNiJlrhYQGJ0aAJ9ddFMa3neYrOZ8/tQ=
github.com/huaweicloud/huaweicloud-sdk-go-v3 v0.1.61/go.mod h1:AZT3IyeViMA1qIoo6lM2eDobcTXORpqIQzSqdodah7E=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
@@ -530,6 +532,14 @@ gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4 h1:WIs/189l
gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4/go.mod h1:YbuoRgF9sEVvNJPQtGRjdocX7Du6NBOTLn+GVwqRVjo= gitlink.org.cn/JointCloud/pcm-hpc v0.0.0-20241125115811-72f3568255a4/go.mod h1:YbuoRgF9sEVvNJPQtGRjdocX7Du6NBOTLn+GVwqRVjo=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207 h1:korhOkFl0x1wuQBKoKTsQHeFboDwLFRWwR2G9IPPfNg= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207 h1:korhOkFl0x1wuQBKoKTsQHeFboDwLFRWwR2G9IPPfNg=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0= gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250306073530-56ecf1273207/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250311041651-d676f57aac45 h1:SoR/DoLffkzoXrcfSaOY4EitPayA3kfjTp/yTOdRlps=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250311041651-d676f57aac45/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250312043331-e84d101055bd h1:MMF06GNHfYDKhtOneImh1mL5fcgDqOpesZF2fW9oU6A=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250312043331-e84d101055bd/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250313020604-f0c18343ad05 h1:2YjglJQeesAd3x6Lraq/c0qA2m8kGk8v5IPsu3IfDso=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250313020604-f0c18343ad05/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250313064001-91fb558cfdb6 h1:9o0ONbSiQHTzODptzgtVZjRYFBLncZ6dpHp9YF+v73I=
gitlink.org.cn/JointCloud/pcm-modelarts v0.0.0-20250313064001-91fb558cfdb6/go.mod h1:MxtnJJcU8S4zfGKZVcg2MOXGtwucKy7MMDwA0IemBd0=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110 h1:GaXwr5sgDh0raHjUf9IewTvnRvajYea7zbLsaerYyXo=
gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ= gitlink.org.cn/JointCloud/pcm-octopus v0.0.0-20240817071412-44397870b110/go.mod h1:QOD5+/l2D+AYBjF2h5T0mdJyfGAmF78QmeKdbBXbjLQ=
gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250102093846-164b4884c9ec h1:Yul2JOAIS94B+eIg0UvmBSe8JrtSrZ2OA47gAYLiBYI= gitlink.org.cn/JointCloud/pcm-openi v0.0.0-20250102093846-164b4884c9ec h1:Yul2JOAIS94B+eIg0UvmBSe8JrtSrZ2OA47gAYLiBYI=


+ 4
- 3
internal/handler/hpc/commithpctaskhandler.go View File

@@ -10,6 +10,7 @@ import (
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils"
"k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/json"
"net/http" "net/http"
"strconv"
) )


func CommitHpcTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { func CommitHpcTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
@@ -21,15 +22,15 @@ func CommitHpcTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
} }
// 获取ip信息 // 获取ip信息
ip := utils.GetClientIP(r) ip := utils.GetClientIP(r)
req.UserIp = ip
req.Parameters["UserIp"] = ip
// 获取token信息 // 获取token信息
token := r.Header.Get("Authorization") token := r.Header.Get("Authorization")
req.Token = token
req.Parameters["Token"] = token
// 获取用户信息 // 获取用户信息
userStr := r.Header.Get("User") userStr := r.Header.Get("User")
user := &models.JccUserInfo{} user := &models.JccUserInfo{}
json.Unmarshal([]byte(userStr), user) json.Unmarshal([]byte(userStr), user)
req.UserId = user.Id
req.Parameters["UserId"] = strconv.FormatInt(user.Id, 10)


l := hpc.NewCommitHpcTaskLogic(r.Context(), svcCtx) l := hpc.NewCommitHpcTaskLogic(r.Context(), svcCtx)
resp, err := l.CommitHpcTask(&req) resp, err := l.CommitHpcTask(&req)


+ 1
- 1
internal/logic/adapters/updateclusterlogic.go View File

@@ -60,7 +60,7 @@ func (l *UpdateClusterLogic) UpdateCluster(req *types.ClusterCreateReq) (resp *t
}).Create(&resourceCost) }).Create(&resourceCost)


if dbResult.Error != nil { if dbResult.Error != nil {
panic(dbResult.Error)
return nil, dbResult.Error
} }
return return
} }

+ 69
- 63
internal/logic/hpc/commithpctasklogic.go View File

@@ -6,9 +6,6 @@ import (
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client" clientCore "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models" "gitlink.org.cn/JointCloud/pcm-coordinator/pkg/models"
"gitlink.org.cn/JointCloud/pcm-coordinator/pkg/utils/remoteUtil"
v1 "gitlink.org.cn/JointCloud/pcm-hpc/routers/v1"
"k8s.io/apimachinery/pkg/util/json"
"strconv" "strconv"
"time" "time"


@@ -32,6 +29,27 @@ func NewCommitHpcTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Com
} }
} }


// JobSpec is the request body that submitJob posts to the participant
// adapter when submitting an HPC job.
// NOTE(review): the fields carry no json tags, so encoding/json would emit
// the Go field names ("Name", "Backend", ...) as keys — confirm that the
// participant endpoint expects this casing.
type JobSpec struct {
Name string // application name, e.g. BWA/lammps
Backend string // backend type, e.g. slurm/sugonac
App string
OperateType string // operation type within the application, e.g. for bwa: build index / align sequences
Parameters map[string]string // common parameters
CustomParams map[string]string // platform-specific custom parameters
}
// ResultParticipant describes a JSON response envelope: a status code, a
// data section naming the backend and the job's directory and id, a message,
// and a trace id.
// NOTE(review): presumably the participant's reply to a job submission; it is
// not referenced in the visible hunks — confirm it is still needed.
type ResultParticipant struct {
Code int `json:"code"`
Data struct {
Backend string `json:"backend"`
JobInfo struct {
JobDir string `json:"jobDir"`
JobId string `json:"jobId"`
} `json:"jobInfo"`
} `json:"data"`
Msg string `json:"msg"`
TraceId string `json:"trace_id"`
}

func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) { func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *types.CommitHpcTaskResp, err error) {


var clusterInfo types.ClusterInfo var clusterInfo types.ClusterInfo
@@ -42,13 +60,14 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
} }


// 构建主任务结构体 // 构建主任务结构体
userId, _ := strconv.ParseInt(req.Parameters["UserId"], 10, 64)
taskModel := models.Task{ taskModel := models.Task{
Name: req.Name, Name: req.Name,
Description: req.Description, Description: req.Description,
CommitTime: time.Now(), CommitTime: time.Now(),
Status: "Running", Status: "Running",
AdapterTypeDict: "2", AdapterTypeDict: "2",
UserId: req.UserId,
UserId: userId,
} }


// 保存任务数据到数据库 // 保存任务数据到数据库
@@ -65,6 +84,8 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
return nil, errors.New("no corresponding adapter found") return nil, errors.New("no corresponding adapter found")
} }
clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64) clusterId, err := strconv.ParseInt(req.ClusterId, 10, 64)
cardCount, _ := strconv.ParseInt(req.Parameters["cardCount"], 10, 64)
timelimit, _ := strconv.ParseInt(req.Parameters["timeLimit"], 10, 64)
hpcInfo := models.TaskHpc{ hpcInfo := models.TaskHpc{
TaskId: taskModel.Id, TaskId: taskModel.Id,
AdapterId: clusterInfo.AdapterId, AdapterId: clusterInfo.AdapterId,
@@ -72,24 +93,27 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
ClusterId: clusterId, ClusterId: clusterId,
ClusterName: clusterInfo.Name, ClusterName: clusterInfo.Name,
Name: taskModel.Name, Name: taskModel.Name,
CmdScript: req.CmdScript,
Backend: req.Backend,
OperateType: req.OperateType,
CmdScript: req.Parameters["cmdScript"],
StartTime: time.Now().String(), StartTime: time.Now().String(),
CardCount: req.CardCount,
WorkDir: req.WorkDir,
WallTime: req.WallTime,
AppType: req.AppType,
AppName: req.AppName,
Queue: req.Queue,
SubmitType: req.SubmitType,
NNode: req.NNode,
CardCount: cardCount,
WorkDir: req.Parameters["workDir"],
WallTime: req.Parameters["wallTime"],
AppType: req.Parameters["appType"],
AppName: req.Parameters["appName"],
Queue: req.Parameters["queue"],
SubmitType: req.Parameters["submitType"],
NNode: req.Parameters["nNode"],
Account: clusterInfo.Username, Account: clusterInfo.Username,
StdInput: req.StdInput,
Partition: req.Partition,
StdInput: req.Parameters["stdInput"],
Partition: req.Parameters["partition"],
CreatedTime: time.Now(), CreatedTime: time.Now(),
UpdatedTime: time.Now(), UpdatedTime: time.Now(),
Status: "Running", Status: "Running",
TimeLimit: timelimit,
} }
hpcInfo.WorkDir = clusterInfo.WorkDir + req.WorkDir
hpcInfo.WorkDir = clusterInfo.WorkDir + req.Parameters["WorkDir"]
tx = l.svcCtx.DbEngin.Create(&hpcInfo) tx = l.svcCtx.DbEngin.Create(&hpcInfo)
if tx.Error != nil { if tx.Error != nil {
return nil, tx.Error return nil, tx.Error
@@ -109,64 +133,46 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
if result.Error != nil { if result.Error != nil {
logx.Errorf("Task creation failure, err: %v", result.Error) logx.Errorf("Task creation failure, err: %v", result.Error)
} }
resp = &types.CommitHpcTaskResp{
JobId: string(""),
}
// 数据上链 // 数据上链
// 查询资源价格 // 查询资源价格
var price int64 var price int64
l.svcCtx.DbEngin.Raw("select price from resource_cost where resource_id = ?", clusterId).Scan(&price)
bytes, _ := json.Marshal(taskModel)
remoteUtil.Evidence(remoteUtil.EvidenceParam{
UserIp: req.UserIp,
Url: l.svcCtx.Config.BlockChain.Url,
ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
Type: l.svcCtx.Config.BlockChain.Type,
Token: req.Token,
Amount: price,
Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
})
l.svcCtx.DbEngin.Raw("select price from `resource_cost` where resource_id = ?", clusterId).Scan(&price)
//bytes, _ := json.Marshal(taskModel)
//remoteUtil.Evidence(remoteUtil.EvidenceParam{
// UserIp: req.Parameters["UserIp"],
// Url: l.svcCtx.Config.BlockChain.Url,
// ContractAddress: l.svcCtx.Config.BlockChain.ContractAddress,
// FunctionName: l.svcCtx.Config.BlockChain.FunctionName,
// Type: l.svcCtx.Config.BlockChain.Type,
// Token: req.Parameters["Token"],
// Amount: price,
// Args: []string{strconv.FormatInt(taskModel.Id, 10), string(bytes)},
//})
// 提交job到指定集群 // 提交job到指定集群
logx.Info("提交job到指定集群") logx.Info("提交job到指定集群")
go func() {
submitJob(&hpcInfo, &clusterInfo, server)
}()
resp, _ = submitJob(req, server)

return resp, nil return resp, nil
} }


func submitJob(hpcInfo *models.TaskHpc, clusterInfo *types.ClusterInfo, adapterAddress string) (int, error) {
SubmitJobReq := v1.SubmitJobReq{
Server: clusterInfo.Server,
Version: clusterInfo.Version,
Username: clusterInfo.Username,
Token: clusterInfo.Token,
JobOptions: v1.JobOptions{
Script: hpcInfo.CmdScript,
Job: &v1.JobProperties{
Account: hpcInfo.Account,
Name: hpcInfo.Name,
NTasks: 1,
CurrentWorkingDirectory: hpcInfo.WorkDir,
Partition: hpcInfo.Partition,
Environment: map[string]string{"PATH": clusterInfo.EnvPath,
"LD_LIBRARY_PATH": clusterInfo.EnvLdPath},
StandardOutput: hpcInfo.WorkDir + "/job.out",
StandardError: hpcInfo.WorkDir + "/job.err",
},
},
func submitJob(req *types.CommitHpcTaskReq, adapterAddress string) (resp *types.CommitHpcTaskResp, err error) {
req.Parameters["jobName"] = req.Name
reqParticipant := JobSpec{
Name: req.Name,
Backend: req.Backend,
App: req.App,
OperateType: req.OperateType,
Parameters: req.Parameters,
CustomParams: req.CustomParams,
} }
var resp v1.SubmitJobResp
httpClient := resty.New().R() httpClient := resty.New().R()
logx.Info("远程调用p端接口开始") logx.Info("远程调用p端接口开始")
_, err := httpClient.SetHeader("Content-Type", "application/json").
SetBody(SubmitJobReq).
httpClient.SetHeader("Content-Type", "application/json").
SetBody(reqParticipant).
SetResult(&resp). SetResult(&resp).
Post(adapterAddress + "/api/v1/job/submit")
Post(adapterAddress + "/api/v1/jobs")
logx.Info("远程调用p端接口完成") logx.Info("远程调用p端接口完成")
if err != nil {
return 0, err
}
return resp.JobId, nil

return resp, nil
} }

+ 1
- 1
internal/scheduler/service/inference/imageInference/imageInference.go View File

@@ -417,7 +417,7 @@ func (i *ImageInference) saveAiSubTasks(id int64, aiTaskList []*models.TaskAi, c
} }
err := i.storage.SaveAiTaskImageSubTask(&taskAiSub) err := i.storage.SaveAiTaskImageSubTask(&taskAiSub)
if err != nil { if err != nil {
panic(err)
return err
} }
} }
} }


+ 19
- 18
internal/storeLink/modelarts.go View File

@@ -157,8 +157,8 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri
// modelArts提交任务 // modelArts提交任务
environments := make(map[string]string) environments := make(map[string]string)
parameters := make([]*modelarts.ParametersTrainJob, 0) parameters := make([]*modelarts.ParametersTrainJob, 0)
/* inputs := make([]*modelarts.InputTraining, 0)
outputs := make([]*modelarts.OutputTraining, 0)*/
inputs := make([]*modelarts.InputTraining, 0)
//outputs := make([]*modelarts.OutputTraining, 0)
for _, env := range envs { for _, env := range envs {
s := strings.Split(env, COMMA) s := strings.Split(env, COMMA)
environments[s[0]] = s[1] environments[s[0]] = s[1]
@@ -170,22 +170,23 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri
Value: s[1], Value: s[1],
}) })
} }
/* inputs = append(inputs, &modelarts.InputTraining{
Name: "data_url",
Remote: &modelarts.RemoteTra{
Obs: &modelarts.Obs1{
ObsUrl: "/test-wq/data/mnist.npz",
},
}})

outputs = append(outputs, &modelarts.OutputTraining{
Name: "train_url",
Remote: &modelarts.RemoteOut{
Obs: &modelarts.ObsTra{
ObsUrl: "/test-wq/model/",
},

inputs = append(inputs, &modelarts.InputTraining{
Name: "input",
AccessMethod: "parameter",
Remote: &modelarts.RemoteTra{
Obs: &modelarts.ObsTra{
ObsUrl: datasetsId + "/",
}, },
})*/
}})

/*outputs = append(outputs, &modelarts.OutputTraining{
Name: "output",
Remote: &modelarts.RemoteOut{
Obs: &modelarts.ObsTra{
ObsUrl: "obs://test-modelarts-train/output/10v/",
},
}})*/
req := &modelarts.CreateTrainingJobReq{ req := &modelarts.CreateTrainingJobReq{
Kind: "job", Kind: "job",
Metadata: &modelarts.MetadataS{ Metadata: &modelarts.MetadataS{
@@ -200,7 +201,7 @@ func (m *ModelArtsLink) SubmitTask(ctx context.Context, imageId string, cmd stri
Command: cmd, Command: cmd,
Environments: environments, Environments: environments,
Parameters: parameters, Parameters: parameters,
//Inputs: inputs,
Inputs: inputs,
//Outputs: outputs, //Outputs: outputs,
}, },
Spec: &modelarts.SpecsC{ Spec: &modelarts.SpecsC{


+ 1
- 5
internal/svc/servicecontext.go View File

@@ -63,15 +63,11 @@ func NewServiceContext(c config.Config) *ServiceContext {


if err != nil { if err != nil {
logx.Errorf("InitPrometheus err: %v", err) logx.Errorf("InitPrometheus err: %v", err)
panic("InitSnowflake err")
panic("InitPrometheus err")
} }
httpClient := resty.New() httpClient := resty.New()
httpClient.SetTimeout(1 * time.Second) httpClient.SetTimeout(1 * time.Second)
alertClient := tracker.NewAlertClient(c.Monitoring.AlertUrl) alertClient := tracker.NewAlertClient(c.Monitoring.AlertUrl)
if err != nil {
logx.Errorf("InitPrometheus err: %v", err)
panic("InitSnowflake err")
}


//添加snowflake支持 //添加snowflake支持
err = utils.InitSnowflake(c.SnowflakeConf.MachineId) err = utils.InitSnowflake(c.SnowflakeConf.MachineId)


+ 17
- 25
internal/types/types.go View File

@@ -1307,34 +1307,26 @@ type ResourceCostRecord struct {
} }


type CommitHpcTaskReq struct { type CommitHpcTaskReq struct {
ClusterId string `json:"clusterId,optional"`
Name string `json:"name"`
Account string `json:"account,optional"`
Description string `json:"description,optional"`
TenantId int64 `json:"tenantId,optional"`
TaskId int64 `json:"taskId,optional"`
AdapterIds []string `json:"adapterIds,optional"`
MatchLabels map[string]string `json:"matchLabels,optional"`
CardCount int64 `json:"cardCount,optional"`
WorkDir string `json:"workDir,optional"` //paratera:workingDir
WallTime string `json:"wallTime,optional"`
CmdScript string `json:"cmdScript,optional"` // paratera:bootScript
AppType string `json:"appType,optional"`
AppName string `json:"appName,optional"` // paratera:jobGroupName ac:appname
Queue string `json:"queue,optional"`
NNode string `json:"nNode,optional"`
SubmitType string `json:"submitType,optional"`
StdInput string `json:"stdInput,optional"`
ClusterType string `json:"clusterType,optional"`
Partition string `json:"partition"`
UserId int64 `json:"userId,optional"`
Token string `json:"token,optional"`
UserIp string `json:"userIp,optional"`
Name string `json:"name"`
Backend string `json:"backend"` //
ClusterId string `json:"clusterId"`
App string `json:"app"`
Description string `json:"description,optional"`
OperateType string `json:"operateType,optional"`
Parameters map[string]string `json:"parameters"`
CustomParams map[string]string `json:"customParams"`
} }


type CommitHpcTaskResp struct { type CommitHpcTaskResp struct {
ClusterId int64 `json:"clusterId"`
JobId string `json:"jobId"`
Code int `json:"code"`
Data Data `json:"data"`
Msg string `json:"msg"`
TraceId string `json:"trace_id"`
}

// Data is the payload carried in the "data" field of CommitHpcTaskResp: the
// backend name plus a free-form string map of job information.
type Data struct {
Backend string `json:"backend"`
JobInfo map[string]string `json:"jobInfo"`
}


type HpcOverViewReq struct { type HpcOverViewReq struct {


+ 3
- 0
pkg/models/taskhpcmodel_gen.go View File

@@ -44,6 +44,8 @@ type (
ClusterId int64 `db:"cluster_id"` //集群id ClusterId int64 `db:"cluster_id"` //集群id
ClusterName string `db:"cluster_name"` //集群名称 ClusterName string `db:"cluster_name"` //集群名称
Name string `db:"name"` // 名称 Name string `db:"name"` // 名称
Backend string `db:"backend"` // 平台类型
OperateType string `db:"operate_type"` // 操作类型
Status string `db:"status"` // 状态 Status string `db:"status"` // 状态
CmdScript string `db:"cmd_script"` CmdScript string `db:"cmd_script"`
StartTime string `db:"start_time"` // 开始时间 StartTime string `db:"start_time"` // 开始时间
@@ -78,6 +80,7 @@ type (
UpdatedBy int64 `db:"updated_by"` // 更新人 UpdatedBy int64 `db:"updated_by"` // 更新人
UpdatedTime time.Time `db:"updated_time"` // 更新时间 UpdatedTime time.Time `db:"updated_time"` // 更新时间
UserId int64 `db:"user_id"` UserId int64 `db:"user_id"`
TimeLimit int64 `db:"time_limit"`
} }
) )




Loading…
Cancel
Save