diff --git a/sdks/pcmscheduler/jobmgr.go b/sdks/pcmscheduler/jobmgr.go new file mode 100644 index 0000000..38d6d1e --- /dev/null +++ b/sdks/pcmscheduler/jobmgr.go @@ -0,0 +1,263 @@ +package sch + +import ( + "fmt" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + "net/url" + "strings" +) + +type QueryTasksReq struct { + PageNum int64 `json:"pageNum"` + PageSize int64 `json:"pageSize"` + Type int64 `json:"type"` +} + +type QueryTasksResp struct { + List []TaskModel `json:"list"` + Total int64 `json:"total"` + PageNum int64 `json:"pageNum"` + PageSize int64 `json:"pageSize"` +} + +type TaskModel struct { + Id int64 `json:"id,omitempty,string" db:"id"` // id + Name string `json:"name,omitempty" db:"name"` // 作业名称 + Description string `json:"description,omitempty" db:"description"` // 作业描述 + Status string `json:"status,omitempty" db:"status"` // 作业状态 + Strategy int64 `json:"strategy" db:"strategy"` // 策略 + SynergyStatus int64 `json:"synergyStatus" db:"synergy_status"` // 协同状态(0-未协同、1-已协同) + CommitTime string `json:"commitTime,omitempty" db:"commit_time"` // 提交时间 + StartTime string `json:"startTime,omitempty" db:"start_time"` // 开始时间 + EndTime string `json:"endTime,omitempty" db:"end_time"` // 结束运行时间 + RunningTime int64 `json:"runningTime" db:"running_time"` // 已运行时间(单位秒) + YamlString string `json:"yamlString,omitempty" db:"yaml_string"` + Result string `json:"result,omitempty" db:"result"` // 作业结果 + DeletedAt string `json:"deletedAt,omitempty" gorm:"index" db:"deleted_at"` + NsID string `json:"nsId,omitempty" db:"ns_id"` + TenantId string `json:"tenantId,omitempty" db:"tenant_id"` + CreatedTime string `json:"createdTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + UpdatedTime string `json:"updatedTime,omitempty" db:"updated_time"` + AdapterTypeDict string `json:"adapterTypeDict" db:"adapter_type_dict" gorm:"adapter_type_dict"` //适配器类型(对应字典表的值 + TaskTypeDict string `json:"taskTypeDict" db:"task_type_dict" gorm:"task_type_dict"` //任务类型(对应字典表的值 + UserId int64 `json:"userId,omitempty" db:"user_id"` +} + +func (c *Client) QueryTasks(req QueryTasksReq, token string) (*QueryTasksResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/pcm/v1/core/task/list") + if err != nil { + return nil, err + } + + resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: map[string]string{ + "Authorization": token, + }, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[QueryTasksResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +type QueryDetailsReq struct { + TaskID string `json:"id"` +} + +type QueryDetailsResp struct { + Name string `json:"name"` + Description string `json:"description"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + Strategy int64 `json:"strategy"` + SynergyStatus int64 `json:"synergyStatus"` + ClusterInfos []*ClusterInfo `json:"clusterInfos"` + SubTaskInfos []*SubTaskInfo `json:"subTaskInfos"` + TaskTypeDict string `json:"taskTypeDict"` + AdapterTypeDict string `json:"adapterTypeDict"` +} + +type SubTaskInfo struct { + Id string `json:"id" db:"id"` + Name string `json:"name" db:"name"` + ClusterId string `json:"clusterId" db:"cluster_id"` + ClusterName string `json:"clusterName" db:"cluster_name"` + Status string `json:"status" db:"status"` + Remark string `json:"remark" db:"remark"` + InferUrl string `json:"inferUrl"` + WorkDir string `json:"workDir"` + AppName string `json:"appName"` +} + +type ClusterInfo struct { + Id string `json:"id,omitempty" db:"id"` + AdapterId int64 `json:"adapterId,omitempty,string" db:"adapter_id"` + Name string `json:"name,omitempty" db:"name"` + Nickname string `json:"nickname,omitempty" db:"nickname"` + Description string `json:"description,omitempty" db:"description"` + Server string `json:"server,omitempty" db:"server"` + MonitorServer string `json:"monitorServer,omitempty" db:"monitor_server"` + Username string `json:"username,omitempty" db:"username"` + Password string `json:"password,omitempty" db:"password"` + Token string `json:"token,omitempty" db:"token"` + Ak string `json:"ak,omitempty" db:"ak"` + Sk string `json:"sk,omitempty" db:"sk"` + Region string `json:"region,omitempty" db:"region"` + ProjectId string `json:"projectId,omitempty" db:"project_id"` + Version string `json:"version,omitempty" db:"version"` + Label string `json:"label,omitempty" db:"label"` + OwnerId string `json:"ownerId,omitempty" db:"owner_id"` + AuthType string `json:"authType,omitempty" db:"auth_type"` + ProducerDict string `json:"producerDict,omitempty" db:"producer_dict"` + RegionDict string `json:"regionDict,omitempty" db:"region_dict"` + Location string `json:"location,omitempty" db:"location"` + CreateTime string `json:"createTime,omitempty" db:"created_time" gorm:"autoCreateTime"` + EnvPath string `json:"envPath,omitempty" db:"env_path"` + EnvLdPath string `json:"envLdPath,omitempty" db:"env_ld_path"` + WorkDir string `json:"workDir,omitempty" db:"work_dir"` + Address string `json:"address,omitempty" db:"address"` + ProxyAddress string `json:"proxyAddress,omitempty" db:"proxy_address"` + ProxyEnable string `json:"proxyEnable,omitempty" db:"proxy_enable"` + Driver string `json:"driver,omitempty" db:"driver"` +} + +func (c *Client) QueryDetails(req QueryDetailsReq, token string) (*QueryDetailsResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/pcm/v1/core/task/details") + if err != nil { + return nil, err + } + + resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: map[string]string{ + "Authorization": token, + }, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[QueryDetailsResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +type QueryHPCTrainReq struct { + TaskID string `json:"taskId"` +} + +type QueryHPCTrainResp struct { + ErrLogs string `json:"errLogs"` + JobId string `json:"jobId"` + OutLogs string `json:"outLogs"` + WorkDir string `json:"workDir"` +} + +func (c *Client) QueryHPCTrainLog(req QueryHPCTrainReq, token string) (*QueryHPCTrainResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/pcm/v1/hpc/jobLogs") + if err != nil { + return nil, err + } + + resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: map[string]string{ + "Authorization": token, + }, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[QueryHPCTrainResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +type QueryAITrainLogReq struct { + AdapterID string `json:"adapterId"` + ClusterID string `json:"clusterId"` + TaskID string `json:"taskId"` + InstanceNum string `json:"instanceNum"` +} + +type QueryAITrainLogResp struct { + Log string `json:"log"` +} + +func (c *Client) QueryAITrainLog(req QueryAITrainLogReq, token string) (*QueryAITrainLogResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/pcm/v1/schedule/ai/getJobLog") + if err != nil { + return nil, err + } + + req.AdapterID = "1777144940459986944" + req.InstanceNum = "0" + + resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: map[string]string{ + "Authorization": token, + }, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[QueryAITrainLogResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +}