Browse Source

提交任务天河代码

pull/9/head
tzwang 2 years ago
parent
commit
d1dbc2e4b6
16 changed files with 2031 additions and 78 deletions
  1. +1
    -1
      adaptor/slurm/slurmCore/api/internal/handler/routes.go
  2. +17
    -3
      adaptor/slurm/slurmCore/api/internal/logic/submitjoblogic.go
  3. +96
    -5
      adaptor/slurm/slurmCore/api/internal/types/types.go
  4. +100
    -7
      adaptor/slurm/slurmCore/api/slurmCore.api
  5. +6
    -2
      adaptor/slurm/slurmShuguang/rpc/internal/logic/submitjoblogic.go
  6. +1
    -0
      adaptor/slurm/slurmShuguang/rpc/internal/server/slurmshuguangserver.go
  7. +18
    -17
      adaptor/slurm/slurmShuguang/rpc/pb/slurmShuguang.proto
  8. +17
    -17
      adaptor/slurm/slurmShuguang/rpc/slurmShuguang/slurmShuguang.pb.go
  9. +2
    -0
      adaptor/slurm/slurmShuguang/rpc/slurmShuguang/slurmShuguang_grpc.pb.go
  10. +2
    -0
      adaptor/slurm/slurmShuguang/rpc/slurmshuguangclient/slurmshuguang.go
  11. +401
    -0
      adaptor/slurm/slurmTianhe/rpc/internal/logic/submitjoblogic.go
  12. +6
    -0
      adaptor/slurm/slurmTianhe/rpc/internal/server/slurmtianheserver.go
  13. +125
    -0
      adaptor/slurm/slurmTianhe/rpc/pb/slurmTianhe.proto
  14. +1188
    -26
      adaptor/slurm/slurmTianhe/rpc/slurmTianhe/slurmTianhe.pb.go
  15. +38
    -0
      adaptor/slurm/slurmTianhe/rpc/slurmTianhe/slurmTianhe_grpc.pb.go
  16. +13
    -0
      adaptor/slurm/slurmTianhe/rpc/slurmtianheclient/slurmtianhe.go

+ 1
- 1
adaptor/slurm/slurmCore/api/internal/handler/routes.go View File

@@ -28,7 +28,7 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Handler: scheduleTaskHandler(serverCtx),
},
{
Method: http.MethodGet,
Method: http.MethodPost,
Path: "/submitJob",
Handler: submitJobHandler(serverCtx),
},


+ 17
- 3
adaptor/slurm/slurmCore/api/internal/logic/submitjoblogic.go View File

@@ -2,6 +2,7 @@ package logic

import (
"PCM/adaptor/slurm/slurmShuguang/rpc/slurmShuguang"
"PCM/adaptor/slurm/slurmTianhe/rpc/slurmTianhe"
"PCM/common/tool"
"PCM/common/xerr"
"context"
@@ -30,23 +31,36 @@ func NewSubmitJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SubmitJ

func (l *SubmitJobLogic) SubmitJob(req *types.SubmitJobReq) (resp *types.SubmitJobResp, err error) {

//coreResp := &types.SubmitJobResp{}
coreResp := &types.SubmitJobResp{}

version := req.SlurmVersion
switch version {
case "shuguang":
shuguangReq := &slurmShuguang.SubmitJobReq{}
err = copier.CopyWithOption(shuguangReq, req, copier.Option{Converters: tool.Converters})
//shuguangResp, err := l.svcCtx.ShuguangRpc.SubmitJob(l.ctx, shuguangReq)
shuguangResp, err := l.svcCtx.ShuguangRpc.SubmitJob(l.ctx, shuguangReq)

if err != nil {
return nil, errors.Wrapf(xerr.NewErrMsg("Failed to submit job to Shuguang"), "Failed to submit job to Shuguang err : %v ,req:%+v", err, req)
}

coreResp.Msg = shuguangResp.Msg
coreResp.Code = shuguangResp.Code
coreResp.Data = shuguangResp.Data

case "tianhe":
tianheReq := &slurmTianhe.SubmitJobReq{}
err = copier.CopyWithOption(tianheReq, req, copier.Option{Converters: tool.Converters})
tianheResp, err := l.svcCtx.TianheRpc.SubmitJob(l.ctx, tianheReq)

if err != nil {
return nil, errors.Wrapf(xerr.NewErrMsg("Failed to submit job to Tianhe"), "Failed to submit job to Tianhe err : %v ,req:%+v", err, req)
}

coreResp.Msg = string(tianheResp.SubmitResponseMsg[0].ErrorCode)
//coreResp.Code = tianheResp.SubmitResponseMsg[0].ErrorCode
//coreResp.Data = tianheResp.SubmitResponseMsg[0].JobId
}

return resp, nil
return coreResp, nil
}

+ 96
- 5
adaptor/slurm/slurmCore/api/internal/types/types.go View File

@@ -175,11 +175,94 @@ type ScheduleTaskResp struct {
}

type SubmitJobReq struct {
SlurmVersion string `json:"slurmVersion"`
Apptype string `json:"apptype"`
Appname string `json:"appname"`
StrJobManagerID int64 `json:"strJobManagerID"`
MapAppJobInfo MapAppJobInfo `json:"mapAppJobInfo"`
SlurmVersion string `json:"slurmVersion"`
Apptype string `json:"apptype"`
Appname string `json:"appname"`
StrJobManagerID int64 `json:"strJobManagerID"`
MapAppJobInfo MapAppJobInfo `json:"mapAppJobInfo"`
Account string `json:"account,omitempty"`
Acctg_freq string `json:"acctg_freq,omitempty"`
Alloc_node string `json:"alloc_node,omitempty"`
Alloc_resp_port int32 `json:"alloc_resp_port,omitempty"`
Alloc_sid int32 `json:"alloc_sid,omitempty"`
Argc int32 `json:"argc,omitempty"`
Argv []Argv `json:"Argv,omitempty"`
Array_inx string `json:"array_inx,omitempty"`
Begin_time int64 `json:"begin_time,omitempty"`
Ckpt_interval int32 `json:"ckpt_interval,omitempty"`
Ckpt_dir string `json:"ckpt_dir,omitempty"`
Comment string `json:"comment,omitempty"`
Contiguous int32 `json:"contiguous,omitempty"`
Cpu_bind string `json:"cpu_bind,omitempty"`
Cpu_bind_type int32 `json:"cpu_bind_type,omitempty"`
Dependency string `json:"dependency,omitempty"`
End_time int64 `json:"end_time,omitempty"`
Environment []Environment `json:"Environment,omitempty"`
Env_size int32 `json:"env_size,omitempty"`
Exc_nodes string `json:"exc_nodes,omitempty"`
Features string `json:"features,omitempty"`
Gres string `json:"gres,omitempty"`
Group_id int32 `json:"group_id,omitempty"`
Immediate int32 `json:"immediate,omitempty"`
Job_id int32 `json:"job_id,omitempty"`
Kill_on_node_fail int32 `json:"kill_on_node_fail,omitempty"`
Licenses string `json:"licenses,omitempty"`
Mail_type int32 `json:"mail_type,omitempty"`
Mail_user string `json:"mail_user,omitempty"`
Mem_bind string `json:"mem_bind,omitempty"`
Mem_bind_type int32 `json:"mem_bind_type,omitempty"`
Name string `json:"name,omitempty"`
Network string `json:"network,omitempty"`
Nice int32 `json:"nice,omitempty"`
Num_tasks int32 `json:"num_tasks,omitempty"`
Open_mode int32 `json:"open_mode,omitempty"`
Other_port int32 `json:"other_port,omitempty"`
Overcommit int32 `json:"overcommit,omitempty"`
Partition string `json:"partition,omitempty"`
Plane_size int32 `json:"plane_size,omitempty"`
Priority int32 `json:"priority,omitempty"`
Profile int32 `json:"profile,omitempty"`
Qos string `json:"qos,omitempty"`
Resp_host string `json:"resp_host,omitempty"`
Req_nodes string `json:"req_nodes,omitempty"`
Requeue int32 `json:"requeue,omitempty"`
Reservation string `json:"reservation,omitempty"`
Script string `json:"script,omitempty"`
Shared int32 `json:"shared,omitempty"`
Spank_job_env_size int32 `json:"spank_job_env_size,omitempty"`
Task_dist int32 `json:"task_dist,omitempty"`
Time_limit int32 `json:"time_limit,omitempty"`
Time_min int32 `json:"time_min,omitempty"`
User_id int32 `json:"user_id,omitempty"`
Wait_all_nodes int32 `json:"wait_all_nodes,omitempty"`
Warn_signal int32 `json:"warn_signal,omitempty"`
Warn_time int32 `json:"warn_time,omitempty"`
Work_dir string `json:"work_dir,omitempty"`
Cpus_per_task int32 `json:"cpus_per_task,omitempty"`
Min_cpus int32 `json:"min_cpus,omitempty"`
Max_cpus int32 `json:"max_cpus,omitempty"`
Min_nodes int32 `json:"min_nodes,omitempty"`
Max_nodes int32 `json:"max_nodes,omitempty"`
Boards_per_node int32 `json:"boards_per_node,omitempty"`
Sockets_per_board int32 `json:"sockets_per_board,omitempty"`
Sockets_per_node int32 `json:"sockets_per_node,omitempty"`
Cores_per_socket int32 `json:"cores_per_socket,omitempty"`
Threads_per_core int32 `json:"threads_per_core,omitempty"`
Ntasks_per_node int32 `json:"ntasks_per_node,omitempty"`
Ntasks_per_socket int32 `json:"ntasks_per_socket,omitempty"`
Ntasks_per_core int32 `json:"ntasks_per_core,omitempty"`
Ntasks_per_board int32 `json:"ntasks_per_board,omitempty"`
Pn_min_cpus int32 `json:"pn_min_cpus,omitempty"`
Pn_min_memory int32 `json:"pn_min_memory,omitempty"`
Pn_min_tmp_disk int32 `json:"pn_min_tmp_disk,omitempty"`
Reboot int32 `json:"reboot,omitempty"`
Rotate int32 `json:"rotate,omitempty"`
Req_switch int32 `json:"req_switch,omitempty"`
Std_err string `json:"std_err,omitempty"`
Std_in string `json:"std_in,omitempty"`
Std_out string `json:"std_out,omitempty"`
Wait4switch int32 `json:"wait4switch,omitempty"`
Wckey string `json:"wckey,omitempty"`
}

type SubmitJobResp struct {
@@ -204,6 +287,14 @@ type MapAppJobInfo struct {
GAP_STD_ERR_FILE string `json:"GAP_STD_ERR_FILE"` //工作路径/std.err.%j
}

type Argv struct {
Argv string `json:"argv,omitempty"`
}

type Environment struct {
Environment string `json:"environment,omitempty"`
}

type GetRegionResp struct {
Code string `json:"code"`
Msg string `json:"msg"`


+ 100
- 7
adaptor/slurm/slurmCore/api/slurmCore.api View File

@@ -195,15 +195,100 @@ type (
Appname string `json:"appname"`
StrJobManagerID int64 `json:"strJobManagerID"`
MapAppJobInfo MapAppJobInfo `json:"mapAppJobInfo"`

Account string `json:"account,omitempty"`
Acctg_freq string `json:"acctg_freq,omitempty"`
Alloc_node string `json:"alloc_node,omitempty"`
Alloc_resp_port int32 `json:"alloc_resp_port,omitempty"`
Alloc_sid int32 `json:"alloc_sid,omitempty"`
Argc int32 `json:"argc,omitempty"`
Argv []Argv `json:"Argv,omitempty"`
Array_inx string `json:"array_inx,omitempty"`
Begin_time int64 `json:"begin_time,omitempty"`
Ckpt_interval int32 `json:"ckpt_interval,omitempty"`
Ckpt_dir string `json:"ckpt_dir,omitempty"`
Comment string `json:"comment,omitempty"`
Contiguous int32 `json:"contiguous,omitempty"`
Cpu_bind string `json:"cpu_bind,omitempty"`
Cpu_bind_type int32 `json:"cpu_bind_type,omitempty"`
Dependency string `json:"dependency,omitempty"`
End_time int64 `json:"end_time,omitempty"`
Environment []Environment `json:"Environment,omitempty"`
Env_size int32 `json:"env_size,omitempty"`
Exc_nodes string `json:"exc_nodes,omitempty"`
Features string `json:"features,omitempty"`
Gres string `json:"gres,omitempty"`
Group_id int32 `json:"group_id,omitempty"`
Immediate int32 `json:"immediate,omitempty"`
Job_id int32 `json:"job_id,omitempty"`
Kill_on_node_fail int32 `json:"kill_on_node_fail,omitempty"`
Licenses string `json:"licenses,omitempty"`
Mail_type int32 `json:"mail_type,omitempty"`
Mail_user string `json:"mail_user,omitempty"`
Mem_bind string `json:"mem_bind,omitempty"`
Mem_bind_type int32 `json:"mem_bind_type,omitempty"`
Name string `json:"name,omitempty"`
Network string `json:"network,omitempty"`
Nice int32 `json:"nice,omitempty"`
Num_tasks int32 `json:"num_tasks,omitempty"`
Open_mode int32 `json:"open_mode,omitempty"`
Other_port int32 `json:"other_port,omitempty"`
Overcommit int32 `json:"overcommit,omitempty"`
Partition string `json:"partition,omitempty"`
Plane_size int32 `json:"plane_size,omitempty"`
Priority int32 `json:"priority,omitempty"`
Profile int32 `json:"profile,omitempty"`
Qos string `json:"qos,omitempty"`
Resp_host string `json:"resp_host,omitempty"`
Req_nodes string `json:"req_nodes,omitempty"`
Requeue int32 `json:"requeue,omitempty"`
Reservation string `json:"reservation,omitempty"`
Script string `json:"script,omitempty"`
Shared int32 `json:"shared,omitempty"`
Spank_job_env_size int32 `json:"spank_job_env_size,omitempty"`
Task_dist int32 `json:"task_dist,omitempty"`
Time_limit int32 `json:"time_limit,omitempty"`
Time_min int32 `json:"time_min,omitempty"`
User_id int32 `json:"user_id,omitempty"`
Wait_all_nodes int32 `json:"wait_all_nodes,omitempty"`
Warn_signal int32 `json:"warn_signal,omitempty"`
Warn_time int32 `json:"warn_time,omitempty"`
Work_dir string `json:"work_dir,omitempty"`
Cpus_per_task int32 `json:"cpus_per_task,omitempty"`
Min_cpus int32 `json:"min_cpus,omitempty"`
Max_cpus int32 `json:"max_cpus,omitempty"`
Min_nodes int32 `json:"min_nodes,omitempty"`
Max_nodes int32 `json:"max_nodes,omitempty"`
Boards_per_node int32 `json:"boards_per_node,omitempty"`
Sockets_per_board int32 `json:"sockets_per_board,omitempty"`
Sockets_per_node int32 `json:"sockets_per_node,omitempty"`
Cores_per_socket int32 `json:"cores_per_socket,omitempty"`
Threads_per_core int32 `json:"threads_per_core,omitempty"`
Ntasks_per_node int32 `json:"ntasks_per_node,omitempty"`
Ntasks_per_socket int32 `json:"ntasks_per_socket,omitempty"`
Ntasks_per_core int32 `json:"ntasks_per_core,omitempty"`
Ntasks_per_board int32 `json:"ntasks_per_board,omitempty"`
Pn_min_cpus int32 `json:"pn_min_cpus,omitempty"`
Pn_min_memory int32 `json:"pn_min_memory,omitempty"`
Pn_min_tmp_disk int32 `json:"pn_min_tmp_disk,omitempty"`
Reboot int32 `json:"reboot,omitempty"`
Rotate int32 `json:"rotate,omitempty"`
Req_switch int32 `json:"req_switch,omitempty"`
Std_err string `json:"std_err,omitempty"`
Std_in string `json:"std_in,omitempty"`
Std_out string `json:"std_out,omitempty"`
Wait4switch int32 `json:"wait4switch,omitempty"`
Wckey string `json:"wckey,omitempty"`
}

submitJobResp {
Job_id int32 `json:"job_id"`
Step_id int32 `json:"step_id"`
Error_code int32 `json:"error_code"`
Code string `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
Job_id int32 `json:"job_id"`
Step_id int32 `json:"step_id"`
Error_code int32 `json:"error_code"`

Code string `json:"code"`
Msg string `json:"msg"`
Data string `json:"data"`
}

MapAppJobInfo {
@@ -218,6 +303,14 @@ type (
GAP_STD_OUT_FILE string `json:"GAP_STD_OUT_FILE"` //工作路径/std.out.%j
GAP_STD_ERR_FILE string `json:"GAP_STD_ERR_FILE"` //工作路径/std.err.%j
}

Argv {
Argv string `json:"argv,omitempty"`
}

Environment {
Environment string `json:"environment,omitempty"`
}
)

type (
@@ -258,7 +351,7 @@ service slurmcore-api {
post /scheduleTask (scheduleTaskReq) returns (scheduleTaskResp)
@handler submitJobHandler
get /submitJob (listHistoryJobReq) returns (listHistoryJobResp)
post /submitJob (listHistoryJobReq) returns (listHistoryJobResp)
@handler getRegionHandler
get /getRegion () returns (getRegionResp)


+ 6
- 2
adaptor/slurm/slurmShuguang/rpc/internal/logic/submitjoblogic.go View File

@@ -24,7 +24,11 @@ func NewSubmitJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SubmitJ
}

func (l *SubmitJobLogic) SubmitJob(in *slurmShuguang.SubmitJobReq) (*slurmShuguang.SubmitJobResp, error) {
// todo: add your logic here and delete this line
resp := &slurmShuguang.SubmitJobResp{}

return &slurmShuguang.SubmitJobResp{}, nil
resp.Msg = in.Appname
resp.Code = in.Appname
resp.Data = in.Appname

return resp, nil
}

+ 1
- 0
adaptor/slurm/slurmShuguang/rpc/internal/server/slurmshuguangserver.go View File

@@ -34,6 +34,7 @@ func (s *SlurmShuguangServer) ListHistoryJob(ctx context.Context, in *slurmShugu
return l.ListHistoryJob(in)
}

// Submit job
func (s *SlurmShuguangServer) SubmitJob(ctx context.Context, in *slurmShuguang.SubmitJobReq) (*slurmShuguang.SubmitJobResp, error) {
l := logic.NewSubmitJobLogic(ctx, s.svcCtx)
return l.SubmitJob(in)


+ 18
- 17
adaptor/slurm/slurmShuguang/rpc/pb/slurmShuguang.proto View File

@@ -77,29 +77,29 @@ message ListHistoryJobResp{

/******************Job(Submit) Start*************************/
message SubmitJobReq{
string apptype = 1;
string appname = 2;
int64 strJobManagerID = 3;
MapAppJobInfo mapAppJobInfo = 4;
string apptype = 1; // @gotags: copier:"Apptype"
string appname = 2; // @gotags: copier:"Appname"
int64 strJobManagerID = 3; // @gotags: copier:"StrJobManagerID"
MapAppJobInfo mapAppJobInfo = 4; // @gotags: copier:"MapAppJobInfo"
}

message SubmitJobResp{
string Code = 1;
string Msg = 2;
string Data = 3;
string Code = 1; // @gotags: copier:"Code"
string Msg = 2; // @gotags: copier:"Msg"
string Data = 3; // @gotags: copier:"Data"
}

message MapAppJobInfo{
string GAP_CMD_FILE = 1; //命令行内容
string GAP_NNODE = 2; //节点个数
string GAP_SUBMIT_TYPE = 3; //cmd(命令行模式)
string GAP_JOB_NAME = 4; //作业名称
string GAP_WORK_DIR = 5; //工作路径
string GAP_QUEUE = 6; //队列名称
string GAP_WALL_TIME = 7; //最大运行时长(HH:MM:ss)
string GAP_APPNAME = 8; //BASE(基础应用),支持填写具体的应用英文名称
string GAP_STD_OUT_FILE = 9; //工作路径/std.out.%j
string GAP_STD_ERR_FILE = 10; //工作路径/std.err.%j
string GAP_CMD_FILE = 1; // @gotags: copier:"GAP_CMD_FILE" //命令行内容
string GAP_NNODE = 2; // @gotags: copier:"GAP_NNODE" //节点个数
string GAP_SUBMIT_TYPE = 3; // @gotags: copier:"GAP_SUBMIT_TYPE" //cmd(命令行模式)
string GAP_JOB_NAME = 4; // @gotags: copier:"GAP_JOB_NAME" //作业名称
string GAP_WORK_DIR = 5; // @gotags: copier:"GAP_WORK_DIR" //工作路径
string GAP_QUEUE = 6; // @gotags: copier:"GAP_QUEUE" //队列名称
string GAP_WALL_TIME = 7; // @gotags: copier:"GAP_WALL_TIME" //最大运行时长(HH:MM:ss)
string GAP_APPNAME = 8; // @gotags: copier:"GAP_APPNAME" //BASE(基础应用),支持填写具体的应用英文名称
string GAP_STD_OUT_FILE = 9; // @gotags: copier:"GAP_STD_OUT_FILE" //工作路径/std.out.%j
string GAP_STD_ERR_FILE = 10; // @gotags: copier:"GAP_STD_ERR_FILE" //工作路径/std.err.%j
}
/******************Job(Submit) End*************************/

@@ -113,6 +113,7 @@ service SlurmShuguang {
//ListHistoryJob list all history jobs
rpc ListHistoryJob(ListHistoryJobReq) returns (ListHistoryJobResp);

//Submit job
rpc SubmitJob(SubmitJobReq) returns (SubmitJobResp);

}

+ 17
- 17
adaptor/slurm/slurmShuguang/rpc/slurmShuguang/slurmShuguang.pb.go View File

@@ -637,10 +637,10 @@ type SubmitJobReq struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Apptype string `protobuf:"bytes,1,opt,name=apptype,proto3" json:"apptype,omitempty"`
Appname string `protobuf:"bytes,2,opt,name=appname,proto3" json:"appname,omitempty"`
StrJobManagerID int64 `protobuf:"varint,3,opt,name=strJobManagerID,proto3" json:"strJobManagerID,omitempty"`
MapAppJobInfo *MapAppJobInfo `protobuf:"bytes,4,opt,name=mapAppJobInfo,proto3" json:"mapAppJobInfo,omitempty"`
Apptype string `protobuf:"bytes,1,opt,name=apptype,proto3" json:"apptype,omitempty"` // @gotags: copier:"Apptype"
Appname string `protobuf:"bytes,2,opt,name=appname,proto3" json:"appname,omitempty"` // @gotags: copier:"Appname"
StrJobManagerID int64 `protobuf:"varint,3,opt,name=strJobManagerID,proto3" json:"strJobManagerID,omitempty"` // @gotags: copier:"StrJobManagerID"
MapAppJobInfo *MapAppJobInfo `protobuf:"bytes,4,opt,name=mapAppJobInfo,proto3" json:"mapAppJobInfo,omitempty"` // @gotags: copier:"MapAppJobInfo"
}

func (x *SubmitJobReq) Reset() {
@@ -708,9 +708,9 @@ type SubmitJobResp struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Code string `protobuf:"bytes,1,opt,name=Code,proto3" json:"Code,omitempty"`
Msg string `protobuf:"bytes,2,opt,name=Msg,proto3" json:"Msg,omitempty"`
Data string `protobuf:"bytes,3,opt,name=Data,proto3" json:"Data,omitempty"`
Code string `protobuf:"bytes,1,opt,name=Code,proto3" json:"Code,omitempty"` // @gotags: copier:"Code"
Msg string `protobuf:"bytes,2,opt,name=Msg,proto3" json:"Msg,omitempty"` // @gotags: copier:"Msg"
Data string `protobuf:"bytes,3,opt,name=Data,proto3" json:"Data,omitempty"` // @gotags: copier:"Data"
}

func (x *SubmitJobResp) Reset() {
@@ -771,16 +771,16 @@ type MapAppJobInfo struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

GAP_CMD_FILE string `protobuf:"bytes,1,opt,name=GAP_CMD_FILE,json=GAPCMDFILE,proto3" json:"GAP_CMD_FILE,omitempty"` //命令行内容
GAP_NNODE string `protobuf:"bytes,2,opt,name=GAP_NNODE,json=GAPNNODE,proto3" json:"GAP_NNODE,omitempty"` //节点个数
GAP_SUBMIT_TYPE string `protobuf:"bytes,3,opt,name=GAP_SUBMIT_TYPE,json=GAPSUBMITTYPE,proto3" json:"GAP_SUBMIT_TYPE,omitempty"` //cmd(命令行模式)
GAP_JOB_NAME string `protobuf:"bytes,4,opt,name=GAP_JOB_NAME,json=GAPJOBNAME,proto3" json:"GAP_JOB_NAME,omitempty"` //作业名称
GAP_WORK_DIR string `protobuf:"bytes,5,opt,name=GAP_WORK_DIR,json=GAPWORKDIR,proto3" json:"GAP_WORK_DIR,omitempty"` //工作路径
GAP_QUEUE string `protobuf:"bytes,6,opt,name=GAP_QUEUE,json=GAPQUEUE,proto3" json:"GAP_QUEUE,omitempty"` //队列名称
GAP_WALL_TIME string `protobuf:"bytes,7,opt,name=GAP_WALL_TIME,json=GAPWALLTIME,proto3" json:"GAP_WALL_TIME,omitempty"` //最大运行时长(HH:MM:ss)
GAP_APPNAME string `protobuf:"bytes,8,opt,name=GAP_APPNAME,json=GAPAPPNAME,proto3" json:"GAP_APPNAME,omitempty"` //BASE(基础应用),支持填写具体的应用英文名称
GAP_STD_OUT_FILE string `protobuf:"bytes,9,opt,name=GAP_STD_OUT_FILE,json=GAPSTDOUTFILE,proto3" json:"GAP_STD_OUT_FILE,omitempty"` //工作路径/std.out.%j
GAP_STD_ERR_FILE string `protobuf:"bytes,10,opt,name=GAP_STD_ERR_FILE,json=GAPSTDERRFILE,proto3" json:"GAP_STD_ERR_FILE,omitempty"` //工作路径/std.err.%j
GAP_CMD_FILE string `protobuf:"bytes,1,opt,name=GAP_CMD_FILE,json=GAPCMDFILE,proto3" json:"GAP_CMD_FILE,omitempty"` // @gotags: copier:"GAP_CMD_FILE" //命令行内容
GAP_NNODE string `protobuf:"bytes,2,opt,name=GAP_NNODE,json=GAPNNODE,proto3" json:"GAP_NNODE,omitempty"` // @gotags: copier:"GAP_NNODE" //节点个数
GAP_SUBMIT_TYPE string `protobuf:"bytes,3,opt,name=GAP_SUBMIT_TYPE,json=GAPSUBMITTYPE,proto3" json:"GAP_SUBMIT_TYPE,omitempty"` // @gotags: copier:"GAP_SUBMIT_TYPE" //cmd(命令行模式)
GAP_JOB_NAME string `protobuf:"bytes,4,opt,name=GAP_JOB_NAME,json=GAPJOBNAME,proto3" json:"GAP_JOB_NAME,omitempty"` // @gotags: copier:"GAP_JOB_NAME" //作业名称
GAP_WORK_DIR string `protobuf:"bytes,5,opt,name=GAP_WORK_DIR,json=GAPWORKDIR,proto3" json:"GAP_WORK_DIR,omitempty"` // @gotags: copier:"GAP_WORK_DIR" //工作路径
GAP_QUEUE string `protobuf:"bytes,6,opt,name=GAP_QUEUE,json=GAPQUEUE,proto3" json:"GAP_QUEUE,omitempty"` // @gotags: copier:"GAP_QUEUE" //队列名称
GAP_WALL_TIME string `protobuf:"bytes,7,opt,name=GAP_WALL_TIME,json=GAPWALLTIME,proto3" json:"GAP_WALL_TIME,omitempty"` // @gotags: copier:"GAP_WALL_TIME" //最大运行时长(HH:MM:ss)
GAP_APPNAME string `protobuf:"bytes,8,opt,name=GAP_APPNAME,json=GAPAPPNAME,proto3" json:"GAP_APPNAME,omitempty"` // @gotags: copier:"GAP_APPNAME" //BASE(基础应用),支持填写具体的应用英文名称
GAP_STD_OUT_FILE string `protobuf:"bytes,9,opt,name=GAP_STD_OUT_FILE,json=GAPSTDOUTFILE,proto3" json:"GAP_STD_OUT_FILE,omitempty"` // @gotags: copier:"GAP_STD_OUT_FILE" //工作路径/std.out.%j
GAP_STD_ERR_FILE string `protobuf:"bytes,10,opt,name=GAP_STD_ERR_FILE,json=GAPSTDERRFILE,proto3" json:"GAP_STD_ERR_FILE,omitempty"` // @gotags: copier:"GAP_STD_ERR_FILE" //工作路径/std.err.%j
}

func (x *MapAppJobInfo) Reset() {


+ 2
- 0
adaptor/slurm/slurmShuguang/rpc/slurmShuguang/slurmShuguang_grpc.pb.go View File

@@ -26,6 +26,7 @@ type SlurmShuguangClient interface {
ListJob(ctx context.Context, in *ListJobReq, opts ...grpc.CallOption) (*ListJobResp, error)
// ListHistoryJob list all history jobs
ListHistoryJob(ctx context.Context, in *ListHistoryJobReq, opts ...grpc.CallOption) (*ListHistoryJobResp, error)
// Submit job
SubmitJob(ctx context.Context, in *SubmitJobReq, opts ...grpc.CallOption) (*SubmitJobResp, error)
}

@@ -72,6 +73,7 @@ type SlurmShuguangServer interface {
ListJob(context.Context, *ListJobReq) (*ListJobResp, error)
// ListHistoryJob list all history jobs
ListHistoryJob(context.Context, *ListHistoryJobReq) (*ListHistoryJobResp, error)
// Submit job
SubmitJob(context.Context, *SubmitJobReq) (*SubmitJobResp, error)
mustEmbedUnimplementedSlurmShuguangServer()
}


+ 2
- 0
adaptor/slurm/slurmShuguang/rpc/slurmshuguangclient/slurmshuguang.go View File

@@ -28,6 +28,7 @@ type (
ListJob(ctx context.Context, in *ListJobReq, opts ...grpc.CallOption) (*ListJobResp, error)
// ListHistoryJob list all history jobs
ListHistoryJob(ctx context.Context, in *ListHistoryJobReq, opts ...grpc.CallOption) (*ListHistoryJobResp, error)
// Submit job
SubmitJob(ctx context.Context, in *SubmitJobReq, opts ...grpc.CallOption) (*SubmitJobResp, error)
}

@@ -54,6 +55,7 @@ func (m *defaultSlurmShuguang) ListHistoryJob(ctx context.Context, in *ListHisto
return client.ListHistoryJob(ctx, in, opts...)
}

// Submit job
func (m *defaultSlurmShuguang) SubmitJob(ctx context.Context, in *SubmitJobReq, opts ...grpc.CallOption) (*SubmitJobResp, error) {
client := slurmShuguang.NewSlurmShuguangClient(m.cli.Conn())
return client.SubmitJob(ctx, in, opts...)


+ 401
- 0
adaptor/slurm/slurmTianhe/rpc/internal/logic/submitjoblogic.go View File

@@ -0,0 +1,401 @@
package logic

/*
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include<slurm/slurm.h>
#include<slurm/slurm_errno.h>

// submit_job submits a batch job described by desc.
// Returns the allocated response message on success, or NULL on failure
// (the Slurm errno is then available via slurm_get_errno()).
// The caller must release the result with free_submit_response_msg().
struct submit_response_msg *submit_job(struct job_descriptor *desc)
{

struct submit_response_msg *resp_msg;
if (slurm_submit_batch_job(desc,
&resp_msg)) {
return NULL;
}
return resp_msg;

}

// free_submit_response_msg releases a response previously returned by submit_job.
void free_submit_response_msg(struct submit_response_msg *msg)
{
slurm_free_submit_response_response_msg(msg);
}

// update_job asks slurmctld to update an existing job; returns the Slurm error code.
// NOTE(review): currently unused by the Go code in this file — confirm it is needed.
int update_job (struct job_descriptor *msg) {

return slurm_update_job (msg);
}
*/
import "C"

import (
"context"
"fmt"
"unsafe"

"PCM/adaptor/slurm/slurmTianhe/rpc/internal/svc"
"PCM/adaptor/slurm/slurmTianhe/rpc/slurmTianhe"

"github.com/zeromicro/go-zero/core/logx"
)

// SubmitJobLogic carries the per-request state needed to submit a batch job
// to the Tianhe Slurm cluster through the cgo wrapper in this file.
type SubmitJobLogic struct {
	ctx    context.Context     // request-scoped context from the RPC call
	svcCtx *svc.ServiceContext // shared service dependencies
	logx.Logger
}

// NewSubmitJobLogic constructs a SubmitJobLogic bound to the given request
// context and shared service context, with a context-aware logger attached.
func NewSubmitJobLogic(ctx context.Context, svcCtx *svc.ServiceContext) *SubmitJobLogic {
	logic := &SubmitJobLogic{
		Logger: logx.WithContext(ctx),
		ctx:    ctx,
		svcCtx: svcCtx,
	}
	return logic
}

// SubmitJob translates the gRPC request into a Slurm job_descriptor, submits
// it via the cgo wrapper around slurm_submit_batch_job, and converts the C
// response into the protobuf reply.
//
// Fixes over the previous revision:
//   - the argv/environment C strings and malloc'd pointer arrays are always
//     freed (they previously leaked on every call);
//   - a failed submission returns an error carrying the Slurm errno instead
//     of indexing element 0 of a nil SubmitResponseMsg slice (which panicked);
//   - the C response message is freed only after the nil check;
//   - argv/environment entries use the field value (GetArgv/GetEnvironment)
//     rather than the protobuf text representation produced by String();
//   - a leftover debug fmt.Printf was removed.
func (l *SubmitJobLogic) SubmitJob(in *slurmTianhe.SubmitJobReq) (*slurmTianhe.SubmitJobResp, error) {
	var cStruct C.struct_job_descriptor

	// Initialize every descriptor field to Slurm's "unset" defaults before
	// overriding only the fields the caller actually supplied.
	C.slurm_init_job_desc_msg(&cStruct)
	if in.Account != "" {
		accountS := C.CString(in.Account)
		defer C.free(unsafe.Pointer(accountS))
		cStruct.account = accountS
	}
	if in.AcctgFreq != "" {
		acctgFreqS := C.CString(in.AcctgFreq)
		defer C.free(unsafe.Pointer(acctgFreqS))
		cStruct.acctg_freq = acctgFreqS
	}
	if in.AllocNode != "" {
		allocNodeS := C.CString(in.AllocNode)
		defer C.free(unsafe.Pointer(allocNodeS))
		cStruct.alloc_node = allocNodeS
	}
	if in.AllocRespPort != 0 {
		cStruct.alloc_resp_port = C.uint16_t(in.AllocRespPort)
	}
	if in.AllocSid != 0 {
		cStruct.alloc_sid = C.uint32_t(in.AllocSid)
	}
	if len(in.Argv) > 0 {
		// Build a C array of char* for the script arguments. Both the array
		// and every string are freed when this function returns; Slurm has
		// serialized the descriptor by then.
		cStruct.argc = C.uint32_t(len(in.Argv))
		cArray := C.malloc(C.size_t(C.size_t(len(in.Argv)) * C.size_t(unsafe.Sizeof(uintptr(0)))))
		defer C.free(cArray)
		a := (*[1<<30 - 1]*C.char)(cArray)
		for i := 0; i < len(in.Argv); i++ {
			// GetArgv(): the field value, not the proto text form of the message.
			a[i] = C.CString(in.Argv[i].GetArgv())
			defer C.free(unsafe.Pointer(a[i]))
		}
		cStruct.argv = (**C.char)(cArray)
	}

	if in.ArrayInx != "" {
		arrayInxS := C.CString(in.ArrayInx)
		defer C.free(unsafe.Pointer(arrayInxS))
		cStruct.array_inx = arrayInxS
	}
	if in.BeginTime != 0 {
		cStruct.begin_time = C.int64_t(in.BeginTime)
	}
	if in.CkptInterval != 0 {
		cStruct.ckpt_interval = C.uint16_t(in.CkptInterval)
	}
	if in.CkptDir != "" {
		ckptDirS := C.CString(in.CkptDir)
		defer C.free(unsafe.Pointer(ckptDirS))
		cStruct.ckpt_dir = ckptDirS
	}
	if in.Comment != "" {
		commentS := C.CString(in.Comment)
		defer C.free(unsafe.Pointer(commentS))
		cStruct.comment = commentS
	}
	if in.Contiguous != 0 {
		cStruct.contiguous = C.uint16_t(in.Contiguous)
	}
	if in.CpuBind != "" {
		cpuBindS := C.CString(in.CpuBind)
		defer C.free(unsafe.Pointer(cpuBindS))
		cStruct.cpu_bind = cpuBindS
	}
	if in.CpuBindType != 0 {
		cStruct.cpu_bind_type = C.uint16_t(in.CpuBindType)
	}
	if in.Dependency != "" {
		dependencyS := C.CString(in.Dependency)
		defer C.free(unsafe.Pointer(dependencyS))
		cStruct.dependency = dependencyS
	}
	if in.EndTime != 0 {
		cStruct.end_time = C.int64_t(in.EndTime)
	}
	if len(in.Environment) > 0 {
		cStruct.env_size = C.uint32_t(len(in.Environment))
		cArray := C.malloc(C.size_t(C.size_t(len(in.Environment)) * C.size_t(unsafe.Sizeof(uintptr(0)))))
		defer C.free(cArray)
		a := (*[1<<30 - 1]*C.char)(cArray)
		for i := 0; i < len(in.Environment); i++ {
			a[i] = C.CString(in.Environment[i].GetEnvironment())
			defer C.free(unsafe.Pointer(a[i]))
		}
		cStruct.environment = (**C.char)(cArray)
	} else {
		// Slurm requires a non-empty environment; supply a marker variable.
		cStruct.env_size = 1
		cArray := C.malloc(C.size_t(C.size_t(1) * C.size_t(unsafe.Sizeof(uintptr(0)))))
		defer C.free(cArray)
		a := (*[1<<30 - 1]*C.char)(cArray)
		a[0] = C.CString("SLURM_GO_JOB=TRUE")
		defer C.free(unsafe.Pointer(a[0]))
		cStruct.environment = (**C.char)(cArray)
	}
	if in.ExcNodes != "" {
		excNodesS := C.CString(in.ExcNodes)
		defer C.free(unsafe.Pointer(excNodesS))
		cStruct.exc_nodes = excNodesS
	}
	if in.Features != "" {
		featuresS := C.CString(in.Features)
		defer C.free(unsafe.Pointer(featuresS))
		cStruct.features = featuresS
	}
	if in.GroupId != 0 {
		cStruct.group_id = C.uint32_t(in.GroupId)
	}
	if in.Immediate != 0 {
		cStruct.immediate = C.uint16_t(in.Immediate)
	}
	if in.JobId != 0 {
		cStruct.job_id = C.uint32_t(in.JobId)
	}
	if in.KillOnNodeFail != 0 {
		cStruct.kill_on_node_fail = C.uint16_t(in.KillOnNodeFail)
	}
	if in.Licenses != "" {
		licensesS := C.CString(in.Licenses)
		defer C.free(unsafe.Pointer(licensesS))
		cStruct.licenses = licensesS
	}
	if in.MailType != 0 {
		cStruct.mail_type = C.uint16_t(in.MailType)
	}
	if in.MailUser != "" {
		mailUserS := C.CString(in.MailUser)
		defer C.free(unsafe.Pointer(mailUserS))
		cStruct.mail_user = mailUserS
	}
	if in.MemBind != "" {
		memBindS := C.CString(in.MemBind)
		defer C.free(unsafe.Pointer(memBindS))
		cStruct.mem_bind = memBindS
	}
	if in.MemBindType != 0 {
		cStruct.mem_bind_type = C.uint16_t(in.MemBindType)
	}
	if in.Name != "" {
		nameS := C.CString(in.Name)
		defer C.free(unsafe.Pointer(nameS))
		cStruct.name = nameS
	}
	if in.Network != "" {
		networkS := C.CString(in.Network)
		defer C.free(unsafe.Pointer(networkS))
		cStruct.network = networkS
	}
	if in.Nice != 0 {
		cStruct.nice = C.uint16_t(in.Nice)
	}
	if in.NumTasks != 0 {
		cStruct.num_tasks = C.uint32_t(in.NumTasks)
	}
	if in.OpenMode != 0 {
		cStruct.open_mode = C.uint8_t(in.OpenMode)
	}
	if in.OtherPort != 0 {
		cStruct.other_port = C.uint16_t(in.OtherPort)
	}
	if in.Overcommit != 0 {
		cStruct.overcommit = C.uint8_t(in.Overcommit)
	}
	if in.Partition != "" {
		partitionS := C.CString(in.Partition)
		defer C.free(unsafe.Pointer(partitionS))
		cStruct.partition = partitionS
	}
	if in.PlaneSize != 0 {
		cStruct.plane_size = C.uint16_t(in.PlaneSize)
	}
	if in.Priority != 0 {
		cStruct.priority = C.uint32_t(in.Priority)
	}
	if in.Profile != 0 {
		cStruct.profile = C.uint32_t(in.Profile)
	}
	if in.Qos != "" {
		qosS := C.CString(in.Qos)
		defer C.free(unsafe.Pointer(qosS))
		cStruct.qos = qosS
	}
	if in.Reboot != 0 {
		cStruct.reboot = C.uint16_t(in.Reboot)
	}
	if in.RespHost != "" {
		respHostS := C.CString(in.RespHost)
		defer C.free(unsafe.Pointer(respHostS))
		cStruct.resp_host = respHostS
	}
	if in.ReqNodes != "" {
		reqNodesS := C.CString(in.ReqNodes)
		defer C.free(unsafe.Pointer(reqNodesS))
		cStruct.req_nodes = reqNodesS
	}
	if in.Requeue != 0 {
		cStruct.requeue = C.uint16_t(in.Requeue)
	}
	if in.Reservation != "" {
		reservationS := C.CString(in.Reservation)
		defer C.free(unsafe.Pointer(reservationS))
		cStruct.reservation = reservationS
	}
	if in.Script != "" {
		scriptS := C.CString(in.Script)
		defer C.free(unsafe.Pointer(scriptS))
		cStruct.script = scriptS
	}
	if in.Shared != 0 {
		cStruct.shared = C.uint16_t(in.Shared)
	}
	if in.SpankJobEnvSize != 0 {
		cStruct.spank_job_env_size = C.uint32_t(in.SpankJobEnvSize)
	}
	if in.TaskDist != 0 {
		cStruct.task_dist = C.uint16_t(in.TaskDist)
	}
	if in.TimeLimit != 0 {
		cStruct.time_limit = C.uint32_t(in.TimeLimit)
	}
	if in.TimeMin != 0 {
		cStruct.time_min = C.uint32_t(in.TimeMin)
	}
	// user_id is set unconditionally: uid 0 (root) is a legitimate value,
	// so it cannot share the "zero means unset" convention used above.
	cStruct.user_id = C.uint32_t(in.UserId)

	if in.WaitAllNodes != 0 {
		cStruct.wait_all_nodes = C.uint16_t(in.WaitAllNodes)
	}
	if in.WarnSignal != 0 {
		cStruct.warn_signal = C.uint16_t(in.WarnSignal)
	}
	if in.WarnTime != 0 {
		cStruct.warn_time = C.uint16_t(in.WarnTime)
	}
	if in.WorkDir != "" {
		workDirS := C.CString(in.WorkDir)
		defer C.free(unsafe.Pointer(workDirS))
		cStruct.work_dir = workDirS
	}
	if in.CpusPerTask != 0 {
		cStruct.cpus_per_task = C.uint16_t(in.CpusPerTask)
	}
	if in.MinCpus != 0 {
		cStruct.min_cpus = C.uint32_t(in.MinCpus)
	}
	if in.MaxCpus != 0 {
		cStruct.max_cpus = C.uint32_t(in.MaxCpus)
	}
	if in.MinNodes != 0 {
		cStruct.min_nodes = C.uint32_t(in.MinNodes)
	}
	if in.MaxNodes != 0 {
		cStruct.max_nodes = C.uint32_t(in.MaxNodes)
	}
	if in.BoardsPerNode != 0 {
		cStruct.boards_per_node = C.uint16_t(in.BoardsPerNode)
	}
	if in.SocketsPerBoard != 0 {
		cStruct.sockets_per_board = C.uint16_t(in.SocketsPerBoard)
	}
	if in.SocketsPerNode != 0 {
		cStruct.sockets_per_node = C.uint16_t(in.SocketsPerNode)
	}
	if in.CoresPerSocket != 0 {
		cStruct.cores_per_socket = C.uint16_t(in.CoresPerSocket)
	}
	if in.ThreadsPerCore != 0 {
		cStruct.threads_per_core = C.uint16_t(in.ThreadsPerCore)
	}
	if in.NtasksPerNode != 0 {
		cStruct.ntasks_per_node = C.uint16_t(in.NtasksPerNode)
	}
	if in.NtasksPerSocket != 0 {
		cStruct.ntasks_per_socket = C.uint16_t(in.NtasksPerSocket)
	}
	if in.NtasksPerCore != 0 {
		cStruct.ntasks_per_core = C.uint16_t(in.NtasksPerCore)
	}
	if in.NtasksPerBoard != 0 {
		cStruct.ntasks_per_board = C.uint16_t(in.NtasksPerBoard)
	}
	if in.PnMinCpus != 0 {
		cStruct.pn_min_cpus = C.uint16_t(in.PnMinCpus)
	}
	if in.PnMinMemory != 0 {
		cStruct.pn_min_memory = C.uint32_t(in.PnMinMemory)
	}
	if in.PnMinTmpDisk != 0 {
		cStruct.pn_min_tmp_disk = C.uint32_t(in.PnMinTmpDisk)
	}
	if in.ReqSwitch != 0 {
		cStruct.req_switch = C.uint32_t(in.ReqSwitch)
	}
	if in.StdErr != "" {
		stdErrS := C.CString(in.StdErr)
		defer C.free(unsafe.Pointer(stdErrS))
		cStruct.std_err = stdErrS
	}
	if in.StdIn != "" {
		stdInS := C.CString(in.StdIn)
		defer C.free(unsafe.Pointer(stdInS))
		cStruct.std_in = stdInS
	}
	if in.StdOut != "" {
		stdOutS := C.CString(in.StdOut)
		defer C.free(unsafe.Pointer(stdOutS))
		cStruct.std_out = stdOutS
	}

	if in.Wait4Switch != 0 {
		cStruct.wait4switch = C.uint32_t(in.Wait4Switch)
	}
	if in.Wckey != "" {
		wckeyS := C.CString(in.Wckey)
		defer C.free(unsafe.Pointer(wckeyS))
		cStruct.wckey = wckeyS
	}

	// Submit the job; the C wrapper returns NULL when slurm_submit_batch_job fails.
	cMsg := C.submit_job(&cStruct)
	if cMsg == nil {
		// Surface the Slurm errno instead of dereferencing a nil response
		// (the previous code indexed a nil slice here and panicked).
		return nil, fmt.Errorf("slurm_submit_batch_job failed: slurm errno %d", int(C.slurm_get_errno()))
	}
	// Free the C response only after the nil check; the previous version
	// deferred this unconditionally and freed a NULL message on failure.
	defer C.free_submit_response_msg(cMsg)

	// NOTE(review): assumes the generated element type for the repeated
	// field is *slurmTianhe.SubmitResponseMsg — confirm against slurmTianhe.pb.go.
	resp := &slurmTianhe.SubmitJobResp{
		SubmitResponseMsg: []*slurmTianhe.SubmitResponseMsg{{
			JobId:     uint32(cMsg.job_id),
			StepId:    uint32(cMsg.step_id),
			ErrorCode: uint32(cMsg.error_code),
		}},
	}
	return resp, nil
}

// submitResponseMsgConvertCToGo converts the C submit_response_msg returned by
// the Slurm submit API into the gRPC SubmitJobResp message.
//
// Bug fix: the original wrote to goStruct.SubmitResponseMsg[0] on a freshly
// declared SubmitJobResp, whose repeated SubmitResponseMsg field is a nil
// slice — indexing it panics at runtime. The single response entry must be
// appended instead. (The nil-message branch of the caller at L970-972 has the
// same defect; it is outside this block's view — TODO fix it the same way.)
func submitResponseMsgConvertCToGo(cStruct *C.struct_submit_response_msg) slurmTianhe.SubmitJobResp {
	var goStruct slurmTianhe.SubmitJobResp
	// Build the single response entry from the C struct fields.
	goStruct.SubmitResponseMsg = append(goStruct.SubmitResponseMsg, &slurmTianhe.SubmitResponseMsg{
		JobId:     uint32(cStruct.job_id),
		StepId:    uint32(cStruct.step_id),
		ErrorCode: uint32(cStruct.error_code),
	})
	return goStruct
}

+ 6
- 0
adaptor/slurm/slurmTianhe/rpc/internal/server/slurmtianheserver.go View File

@@ -33,3 +33,9 @@ func (s *SlurmTianheServer) ListHistoryJob(ctx context.Context, in *slurmTianhe.
l := logic.NewListHistoryJobLogic(ctx, s.svcCtx)
return l.ListHistoryJob(in)
}

// SubmitJob handles the SubmitJob RPC by delegating to the logic layer.
func (s *SlurmTianheServer) SubmitJob(ctx context.Context, in *slurmTianhe.SubmitJobReq) (*slurmTianhe.SubmitJobResp, error) {
	return logic.NewSubmitJobLogic(ctx, s.svcCtx).SubmitJob(in)
}

+ 125
- 0
adaptor/slurm/slurmTianhe/rpc/pb/slurmTianhe.proto View File

@@ -150,6 +150,127 @@ message ListHistoryJobResp{
/******************History Job End*************************/


/******************Job(Submit) Start*************************/
// SubmitJobReq mirrors Slurm's C job descriptor used for batch submission.
// Field comments (and the @gotags copier tags, consumed when copying between
// the gRPC structs and the C-side structs) were carried over from the Slurm
// headers; fields with value 0 / "" are treated as "unset" by the caller.
message SubmitJobReq{
string account =1; // @gotags: copier:"account" /* charge to specified account */
string acctg_freq =2; // @gotags: copier:"acctg_freq" /* accounting polling intervals (seconds) */
string alloc_node=3; // @gotags: copier:"alloc_node" /* node making resource allocation request * NOTE: Normally set by slurm_submit* or * slurm_allocate* function */
uint32 alloc_resp_port=4; // @gotags: copier:"alloc_resp_port" /* port to send allocation confirmation to */
uint32 alloc_sid =5; // @gotags: copier:"alloc_sid" /* local sid making resource allocation request * NOTE: Normally set by slurm_submit* or * slurm_allocate* function * NOTE: Also used for update flags, see * ALLOC_SID_* flags */
uint32 argc =6; // @gotags: copier:"argc" /* number of arguments to the script */
repeated Argv argv = 7; // @gotags: copier:"Argv" /* arguments to the script */
string array_inx =8; // @gotags: copier:"array_inx" /* job array index values */ //void *array_bitmap; /* NOTE: Set by slurmctld */
int64 begin_time = 9; // @gotags: copier:"begin_time" /* delay initiation until this time */
uint32 ckpt_interval=10; // @gotags: copier:"ckpt_interval" /* periodically checkpoint this job */
string ckpt_dir =11; // @gotags: copier:"ckpt_dir" /* directory to store checkpoint images */
string comment =12; // @gotags: copier:"comment" /* arbitrary comment (used by Moab scheduler) */
uint32 contiguous=13; // @gotags: copier:"contiguous" /* 1 if job requires contiguous nodes,* 0 otherwise,default=0 */
string cpu_bind=14; // @gotags: copier:"cpu_bind" /* binding map for map/mask_cpu */
uint32 cpu_bind_type=15; // @gotags: copier:"cpu_bind_type" /* see cpu_bind_type_t */
string dependency =16; // @gotags: copier:"dependency" /* synchronize job execution with other jobs */
int64 end_time=17; // @gotags: copier:"end_time" /* time by which job must complete, used for * job update only now, possible deadline * scheduling in the future */
repeated Environment environment=18; // @gotags: copier:"Environment" /* environment variables to set for job, * name=value pairs, one per line */
uint32 env_size =19; // @gotags: copier:"env_size" /* element count in environment */
string exc_nodes =20; // @gotags: copier:"exc_nodes" /* comma separated list of nodes excluded * from job's allocation, default NONE */
string features =21; // @gotags: copier:"features" /* comma separated list of required features, * default NONE */
string gres =22; // @gotags: copier:"gres" /* comma separated list of required generic * resources, default NONE */
uint32 group_id =23; // @gotags: copier:"group_id" /* group to assume, if run as root. */
uint32 immediate=24; // @gotags: copier:"immediate" /* 1 if allocate to run or fail immediately, * 0 if to be queued awaiting resources */
uint32 job_id =25; // @gotags: copier:"job_id" /* job ID, default set by SLURM */
uint32 kill_on_node_fail=26; // @gotags: copier:"kill_on_node_fail" /* 1 if node failure to kill job, * 0 otherwise,default=1 */
string licenses=27; // @gotags: copier:"licenses" /* licenses required by the job */
uint32 mail_type=28; // @gotags: copier:"mail_type" /* see MAIL_JOB_ definitions above */
string mail_user =29; // @gotags: copier:"mail_user" /* user to receive notification */
string mem_bind =30; // @gotags: copier:"mem_bind" /* binding map for map/mask_cpu */
uint32 mem_bind_type=31; // @gotags: copier:"mem_bind_type" /* see mem_bind_type_t */
string name =32; // @gotags: copier:"name" /* name of the job, default "" */
string network=33; // @gotags: copier:"network" /* network use spec */
uint32 nice =34; // @gotags: copier:"nice" /* requested priority change, * NICE_OFFSET == no change */
uint32 num_tasks=35; // @gotags: copier:"num_tasks" /* number of tasks to be started, * for batch only */
uint32 open_mode=36; // @gotags: copier:"open_mode" /* out/err open mode truncate or append, * see OPEN_MODE_* */
uint32 other_port=37; // @gotags: copier:"other_port" /* port to send various notification msg to */
uint32 overcommit =38; // @gotags: copier:"overcommit" /* over subscribe resources, for batch only */
string partition=39; // @gotags: copier:"partition" /* name of requested partition, * default in SLURM config */
uint32 plane_size =40; // @gotags: copier:"plane_size" /* plane size when task_dist = SLURM_DIST_PLANE */
uint32 priority =41; // @gotags: copier:"priority" /* relative priority of the job, * explicitly set only for user root, * 0 == held (don't initiate) */
uint32 profile =42; // @gotags: copier:"profile" /* Level of acct_gather_profile {all | none} */
string qos =43; // @gotags: copier:"qos" /* Quality of Service */
string resp_host=44; // @gotags: copier:"resp_host" /* NOTE: Set by slurmctld */
string req_nodes=45; // @gotags: copier:"req_nodes" /* comma separated list of required nodes * default NONE */
uint32 requeue=46; // @gotags: copier:"requeue" /* enable or disable job requeue option */
string reservation=47; // @gotags: copier:"reservation" /* name of reservation to use */
string script=48; // @gotags: copier:"script" /* the actual job script, default NONE */
uint32 shared =49; // @gotags: copier:"shared" /* 1 if job can share nodes with other jobs, * 0 if job needs exclusive access to the node, * or NO_VAL to accept the system default. * SHARED_FORCE to eliminate user control. */ //char **spank_job_env; environment variables for job prolog/epilog // * scripts as set by SPANK plugins
uint32 spank_job_env_size=50; // @gotags: copier:"spank_job_env_size" /* element count in spank_env */
uint32 task_dist =51; // @gotags: copier:"task_dist" /* see enum task_dist_state */
uint32 time_limit =52; // @gotags: copier:"time_limit" /* maximum run time in minutes, default is * partition limit */
uint32 time_min =53; // @gotags: copier:"time_min" /* minimum run time in minutes, default is * time_limit */
uint32 user_id=54; // @gotags: copier:"user_id" /* set only if different from current UID, * can only be explicitly set by user root */
uint32 wait_all_nodes=55; // @gotags: copier:"wait_all_nodes" /* 0 to start job immediately after allocation * 1 to start job after all nodes booted * or NO_VAL to use system default */
uint32 warn_signal=56; // @gotags: copier:"warn_signal" /* signal to send when approaching end time */
uint32 warn_time=57; // @gotags: copier:"warn_time" /* time before end to send signal (seconds) */
string work_dir =58; // @gotags: copier:"work_dir" /* pathname of working directory */ /* job constraints: */
uint32 cpus_per_task=59; // @gotags: copier:"cpus_per_task" /* number of processors required for * each task */
uint32 min_cpus =60; // @gotags: copier:"min_cpus" /* minimum number of processors required, * default=0 */
uint32 max_cpus=61; // @gotags: copier:"max_cpus" /* maximum number of processors required, * default=0 */
uint32 min_nodes=62; // @gotags: copier:"min_nodes" /* minimum number of nodes required by job, * default=0 */
uint32 max_nodes=63; // @gotags: copier:"max_nodes" /* maximum number of nodes usable by job, * default=0 */
uint32 boards_per_node =64; // @gotags: copier:"boards_per_node" /* boards per node required by job */
uint32 sockets_per_board=65; // @gotags: copier:"sockets_per_board" /* sockets per board required by job */
uint32 sockets_per_node =66; // @gotags: copier:"sockets_per_node" /* sockets per node required by job */
uint32 cores_per_socket=67; // @gotags: copier:"cores_per_socket" /* cores per socket required by job */
uint32 threads_per_core=68; // @gotags: copier:"threads_per_core" /* threads per core required by job */
uint32 ntasks_per_node =69; // @gotags: copier:"ntasks_per_node" /* number of tasks to invoke on each node */
uint32 ntasks_per_socket=70; // @gotags: copier:"ntasks_per_socket" /* number of tasks to invoke on * each socket */
uint32 ntasks_per_core =71; // @gotags: copier:"ntasks_per_core" /* number of tasks to invoke on each core */
uint32 ntasks_per_board=72; // @gotags: copier:"ntasks_per_board" /* number of tasks to invoke on each board */
uint32 pn_min_cpus =73; // @gotags: copier:"pn_min_cpus" /* minimum # CPUs per node, default=0 */
uint32 pn_min_memory=74; // @gotags: copier:"pn_min_memory" /* minimum real memory per node OR * real memory per CPU | MEM_PER_CPU, * default=0 (no limit) */
uint32 pn_min_tmp_disk =75; // @gotags: copier:"pn_min_tmp_disk" /* minimum tmp disk per node, * default=0 */
/*
 * The following parameters are only meaningful on a Blue Gene
 * system at present. Some will be of value on other system. Don't remove these
 * they are needed for LCRM and others that can't talk to the opaque data type
 * select_jobinfo.
 */
//uint16_t geometry[HIGHEST_DIMENSIONS]; node count in various // * dimensions, e.g. X, Y, and Z
//uint16_t conn_type[HIGHEST_DIMENSIONS]; see enum connection_type
uint32 reboot=76; // @gotags: copier:"reboot" /* force node reboot before startup */
uint32 rotate=77; // @gotags: copier:"rotate" /* permit geometry rotation if set */
//char *blrtsimage; /* BlrtsImage for block */
//char *linuximage; /* LinuxImage for block */
//char *mloaderimage; /* MloaderImage for block */
//char *ramdiskimage; /* RamDiskImage for block */
/* End of Blue Gene specific values */

uint32 req_switch =78; // @gotags: copier:"req_switch" /* Minimum number of switches */ //dynamic_plugin_data_t *select_jobinfo; /* opaque data type, // * SLURM internal use only */
string std_err=79; // @gotags: copier:"std_err" /* pathname of stderr */
string std_in =80; // @gotags: copier:"std_in" /* pathname of stdin */
string std_out=81; // @gotags: copier:"std_out" /* pathname of stdout */
uint32 wait4switch=82; // @gotags: copier:"wait4switch" /* Maximum time to wait for minimum switches */
string wckey =83; // @gotags: copier:"wckey" /* wckey for job */
}

// SubmitJobResp is the SubmitJob RPC reply; it carries the
// submit_response_msg entries produced by the Slurm C submit call.
message SubmitJobResp{
repeated SubmitResponseMsg submit_response_msg = 1; // in practice a single entry per submission
}

// SubmitResponseMsg mirrors Slurm's C struct submit_response_msg
// (see the cgo conversion in submitjoblogic.go).
message SubmitResponseMsg{
uint32 job_id = 1; // ID assigned to the submitted job
uint32 step_id =2; // ID of the job step
uint32 error_code=3; // Slurm error code; 0 means success
}

// Argv wraps one script argument (repeated in SubmitJobReq.argv).
message Argv{
string argv =1; // @gotags: copier:"argv"
}

// Environment wraps one "name=value" environment entry
// (repeated in SubmitJobReq.environment).
message Environment{
string environment =1; // @gotags: copier:"environment"
}
/******************Job(Submit) End*************************/


// Slurm Services for Tianhe Branch
service slurmTianhe {

@@ -159,4 +280,8 @@ service slurmTianhe {
//ListHistoryJob list all history jobs
rpc ListHistoryJob(ListHistoryJobReq) returns (ListHistoryJobResp);

//Submit job
rpc SubmitJob(SubmitJobReq) returns (SubmitJobResp);


}

+ 1188
- 26
adaptor/slurm/slurmTianhe/rpc/slurmTianhe/slurmTianhe.pb.go
File diff suppressed because it is too large
View File


+ 38
- 0
adaptor/slurm/slurmTianhe/rpc/slurmTianhe/slurmTianhe_grpc.pb.go View File

@@ -26,6 +26,8 @@ type SlurmTianheClient interface {
ListJob(ctx context.Context, in *ListJobReq, opts ...grpc.CallOption) (*ListJobResp, error)
// ListHistoryJob list all history jobs
ListHistoryJob(ctx context.Context, in *ListHistoryJobReq, opts ...grpc.CallOption) (*ListHistoryJobResp, error)
// Submit job
SubmitJob(ctx context.Context, in *SubmitJobReq, opts ...grpc.CallOption) (*SubmitJobResp, error)
}

type slurmTianheClient struct {
@@ -54,6 +56,15 @@ func (c *slurmTianheClient) ListHistoryJob(ctx context.Context, in *ListHistoryJ
return out, nil
}

// SubmitJob invokes the /slurmTianhe.slurmTianhe/SubmitJob unary RPC.
// NOTE: protoc-gen-go-grpc generated code — regenerate rather than hand-edit.
func (c *slurmTianheClient) SubmitJob(ctx context.Context, in *SubmitJobReq, opts ...grpc.CallOption) (*SubmitJobResp, error) {
	out := new(SubmitJobResp)
	err := c.cc.Invoke(ctx, "/slurmTianhe.slurmTianhe/SubmitJob", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

// SlurmTianheServer is the server API for SlurmTianhe service.
// All implementations must embed UnimplementedSlurmTianheServer
// for forward compatibility
@@ -62,6 +73,8 @@ type SlurmTianheServer interface {
ListJob(context.Context, *ListJobReq) (*ListJobResp, error)
// ListHistoryJob list all history jobs
ListHistoryJob(context.Context, *ListHistoryJobReq) (*ListHistoryJobResp, error)
// Submit job
SubmitJob(context.Context, *SubmitJobReq) (*SubmitJobResp, error)
mustEmbedUnimplementedSlurmTianheServer()
}

@@ -75,6 +88,9 @@ func (UnimplementedSlurmTianheServer) ListJob(context.Context, *ListJobReq) (*Li
func (UnimplementedSlurmTianheServer) ListHistoryJob(context.Context, *ListHistoryJobReq) (*ListHistoryJobResp, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListHistoryJob not implemented")
}
// SubmitJob on the Unimplemented embedding returns codes.Unimplemented,
// giving servers forward compatibility when new RPCs are added.
func (UnimplementedSlurmTianheServer) SubmitJob(context.Context, *SubmitJobReq) (*SubmitJobResp, error) {
	return nil, status.Errorf(codes.Unimplemented, "method SubmitJob not implemented")
}
func (UnimplementedSlurmTianheServer) mustEmbedUnimplementedSlurmTianheServer() {}

// UnsafeSlurmTianheServer may be embedded to opt out of forward compatibility for this service.
@@ -124,6 +140,24 @@ func _SlurmTianhe_ListHistoryJob_Handler(srv interface{}, ctx context.Context, d
return interceptor(ctx, in, info, handler)
}

// _SlurmTianhe_SubmitJob_Handler is the generated server-side dispatch shim
// for SubmitJob: it decodes the wire request, then calls the service either
// directly or through the configured unary interceptor.
func _SlurmTianhe_SubmitJob_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(SubmitJobReq)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		// No interceptor configured: invoke the service method directly.
		return srv.(SlurmTianheServer).SubmitJob(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/slurmTianhe.slurmTianhe/SubmitJob",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(SlurmTianheServer).SubmitJob(ctx, req.(*SubmitJobReq))
	}
	return interceptor(ctx, in, info, handler)
}

// SlurmTianhe_ServiceDesc is the grpc.ServiceDesc for SlurmTianhe service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -139,6 +173,10 @@ var SlurmTianhe_ServiceDesc = grpc.ServiceDesc{
MethodName: "ListHistoryJob",
Handler: _SlurmTianhe_ListHistoryJob_Handler,
},
{
MethodName: "SubmitJob",
Handler: _SlurmTianhe_SubmitJob_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "slurmTianhe.proto",


+ 13
- 0
adaptor/slurm/slurmTianhe/rpc/slurmtianheclient/slurmtianhe.go View File

@@ -13,18 +13,25 @@ import (
)

type (
Argv = slurmTianhe.Argv
Environment = slurmTianhe.Environment
HistoryJob = slurmTianhe.HistoryJob
Job = slurmTianhe.Job
ListHistoryJobReq = slurmTianhe.ListHistoryJobReq
ListHistoryJobResp = slurmTianhe.ListHistoryJobResp
ListJobReq = slurmTianhe.ListJobReq
ListJobResp = slurmTianhe.ListJobResp
SubmitJobReq = slurmTianhe.SubmitJobReq
SubmitJobResp = slurmTianhe.SubmitJobResp
SubmitResponseMsg = slurmTianhe.SubmitResponseMsg

SlurmTianhe interface {
// ListJob list all jobs
ListJob(ctx context.Context, in *ListJobReq, opts ...grpc.CallOption) (*ListJobResp, error)
// ListHistoryJob list all history jobs
ListHistoryJob(ctx context.Context, in *ListHistoryJobReq, opts ...grpc.CallOption) (*ListHistoryJobResp, error)
// Submit job
SubmitJob(ctx context.Context, in *SubmitJobReq, opts ...grpc.CallOption) (*SubmitJobResp, error)
}

defaultSlurmTianhe struct {
@@ -49,3 +56,9 @@ func (m *defaultSlurmTianhe) ListHistoryJob(ctx context.Context, in *ListHistory
client := slurmTianhe.NewSlurmTianheClient(m.cli.Conn())
return client.ListHistoryJob(ctx, in, opts...)
}

// SubmitJob submits a job via the underlying gRPC connection.
// NOTE: goctl-generated client wrapper; a fresh stub is built per call
// from the zrpc connection, matching the sibling methods above.
func (m *defaultSlurmTianhe) SubmitJob(ctx context.Context, in *SubmitJobReq, opts ...grpc.CallOption) (*SubmitJobResp, error) {
	client := slurmTianhe.NewSlurmTianheClient(m.cli.Conn())
	return client.SubmitJob(ctx, in, opts...)
}

Loading…
Cancel
Save