diff --git a/sdks/blockchain/blockchain.go b/sdks/blockchain/blockchain.go index a7cb793..c905db1 100644 --- a/sdks/blockchain/blockchain.go +++ b/sdks/blockchain/blockchain.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/utils/serder" "net/url" "strings" + "time" ) type InvokeReq struct { @@ -14,6 +15,7 @@ type InvokeReq struct { MemberName string `json:"memberName"` Type string `json:"type"` Args []string `json:"args"` + Amount int64 `json:"amount"` } func (c *Client) BlockChainInvoke(req InvokeReq, token string) error { @@ -22,15 +24,9 @@ func (c *Client) BlockChainInvoke(req InvokeReq, token string) error { return err } - //token, err := c.getToken() - //if err != nil { - // return err - //} - header := make(map[string]string) header["Content-Type"] = http2.ContentTypeJSON header["Authorization"] = token - println("token: " + token) resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ Body: req, @@ -55,6 +51,48 @@ func (c *Client) BlockChainInvoke(req InvokeReq, token string) error { return nil } +type UnPledgePointsReq struct { + PrivateKey string `form:"privateKey"` + BusinessCode string `form:"businessCode"` + EndTime time.Time `form:"endTime"` +} + +func (c *Client) UnPledgePoints(req UnPledgePointsReq, token string) error { + targetUrl, err := url.JoinPath(c.baseURL, "/jcc-bcos/points/unPledgePoints") + if err != nil { + return err + } + + header := make(map[string]string) + header["Content-Type"] = http2.ContentTypeJSON + header["Authorization"] = token + + resp, err := http2.PutJSON(targetUrl, http2.RequestParam{ + Query: req, + Header: header, + }) + if err != nil { + println(err) + return err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[string] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == 200 { + return nil + } + + return codeResp.ToError() + } + + return nil +} + type TokenReq struct { Username string `json:"username"` Password string `json:"password"` diff --git a/sdks/hpc/client.go b/sdks/hpc/client.go new file mode 100644 index 0000000..07b4393 --- /dev/null +++ b/sdks/hpc/client.go @@ -0,0 +1,63 @@ +package hpc + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/sdks" +) + +type response[T any] struct { + Code int `json:"code"` + Message string `json:"message"` + Data T `json:"data"` +} + +type respons2[T any] struct { + Code int `json:"code"` + Message string `json:"msg"` + Data T `json:"data"` +} + +const ( + ResponseCodeOK int = 200 +) + +func (r *response[T]) ToError() *sdks.CodeMessageError { + return &sdks.CodeMessageError{ + Code: fmt.Sprintf("%d", r.Code), + Message: r.Message, + } +} + +type Client struct { + baseURL string +} + +func NewClient(cfg *Config) *Client { + return &Client{ + baseURL: cfg.URL, + } +} + +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) +} + +type pool struct { + cfg *Config +} + +func NewPool(cfg *Config) Pool { + return &pool{ + cfg: cfg, + } +} +func (p *pool) Acquire() (*Client, error) { + cli := NewClient(p.cfg) + return cli, nil +} + +func (p *pool) Release(cli *Client) { + +} diff --git a/sdks/hpc/config.go b/sdks/hpc/config.go new file mode 100644 index 0000000..d168c41 --- /dev/null +++ b/sdks/hpc/config.go @@ -0,0 +1,5 @@ +package hpc + +type Config struct { + URL string `json:"url"` +} diff --git a/sdks/hpc/job.go b/sdks/hpc/job.go new file mode 100644 index 0000000..8c9acad --- /dev/null +++ b/sdks/hpc/job.go @@ -0,0 +1,83 @@ +package hpc + +import ( + "fmt" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + "net/url" + "strings" +) + +type CreateHPCJobReq struct { + Name string `json:"name"` + Description string `json:"description"` + ClusterID schsdk.ClusterID `json:"clusterId"` + Backend string `json:"backend"` + App string `json:"app"` + OperateType string `json:"operateType"` + ScriptContent string `json:"scriptContent"` + //Parameters HPCParameter `json:"parameters"` + Parameters map[string]string `json:"parameters"` +} + +//type HPCParameter struct { +// JobName string `json:"jobName"` +// Partition string `json:"partition"` +// Ntasks string `json:"ntasks"` +// Nodes string `json:"nodes"` +// BamFile string `json:"bamFile"` +// InputFile string `json:"inputFile"` +//} + +type CreateJobResp struct { + Backend string `json:"backend"` + JobInfo HPCJobInfo `json:"jobInfo"` +} + +type HPCJobInfo struct { + JobDir string `json:"jobDir"` + JobID string `json:"jobId"` + TaskID string `json:"taskId"` +} + +func (c *Client) CreateJob(req CreateHPCJobReq, token string) (*CreateJobResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/hpc/commitHpcTask") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: map[string]string{ + "Authorization": token, + }, + }) + if err != nil { + return nil, err + } + + // 打印resp.Body内容 + //body, err := io.ReadAll(resp.Body) + //if err != nil { + // println("Error reading response body:", err) + //} + //println("Response Body:", string(body)) + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp respons2[CreateJobResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, fmt.Errorf("error: %s", codeResp.Message) + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) + +} diff --git a/sdks/hpc/models.go b/sdks/hpc/models.go new file mode 100644 index 0000000..cee9586 --- /dev/null +++ b/sdks/hpc/models.go @@ -0,0 +1,525 @@ +package hpc + +import ( + "gitlink.org.cn/cloudream/common/pkgs/types" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/serder" + "time" +) + +type ResourceType string + +const ( + ResourceTypeCPU ResourceType = "CPU" + ResourceTypeNPU ResourceType = "NPU" + ResourceTypeGPU ResourceType = "GPU" + ResourceTypeMLU ResourceType = "MLU" + ResourceTypeStorage ResourceType = "STORAGE" + ResourceTypeMemory ResourceType = "MEMORY" + + Split = "/" + + CODE = "code" + DATASET = "dataset" + IMAGE = "image" + MODEL = "model" + RESULT = "result" + + OrderByName = "name" + OrderBySize = "size" + OrderByTime = "time" + + StorageTypeURL = "url" + StorageTypeJCS = "jcs" + + RejectedStatus = "rejected" + PendingStatus = "pending" + ApprovedStatus = "approved" + RevokedStatus = "revoked" + CancelStatus = "cancel" + ExpiredStatus = "expired" + + ApplyAccess = "apply" + PrivateAccess = "private" + PublicAccess = "public" + + PreferencePriority = "preference" + SpecifyClusterPriority = "specify" + + FailedStatus = "failed" + SuccessStatus = "success" + + Query = "query" + Delete = "delete" + + ChildrenType = "children" + ParentType = "parent" + + PlatformSugon = "sugon" + PlatformOpenI = "OpenI" + PlatformModelArts = "ModelArts" + + URL = "url" + ID = "id" + + Startup = "startup" +) + +type TaskID int64 +type DataID int64 + +type ClusterDetail struct { + // 集群ID + ClusterId schsdk.ClusterID `json:"clusterID"` + // 集群功能类型:云算,智算,超算 + ClusterType string `json:"clusterType"` + // 集群地区:华东地区、华南地区、华北地区、华中地区、西南地区、西北地区、东北地区 + Region string `json:"region"` + // 资源类型 + Resources2 []ResourceData `json:"resources1,omitempty"` + //Resources2 []ResourceData `json:"resources"` + Resources []ClusterResource `json:"resources"` +} + +type ClusterResource struct { + Resource TmpResourceData `json:"resource"` + BaseResources []TmpResourceData `json:"baseResources"` +} + +type TmpResourceData struct { + Type ResourceType `json:"type"` + Name string `json:"name"` + Total UnitValue[float64] `json:"total"` + Available UnitValue[float64] `json:"available"` +} + +type ResourceData interface { + Noop() +} + +var ResourceDataTypeUnion = types.NewTypeUnion[ResourceData]( + (*CPUResourceData)(nil), + (*NPUResourceData)(nil), + (*GPUResourceData)(nil), + (*MLUResourceData)(nil), + (*DCUResourceData)(nil), + (*GCUResourceData)(nil), + (*GPGPUResourceData)(nil), + (*StorageResourceData)(nil), + (*MemoryResourceData)(nil), + (*BalanceResourceData)(nil), + (*RateResourceData)(nil), +) +var _ = serder.UseTypeUnionInternallyTagged(&ResourceDataTypeUnion, "type") + +type ResourceDataBase struct{} + +func (d *ResourceDataBase) Noop() {} + +type UnitValue[T any] struct { + Unit string `json:"unit"` + Value T `json:"value"` +} + +type CPUResourceData struct { + serder.Metadata `union:"CPU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type NPUResourceData struct { + serder.Metadata `union:"NPU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type GPUResourceData struct { + serder.Metadata `union:"GPU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type MLUResourceData struct { + serder.Metadata `union:"MLU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type DCUResourceData struct { + serder.Metadata `union:"DCU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type GCUResourceData struct { + serder.Metadata `union:"GCU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type GPGPUResourceData struct { + serder.Metadata `union:"ILUVATAR-GPGPU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type StorageResourceData struct { + serder.Metadata `union:"STORAGE"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[float64] `json:"total"` + Available UnitValue[float64] `json:"available"` +} + +type MemoryResourceData struct { + serder.Metadata `union:"MEMORY"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[float64] `json:"total"` + Available UnitValue[float64] `json:"available"` +} + +type BalanceResourceData struct { + serder.Metadata `union:"BALANCE"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[float64] `json:"total"` + Available UnitValue[float64] `json:"available"` +} + +type RateResourceData struct { + serder.Metadata `union:"RATE"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[float64] `json:"total"` + Available UnitValue[float64] `json:"available"` +} + +type ResourceRange struct { + UserID cdssdk.UserID `json:"userID"` + Type ResourceType `json:"type"` + GPU Range `json:"gpu"` + GPUNumber int `json:"gpuNumber"` + CPU Range `json:"cpu"` + Memory Range `json:"memory"` + Storage Range `json:"storage"` +} + +type Range struct { + Min float64 `json:"min"` + Max float64 `json:"max"` +} + +type ResourcePriority interface { + Noop() +} + +type ResourcePriorityBase struct { +} + +var ResourcePriorityTypeUnion = types.NewTypeUnion[ResourcePriority]( + (*RegionPriority)(nil), + (*ChipPriority)(nil), + (*BiasPriority)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&ResourcePriorityTypeUnion, "type") + +func (d *ResourcePriorityBase) Noop() {} + +type RegionPriority struct { + serder.Metadata `union:"region"` + ResourcePriorityBase + Type string `json:"type"` + Options []string `json:"options"` +} + +type ChipPriority struct { + serder.Metadata `union:"chip"` + ResourcePriorityBase + Type string `json:"type"` + Options []string `json:"options"` +} + +type BiasPriority struct { + serder.Metadata `union:"bias"` + ResourcePriorityBase + Type string `json:"type"` + Options []string `json:"options"` +} + +type TaskMessage struct { + Status string `json:"status"` + Message string `json:"message"` +} + +type ReportMessage struct { + TaskName string `json:"taskName"` + TaskID string `json:"taskID"` + Status bool `json:"status"` + Message string `json:"message"` + ClusterID schsdk.ClusterID `json:"clusterID"` + Output string `json:"output"` +} + +type UploadParams struct { + DataType string `json:"dataType"` + UploadInfo UploadInfo `json:"uploadInfo"` +} + +type UploadInfo interface { + Noop() +} + +var UploadInfoTypeUnion = types.NewTypeUnion[UploadInfo]( + (*LocalUploadInfo)(nil), + (*RemoteUploadInfo)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&UploadInfoTypeUnion, "type") + +type LocalUploadInfo struct { + serder.Metadata `union:"local"` + UploadInfoBase + Type string `json:"type"` + LocalPath string `json:"localPath"` + ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` +} + +type RemoteUploadInfo struct { + serder.Metadata `union:"url"` + UploadInfoBase + Type string `json:"type"` + Url string `json:"url"` + Branch string `json:"branch"` + DataName string `json:"dataName"` + Cluster schsdk.ClusterID `json:"clusterID"` +} + +type UploadInfoBase struct{} + +func (d *UploadInfoBase) Noop() {} + +type UploadPriority interface { + Noop() +} + +var UploadPriorityTypeUnion = types.NewTypeUnion[UploadPriority]( + (*Preferences)(nil), + (*SpecifyCluster)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&UploadPriorityTypeUnion, "type") + +type Preferences struct { + serder.Metadata `union:"preference"` + UploadPriorityBase + Type string `json:"type"` + ResourcePriorities []ResourcePriority `json:"priorities"` +} + +type SpecifyCluster struct { + serder.Metadata `union:"specify"` + UploadPriorityBase + Type string `json:"type"` + Clusters []schsdk.ClusterID `json:"clusters"` +} + +type UploadPriorityBase struct{} + +func (d *UploadPriorityBase) Noop() {} + +type QueryData struct { + DataType string `json:"dataType" binding:"required"` + UserID cdssdk.UserID `json:"userID" binding:"required"` + Path string `json:"path"` + PackageID cdssdk.PackageID `json:"packageID" binding:"required"` + CurrentPage int `json:"currentPage" binding:"required"` + PageSize int `json:"pageSize" binding:"required"` + OrderBy string `json:"orderBy" binding:"required"` +} + +type DataBinding interface { + Noop() +} + +var DataBindingTypeUnion = types.NewTypeUnion[DataBinding]( + (*DatasetBinding)(nil), + (*ModelBinding)(nil), + (*CodeBinding)(nil), + (*ImageBinding)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type") + +type DataBindingBase struct{} + +func (d *DataBindingBase) Noop() {} + +type DatasetBinding struct { + serder.Metadata `union:"dataset"` + DataBindingBase + Type string `json:"type"` + Name string `json:"name"` + ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` + Description string `json:"description"` + Category string `json:"category"` + PackageID cdssdk.PackageID `json:"packageID"` + RepositoryName string `json:"repositoryName"` + ConsumptionPoints float64 `json:"points"` +} + +type ModelBinding struct { + serder.Metadata `union:"model"` + DataBindingBase + Type string `json:"type"` + Name string `json:"name"` + ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` + Description string `json:"description"` + Category string `json:"category"` + ModelType string `json:"modelType"` + Env string `json:"env"` + Version string `json:"version"` + PackageID cdssdk.PackageID `json:"packageID"` + RepositoryName string `json:"repositoryName"` +} + +type CodeBinding struct { + serder.Metadata `union:"code"` + DataBindingBase + Type string `json:"type"` + Name string `json:"name"` + ClusterID schsdk.ClusterID `json:"clusterID"` + Description string `json:"description"` + ImageID schsdk.ImageID `json:"imageID"` + BootstrapObjectID cdssdk.ObjectID `json:"bootstrapObjectID"` + PackageID cdssdk.PackageID `json:"packageID"` + Output string `json:"output"` + // 当集群为openi的时候,需要传入分支 + Branch string `json:"branch"` +} + +//type ImageBinding struct { +// serder.Metadata `union:"image"` +// DataBindingBase +// Type string `json:"type"` +// Name string `json:"name"` +// ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` +// Description string `json:"description"` +// Architecture string `json:"architecture"` +// ResourceType string `json:"resourceType"` +// Tags []string `json:"tags"` +// PackageID cdssdk.PackageID `json:"packageID"` +//} + +type ImageBinding struct { + serder.Metadata `union:"image"` + DataBindingBase + Type string `json:"type"` + ID int64 `json:"id"` + Name string `json:"name"` + IDType string `json:"idType"` + ImageID string `json:"imageID"` + ClusterID schsdk.ClusterID `json:"clusterID"` +} + +type Image struct { + ImageID schsdk.ImageID `json:"imageID" gorm:"column:ImageID;primaryKey"` + Name string `json:"name" gorm:"column:Name"` + CreateTime time.Time `json:"createTime" gorm:"column:CreateTime"` + ClusterImage []ClusterImage `gorm:"foreignKey:image_id;references:ImageID" json:"clusterImages"` +} + +type ClusterImage struct { + ImageID schsdk.ImageID `gorm:"column:image_id" json:"imageID"` + ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"` + OriginImageType string `gorm:"column:origin_image_type" json:"originImageType"` + OriginImageID string `gorm:"column:origin_image_id" json:"originImageID"` + OriginImageName string `gorm:"column:origin_image_name" json:"originImageName"` + ClusterImageCard []ClusterImageCard `gorm:"foreignKey:origin_image_id;references:origin_image_id" json:"cards"` +} + +func (ClusterImage) TableName() string { + return "cluster_image" +} + +type ClusterImageCard struct { + OriginImageID string `gorm:"column:origin_image_id" json:"originImageID"` + Card string `gorm:"column:card" json:"card"` +} + +func (ClusterImageCard) TableName() string { + return "cluster_image_card" +} + +type QueryBindingFilters struct { + Status string `json:"status"` + Name string `json:"name"` +} + +type QueryBindingDataParam interface { + Noop() +} + +var QueryBindingDataParamTypeUnion = types.NewTypeUnion[QueryBindingDataParam]( + (*PrivateLevel)(nil), + (*ApplyLevel)(nil), + (*PublicLevel)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&QueryBindingDataParamTypeUnion, "type") + +type QueryBindingDataParamBase struct{} + +func (d *QueryBindingDataParamBase) Noop() {} + +type PrivateLevel struct { + serder.Metadata `union:"private"` + QueryBindingDataParamBase + Type string `json:"type" binding:"required"` + UserID cdssdk.UserID `json:"userID" binding:"required"` + BindingID int64 `json:"bindingID" binding:"required"` + Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现 +} + +type ApplyLevel struct { + serder.Metadata `union:"apply"` + QueryBindingDataParamBase + Type string `json:"type" binding:"required"` + UserID cdssdk.UserID `json:"userID" binding:"required"` + Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现 +} + +type PublicLevel struct { + serder.Metadata `union:"public"` + QueryBindingDataParamBase + UserID cdssdk.UserID `json:"userID" binding:"required"` + Type string `json:"type" binding:"required"` + Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现 +} diff --git a/sdks/pcmscheduler/job.go b/sdks/pcmscheduler/job.go new file mode 100644 index 0000000..3a5d291 --- /dev/null +++ b/sdks/pcmscheduler/job.go @@ -0,0 +1,24 @@ +package sch + +import ( + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "time" +) + +type PCMJob struct { + ID string `gorm:"column:id" json:"ID"` + JobType string `gorm:"column:job_type" json:"jobType"` + UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` + JobSetID schsdk.JobSetID `gorm:"column:jobset_id" json:"jobSetID"` + LocalJobID string `gorm:"column:local_job_id" json:"localJobID"` + Param string `gorm:"column:param" json:"param"` + Token string `gorm:"column:token" json:"token"` + CreatedAt time.Time `gorm:"column:created_at" json:"createdAt"` +} + +type PCMJobDataReturn struct { + JobID string `gorm:"column:job_id" json:"jobID"` + ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"` + PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` +} diff --git a/sdks/pcmscheduler/jobset.go b/sdks/pcmscheduler/jobset.go index 8bbdd4e..bcf329c 100644 --- a/sdks/pcmscheduler/jobset.go +++ b/sdks/pcmscheduler/jobset.go @@ -14,22 +14,21 @@ type GetClusterInfoReq struct { IDs []schsdk.ClusterID `json:"clusterIDs"` } -//type GetClusterInfoResp struct { -// Data []ClusterDetail `json:"data"` -// TraceId string `json:"traceId"` -//} - -func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) { - targetUrl, err := url.JoinPath(c.baseURL, "/queryResources") +func (c *Client) GetClusterInfo(req GetClusterInfoReq, token string) ([]ClusterDetail, error) { + targetUrl, err := url.JoinPath(c.baseURL, "schedule/queryResources") if err != nil { return nil, err } - //resp, err := http2.PostJSON(targetUrl, http2.RequestParam{Body: req}) - resp, err := http2.PostJSON(targetUrl, http2.RequestParam{}) + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Header: map[string]string{ + "Authorization": token, + }, + }) if err != nil { return nil, err } + contType := resp.Header.Get("Content-Type") if strings.Contains(contType, http2.ContentTypeJSON) { @@ -57,11 +56,15 @@ func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) return nil, fmt.Errorf("unknow response content type: %s", contType) } -type CreateJobReq struct { - Name string `json:"name"` - Description string `json:"description"` - JobResources schsdk.JobResources `json:"jobResources"` - DataDistribute DataDistribute `json:"dataDistributes"` +type CreateInferenceJobResp struct { + TaskId string `json:"taskId"` +} + +type CreateAIJobReq struct { + Name string `json:"name"` + Description string `json:"description"` + JobResources schsdk.JobResources `json:"jobResources"` + DataDistributes DataDistribute `json:"dataDistributes"` } type CommonJsonData struct { @@ -92,6 +95,7 @@ type DatasetDistribute struct { type CodeDistribute struct { DataName string `json:"dataName"` PackageID cdssdk.PackageID `json:"packageID"` + Output string `json:"output"` Clusters []DataDetail `json:"clusters"` } @@ -110,6 +114,7 @@ type ModelDistribute struct { type CreateJobResp struct { TaskID TaskID `json:"taskID"` + TaskName string `json:"taskName"` ScheduleDatas []ScheduleData `json:"scheduleDatas"` } @@ -120,21 +125,87 @@ type ScheduleData struct { ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` } -func (c *Client) CreateJob(req CreateJobReq) (*CreateJobResp, error) { - targetUrl, err := url.JoinPath(c.baseURL, "/createTask") +func (c *Client) CreateInferenceJob(req CreateAIJobReq, token string) (*CreateInferenceJobResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "inference/createTask") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: map[string]string{ + "Authorization": token, + }, + }) if err != nil { return nil, err } - // 将req转换成json,并打印 - //req2, err := serder.ObjectToJSONEx(req) - //if err != nil { - // return nil, fmt.Errorf("request to json: %w", err) - //} - //fmt.Println(string(req2)) + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp respons2[CreateInferenceJobResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, fmt.Errorf("error: %s", codeResp.Message) + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +type StopInferenceJobReq struct { + TaskId string `json:"taskId"` +} + +func (c *Client) StopInferenceJob(req StopInferenceJobReq, token string) error { + targetUrl, err := url.JoinPath(c.baseURL, "inference/createTask") + if err != nil { + return err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: map[string]string{ + "Authorization": token, + }, + }) + if err != nil { + return err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp respons2[CreateInferenceJobResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return nil + } + + return fmt.Errorf("error: %s", codeResp.Message) + } + + return fmt.Errorf("unknow response content type: %s", contType) +} + +func (c *Client) CreateJob(req CreateAIJobReq, token string) (*CreateJobResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "schedule/createTask") + if err != nil { + return nil, err + } resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ Body: req, + Header: map[string]string{ + "Authorization": token, + }, }) if err != nil { return nil, err @@ -176,14 +247,17 @@ type DataScheduleResults struct { Results []DataScheduleResult `json:"results"` } -func (c *Client) RunJob(req RunJobReq) error { - targetUrl, err := url.JoinPath(c.baseURL, "runTask") +func (c *Client) RunJob(req RunJobReq, token string) error { + targetUrl, err := url.JoinPath(c.baseURL, "schedule/runTask") if err != nil { return err } resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ Body: req, + Header: map[string]string{ + "Authorization": token, + }, }) if err != nil { return err @@ -213,7 +287,7 @@ type CancelJobReq struct { } func (c *Client) CancelJob(req CancelJobReq) error { - targetUrl, err := url.JoinPath(c.baseURL, "/queryResources") + targetUrl, err := url.JoinPath(c.baseURL, "schedule/queryResources") if err != nil { return err } diff --git a/sdks/pcmscheduler/models.go b/sdks/pcmscheduler/models.go index 2da752e..6f8707f 100644 --- a/sdks/pcmscheduler/models.go +++ b/sdks/pcmscheduler/models.go @@ -2,6 +2,7 @@ package sch import ( "gitlink.org.cn/cloudream/common/pkgs/types" + "gitlink.org.cn/cloudream/common/sdks/hpc" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/serder" @@ -26,6 +27,11 @@ const ( DATASET = "dataset" IMAGE = "image" MODEL = "model" + RESULT = "result" + + PackageTypeNormal = "normal" + PackageTypeNull = "null" + Null = "null" OrderByName = "name" OrderBySize = "size" @@ -48,8 +54,11 @@ const ( PreferencePriority = "preference" SpecifyClusterPriority = "specify" - FailedStatus = "failed" - SuccessStatus = "success" + FailedStatus = "failed" + SuccessStatus = "success" + SucceededStatus = "succeeded" + UploadingStatus = "uploading" + RunningStatus = "running" Query = "query" Delete = "delete" @@ -58,11 +67,32 @@ const ( ParentType = "parent" PlatformSugon = "sugon" - PlatformOpenI = "openi" - PlatformModelArts = "modelarts" + PlatformOpenI = "OpenI" + PlatformModelArts = "ModelArts" + + PlatformAI = "AI" // 智算 + PlatformCloud = "CLOUD" // 云算 + PlatformCloudInference = "PCM_Inference" + PlatformHPC = "HPCSlurm" //超算 URL = "url" ID = "id" + + Startup = "startup" + + Schedule = "schedule" + + BlockChainJobCreatePrefix = "job_create_" + + Complete = "Complete" + + NodeTypeBinding = "binding" + NodeTypeUpload = "upload" + NodeTypeDataReturn = "data_return" + NodeTypeHPCCreate = "hpc_create" + NodeTypeInference = "inference" + NodeTypeAICreate = "ai_job_create" + NodeTypeAIJobRun = "ai_job_run" ) type TaskID int64 @@ -231,6 +261,47 @@ type ResourceRange struct { Ids []string `json:"ids"` } +type JobInfo struct { + TaskID string `json:"taskID"` + JobSubmitInfo JobSubmitInfo `json:"jobSubmitInfo"` + ResultFiles []ResultFile `json:"resultFiles"` +} + +type ResultFile struct { + ClusterID schsdk.ClusterID `json:"clusterID"` + Objects []cdssdk.Object `json:"objects"` +} + +type JobSubmitInfo interface { + Noop() +} + +type JobSubmitInfoBase struct { +} + +var JobSubmitInfoTypeUnion = types.NewTypeUnion[JobSubmitInfo]( + (*PCMJobSubmitInfo)(nil), + (*HPCJobSubmitInfo)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&JobSubmitInfoTypeUnion, "type") + +func (d *JobSubmitInfoBase) Noop() {} + +type PCMJobSubmitInfo struct { + serder.Metadata `union:"pcm"` + JobSubmitInfoBase + Type string `json:"type"` + Info CreateAIJobReq `json:"info"` +} + +type HPCJobSubmitInfo struct { + serder.Metadata `union:"hpc"` + JobSubmitInfoBase + Type string `json:"type"` + Info hpc.CreateHPCJobReq `json:"info"` +} + type Range struct { Min float64 `json:"min"` Max float64 `json:"max"` @@ -279,10 +350,18 @@ type TaskMessage struct { Message string `json:"message"` } +type ReportMessage struct { + TaskName string `json:"taskName"` + TaskID string `json:"taskID"` + Status bool `json:"status"` + Message string `json:"message"` + ClusterID schsdk.ClusterID `json:"clusterID"` + Output string `json:"output"` +} + type UploadParams struct { DataType string `json:"dataType"` UploadInfo UploadInfo `json:"uploadInfo"` - //UploadPriority UploadPriority `json:"uploadPriority"` } type UploadInfo interface { @@ -352,9 +431,10 @@ type QueryData struct { UserID cdssdk.UserID `json:"userID" binding:"required"` Path string `json:"path"` PackageID cdssdk.PackageID `json:"packageID" binding:"required"` - CurrentPage int `json:"currentPage" binding:"required"` - PageSize int `json:"pageSize" binding:"required"` + CurrentPage int `json:"currentPage"` + PageSize int `json:"pageSize"` OrderBy string `json:"orderBy" binding:"required"` + PackageName string `json:"packageName"` } type DataBinding interface { @@ -370,47 +450,56 @@ var DataBindingTypeUnion = types.NewTypeUnion[DataBinding]( var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type") -type DataBindingBase struct{} +type DataBindingBase struct { + RootPath string `json:"rootPath"` +} func (d *DataBindingBase) Noop() {} type DatasetBinding struct { serder.Metadata `union:"dataset"` DataBindingBase - Type string `json:"type"` - Name string `json:"name"` - ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` - Description string `json:"description"` - Category string `json:"category"` - PackageID cdssdk.PackageID `json:"packageID"` + Type string `json:"type"` + Name string `json:"name"` + OperateType string `json:"operateType"` + ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` + Description string `json:"description"` + Category string `json:"category"` + PackageID cdssdk.PackageID `json:"packageID"` + RepositoryName string `json:"repositoryName"` + ConsumptionPoints int64 `json:"points"` } type ModelBinding struct { serder.Metadata `union:"model"` DataBindingBase - Type string `json:"type"` - Name string `json:"name"` - ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` - Description string `json:"description"` - Category string `json:"category"` - ModelType string `json:"modelType"` - Env string `json:"env"` - Version string `json:"version"` - PackageID cdssdk.PackageID `json:"packageID"` + Type string `json:"type"` + Name string `json:"name"` + OperateType string `json:"operateType"` + ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` + Description string `json:"description"` + Category string `json:"category"` + ModelType string `json:"modelType"` + Env string `json:"env"` + Version string `json:"version"` + PackageID cdssdk.PackageID `json:"packageID"` + RepositoryName string `json:"repositoryName"` } type CodeBinding struct { serder.Metadata `union:"code"` DataBindingBase - Type string `json:"type"` - Name string `json:"name"` - //ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` + Type string `json:"type"` + Name string `json:"name"` + OperateType string `json:"operateType"` ClusterID schsdk.ClusterID `json:"clusterID"` Description string `json:"description"` ImageID schsdk.ImageID `json:"imageID"` BootstrapObjectID cdssdk.ObjectID `json:"bootstrapObjectID"` - //FilePath string `json:"filePath"` - PackageID cdssdk.PackageID `json:"packageID"` + PackageID cdssdk.PackageID `json:"packageID"` + Output string `json:"output"` + // 当集群为openi的时候,需要传入分支 + Branch string `json:"branch"` } //type ImageBinding struct { @@ -429,12 +518,13 @@ type CodeBinding struct { type ImageBinding struct { serder.Metadata `union:"image"` DataBindingBase - Type string `json:"type"` - ID int64 `json:"id"` - Name string `json:"name"` - IDType string `json:"idType"` - ImageID string `json:"imageID"` - ClusterID schsdk.ClusterID `json:"clusterID"` + Type string `json:"type"` + ID int64 `json:"id"` + OperateType string `json:"operateType"` + Name string `json:"name"` + IDType string `json:"idType"` + ImageID string `json:"imageID"` + ClusterID schsdk.ClusterID `json:"clusterID"` } type Image struct { @@ -454,7 +544,7 @@ type ClusterImage struct { } func (ClusterImage) TableName() string { - return "clusterImage" + return "cluster_image" } type ClusterImageCard struct { @@ -463,7 +553,7 @@ type ClusterImageCard struct { } func (ClusterImageCard) TableName() string { - return "clusterImageCard" + return "cluster_image_card" } type QueryBindingFilters struct { diff --git a/sdks/scheduler/jobflow.go b/sdks/scheduler/jobflow.go new file mode 100644 index 0000000..8d78ddf --- /dev/null +++ b/sdks/scheduler/jobflow.go @@ -0,0 +1,102 @@ +package schsdk + +import ( + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "time" +) + +type FlowData struct { + Nodes []Node `json:"nodes"` + Edges []Edge `json:"edges"` +} + +type Node struct { + ID string `json:"id"` + Type string `json:"type"` + X float64 `json:"x"` + Y float64 `json:"y"` + Properties JobInfo `json:"properties"` + Text *Text `json:"text,omitempty"` // 有些节点没有 text 字段 +} + +type Edge struct { + ID string `json:"id"` + Type string `json:"type"` + Properties interface{} `json:"properties"` // 为空对象 {} + SourceNodeID string `json:"sourceNodeId"` + TargetNodeID string `json:"targetNodeId"` + SourceAnchorID string `json:"sourceAnchorId"` + TargetAnchorID string `json:"targetAnchorId"` + StartPoint Point `json:"startPoint"` + EndPoint Point `json:"endPoint"` + PointsList []Point `json:"pointsList"` + Text *Text `json:"text,omitempty"` // 有些 edge 有文字标签 +} + +type Point struct { + X float64 `json:"x"` + Y float64 `json:"y"` +} + +type Text struct { + X float64 `json:"x"` + Y float64 `json:"y"` + Value string `json:"value"` +} + +type JobFlowDAO struct { + ID int64 `gorm:"column:id;primaryKey;autoIncrement" json:"id"` + UserID cdssdk.UserID `gorm:"column:user_id;not null" json:"userID"` + Name string `gorm:"column:name;size:255;not null" json:"name"` + Description string `gorm:"column:description;size:255" json:"description"` + Content string `gorm:"column:content;type:text" json:"content"` + Status string `gorm:"column:status;type:enum('pending','running','failed','success');default:'pending'" json:"status"` + JobType string `gorm:"column:job_type;size:255" json:"jobType"` + UpdatedAt time.Time `gorm:"column:update_at;autoUpdateTime" json:"updatedAt"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"createdAt"` +} + +type JobFlow struct { + ID int64 `json:"id"` + UserID cdssdk.UserID `json:"userID"` + Name string `json:"name"` + Description string `json:"description"` + Content FlowData `json:"content"` + Status string `json:"status"` + JobType string `json:"jobType"` + UpdatedAt time.Time `json:"updatedAt"` + CreatedAt time.Time `json:"createdAt"` +} + +type JobFlowRunStatus struct { + RunID JobSetID `gorm:"column:run_id;primaryKey" json:"runID"` + NodeType string `gorm:"column:node_type;size:100" json:"nodeType"` + NodeID string `gorm:"column:node_id;size:255" json:"nodeID"` + Status string `gorm:"column:status;type:enum('pending','running','fail','success')" json:"status"` + RunOutput string `gorm:"column:run_output;type:text" json:"runOutput"` + RunLog string `gorm:"column:run_log;type:text" json:"runLog"` +} + +type JobFlowRunDAO struct { + ID JobSetID `gorm:"column:id;primaryKey;" json:"runID"` + UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` + Name string `gorm:"column:name;size:255;not null" json:"name"` + Description string `gorm:"column:description;size:255" json:"description"` + Content string `gorm:"column:content;type:text" json:"content"` + Status string `gorm:"column:status;type:enum('running','fail','success')" json:"status"` + Token string `gorm:"column:token;size:255" json:"token"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"createdAt"` + FinishAt *time.Time `gorm:"column:finish_at" json:"finishedAt"` +} + +type JobFlowRun struct { + ID JobSetID `gorm:"column:id;primaryKey;" json:"runID"` + UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` + Name string `gorm:"column:name;size:255;not null" json:"name"` + Description string `gorm:"column:description;size:255" json:"description"` + Content FlowData `gorm:"column:content;type:text" json:"content"` + Status string `gorm:"column:status;type:enum('running','fail','success')" json:"status"` + Token string `gorm:"column:token;size:255" json:"token"` + CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"createdAt"` + FinishAt *time.Time `gorm:"column:finish_at" json:"finishedAt"` +} diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index cbf0ae7..8262d75 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -8,10 +8,12 @@ import ( const ( JobTypeNormal = "Normal" + JobTypePCM = "PCM" JobTypeResource = "Resource" JobTypeInstance = "Instance" JobTypeFinetuning = "Finetuning" JobTypeDataPreprocess = "DataPreprocess" + JobTypeDataReturn = "DataReturn" FileInfoTypePackage = "Package" FileInfoTypeLocalFile = "LocalFile" @@ -24,12 +26,18 @@ const ( MemoryUtilization = "MemoryUtilization" GPUUtilization = "GPUUtilization" CPUUtilization = "CPUUtilization" + + MethodPost = "POST" + MethodGet = "GET" + CodeSuccess = 200 ) type JobID string type JobSetID string +type DataID int64 + type ImageID int64 // 计算中心ID @@ -49,6 +57,9 @@ type JobSetInfo struct { type JobInfo interface { GetLocalJobID() string + GetTargetLocalJobIDs() []string + SetTargetLocalJob(info TargetJobInfo) + GetTargetInputParams(targetID string) map[string]string } var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( @@ -59,18 +70,59 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*UpdateMultiInstanceJobInfo)(nil), (*FinetuningJobInfo)(nil), (*DataPreprocessJobInfo)(nil), - (*PCMJobInfo)(nil), + (*AIJobInfo)(nil), + (*HPCJobInfo)(nil), + (*BindingJobInfo)(nil), + (*PCMInferenceJobInfo)(nil), + (*CompleteJobInfo)(nil), + (*StartJobInfo)(nil), + (*NotifyJobInfo)(nil), + (*StopInferenceJobInfo)(nil), + (*UploadJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") type JobInfoBase struct { - LocalJobID string `json:"localJobID"` + LocalJobID string `json:"localJobID"` + TargetJob []TargetJobInfo `json:"targetJob"` +} + +type TargetJobInfo struct { + TargetJobID string `json:"targetJobID"` + InputParams map[string]string `json:"inputParams"` } func (i *JobInfoBase) GetLocalJobID() string { return i.LocalJobID } +func (i *JobInfoBase) GetTargetInputParams(targetID string) map[string]string { + for _, v := range i.TargetJob { + if v.TargetJobID == targetID { + return v.InputParams + } + } + return nil +} + +func (i *JobInfoBase) GetTargetLocalJobIDs() []string { + var IDs []string + for _, v := range i.TargetJob { + IDs = append(IDs, v.TargetJobID) + } + return IDs +} + +func (i *JobInfoBase) SetTargetLocalJob(info TargetJobInfo) { + for _, target := range i.TargetJob { + // 已经存在,则不用再添加 + if target.TargetJobID == info.TargetJobID { + return + } + } + i.TargetJob = append(i.TargetJob, info) +} + type NormalJobInfo struct { serder.Metadata `union:"Normal"` JobInfoBase @@ -82,14 +134,190 @@ type NormalJobInfo struct { ModelJobInfo ModelJobInfo `json:"modelJobInfo"` } -type PCMJobInfo struct { - serder.Metadata `union:"PCM"` +type PCMInferenceJobInfo struct { + serder.Metadata `union:"PCM_Inference"` + JobInfoBase + Type string `json:"type"` + Name string `json:"name"` + Description string `json:"description"` + Files JobFilesInfo `json:"files"` + JobResources JobResources `json:"jobResources"` + BindingID DataID `json:"bindingID"` + ResourceChoice ResourceChoice `json:"resourceChoice"` +} + +type StopInferenceJobInfo struct { + serder.Metadata `union:"StopInference"` + JobInfoBase + Type string `json:"type"` + Url string `json:"url"` +} + +type AIJobInfo struct { + serder.Metadata `union:"AI"` + JobInfoBase + Type string `json:"type"` + Name string `json:"name"` + Description string `json:"description"` + Files JobFilesInfo `json:"files"` + JobResources JobResources `json:"jobResources"` + ResourceChoice ResourceChoice `json:"resourceChoice"` +} + +type CompleteJobInfo struct { + serder.Metadata `union:"Complete"` + JobInfoBase + Type string `json:"type"` +} + +type StartJobInfo struct { + serder.Metadata `union:"Start"` + JobInfoBase + Type string `json:"type"` +} + +type UploadJobInfo struct { + serder.Metadata `union:"Upload"` + JobInfoBase + Type string `json:"type"` + DataType string `json:"dataType"` +} + +type NotifyJobInfo struct { + serder.Metadata `union:"Notify"` JobInfoBase - Type string `json:"type"` - Name string `json:"name"` - Description string `json:"description"` - Files JobFilesInfo `json:"files"` - JobResources JobResources `json:"jobResources"` + Type string `json:"type"` + RequestType string `json:"requestType"` + Url string `json:"url"` + Body any `json:"body"` + Headers map[string]string `json:"headers"` +} + +type ResourceChoice struct { + Type string `json:"type"` + ResourceScopes []ResourceScope `json:"resourceScopes"` +} + +type ResourceScope struct { + Name string `json:"name"` + Min float64 `json:"min"` + Max float64 `json:"max"` +} + +type BindingJobInfo struct { + serder.Metadata `union:"Binding"` + JobInfoBase + Type string `json:"type"` + Info DataBinding `json:"info"` + Name string `json:"name"` // 临时使用 +} + +type DataBinding interface { + Noop() +} + +var DataBindingTypeUnion = types.NewTypeUnion[DataBinding]( + (*ModelBinding)(nil), + (*DatasetBinding)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type") + +type DataBindingBase struct{} + +func (d *DataBindingBase) Noop() {} + +type ModelBinding struct { + serder.Metadata `union:"model"` + DataBindingBase + Type string `json:"type"` + Name string `json:"name"` + Description string `json:"description"` + ClusterIDs []ClusterID `json:"clusterIDs"` + Category string `json:"category"` + ModelType string `json:"modelType"` + Env string `json:"env"` + Version string `json:"version"` + RepositoryName string `json:"repositoryName"` +} + +type DatasetBinding struct { + serder.Metadata `union:"dataset"` + DataBindingBase + Type string `json:"type"` + Name string `json:"name"` + OperateType string `json:"operateType"` + ClusterIDs []ClusterID `json:"clusterIDs"` + Description string `json:"description"` + Category string `json:"category"` + PackageID cdssdk.PackageID `json:"packageID"` + RepositoryName string `json:"repositoryName"` + ConsumptionPoints int64 `json:"points"` +} + +type HPCJobInfo struct { + serder.Metadata `union:"HPC"` + JobInfoBase + Type string `json:"type"` + Name string `json:"name"` + Description string `json:"description"` + ClusterID ClusterID `json:"clusterID"` + Backend string `json:"backend"` + App string `json:"app"` + OperateType string `json:"operateType"` + ScriptContent string `json:"scriptContent"` + Parameters HPCParameter `json:"parameters"` +} + +type HPCParameter struct { + JobName string `json:"jobName"` + JobDir string `json:"jobDir"` + Partition string `json:"partition"` + Ntasks string `json:"ntasks"` + Nodes string `json:"nodes"` + BamFile string `json:"bamFile"` + HashType string `json:"hashType"` + AttackMode string `json:"attackMode"` + HashInput string `json:"hashInput"` + Mask string `json:"mask"` + Dictionary string `json:"dictionary"` + Dictionary2 string `json:"dictionary2"` + HPCBindingFiles []HPCBindingFile `json:"hpcBindingFiles"` +} + +type HPCBindingFile struct { + ParamName string `json:"paramName"` + Resource HPCFile `json:"resource"` +} + +type HPCFile interface { + Noop() +} + +var HPCFileTypeUnion = types.NewTypeUnion[HPCFile]( + (*HPCObject)(nil), + (*HPCPath)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&HPCFileTypeUnion, "type") + +type HPCFileBase struct{} + +func (d *HPCFileBase) Noop() {} + +type HPCObject struct { + serder.Metadata `union:"object"` + HPCFileBase + Type string `json:"type"` + ObjectID cdssdk.ObjectID `json:"objectID"` +} + +type HPCPath struct { + serder.Metadata `union:"path"` + HPCFileBase + Type string `json:"type"` + PackageID cdssdk.PackageID `json:"packageID"` + Path string `json:"path"` } type JobResources struct { @@ -126,8 +354,9 @@ var JobResourceTypeUnion = types.NewTypeUnion[JobResource]( (*NPU)(nil), (*MLU)(nil), (*DCU)(nil), - (*Memory)(nil), + (*MEMORY)(nil), (*PRICE)(nil), + (*STORAGE)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobResourceTypeUnion, "type") @@ -140,6 +369,15 @@ type CPU struct { serder.Metadata `union:"CPU"` JobResourceBase Type string `json:"type"` + Name string `json:"name"` + Number int64 `json:"number"` +} + +type STORAGE struct { + serder.Metadata `union:"STORAGE"` + JobResourceBase + Type string `json:"type"` + Name string `json:"name"` Number int64 `json:"number"` } @@ -147,6 +385,7 @@ type GPU struct { serder.Metadata `union:"GPU"` JobResourceBase Type string `json:"type"` + Name string `json:"name"` Number int64 `json:"number"` } @@ -154,13 +393,15 @@ type NPU struct { serder.Metadata `union:"NPU"` JobResourceBase Type string `json:"type"` + Name string `json:"name"` Number int64 `json:"number"` } -type Memory struct { - serder.Metadata `union:"Memory"` +type MEMORY struct { + serder.Metadata `union:"MEMORY"` JobResourceBase Type string `json:"type"` + Name string `json:"name"` Number int64 `json:"number"` } @@ -168,6 +409,7 @@ type DCU struct { serder.Metadata `union:"DCU"` JobResourceBase Type string `json:"type"` + Name string `json:"name"` Number int64 `json:"number"` } @@ -175,6 +417,7 @@ type MLU struct { serder.Metadata `union:"MLU"` JobResourceBase Type string `json:"type"` + Name string `json:"name"` Number int64 `json:"number"` } @@ -182,6 +425,7 @@ type PRICE struct { serder.Metadata `union:"PRICE"` JobResourceBase Type string `json:"type"` + Name string `json:"name"` Number int64 `json:"number"` } @@ -211,9 +455,10 @@ type DataPreprocessJobInfo struct { type DataReturnJobInfo struct { serder.Metadata `union:"DataReturn"` JobInfoBase - Type string `json:"type"` - BucketID cdssdk.BucketID `json:"bucketID"` - TargetLocalJobID string `json:"targetLocalJobID"` + Type string `json:"type"` + BucketID cdssdk.BucketID `json:"bucketID"` + TargetLocalJobID string `json:"targetLocalJobID"` + ReportMessage TrainJobStatusReport `json:"report"` } // MultiInstanceJobInfo 多实例(推理任务) @@ -288,6 +533,8 @@ type BindingJobFileInfo struct { JobFileInfoBase Type string `json:"type"` BindingID int64 `json:"bindingID"` + // 用于参数回显 + BindingName string `json:"bindingName"` } type PackageJobFileInfo struct { @@ -316,6 +563,8 @@ type ImageJobFileInfo struct { JobFileInfoBase Type string `json:"type"` ImageID ImageID `json:"imageID"` + // 用于参数回显 + ImageName string `json:"imageName"` } type JobRuntimeInfo struct { @@ -424,3 +673,99 @@ type InferencePlatform struct { SimilarityThreshold string `json:"similarityThreshold"` EntriesPerFile string `json:"entriesPerFile"` } + +type JobOutput interface { + Output2() +} + +var JobOutputTypeUnion = types.NewTypeUnion[JobOutput]( + (*AIJobOutput)(nil), + (*BindingJobOutput)(nil), + (*UploadJobOutput)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&JobOutputTypeUnion, "type") + +type JobOutputBase struct{} + +func (d *JobOutputBase) Output2() {} + +type PublicOutput struct { + serder.Metadata `union:"object"` + JobOutputBase + Type string `json:"type"` +} + +type HPCOutput struct { + serder.Metadata `union:"HPCSlurm"` + JobOutputBase + Type string `json:"type"` + Output string `json:"output"` +} + +type AIJobOutput struct { + serder.Metadata `union:"AI"` + JobOutputBase + Type string `json:"type"` + Output string `json:"output"` +} + +type BindingJobOutput struct { + serder.Metadata `union:"binding"` + JobOutputBase + Type string `json:"type"` + BindingID DataID `json:"bindingID"` +} + +type UploadJobOutput struct { + serder.Metadata `union:"upload"` + JobOutputBase + Type string `json:"type"` + PackageID cdssdk.PackageID `json:"packageID"` +} + +type DataReturnJobOutput struct { + serder.Metadata `union:"DataReturn"` + JobOutputBase + Type string `json:"type"` + ReportMessage TrainJobStatusReport `json:"report"` + PackageID cdssdk.PackageID `json:"packageID"` +} + +type JobStatusReport interface { + Report() +} + +var JobStatusReportTypeUnion = types.NewTypeUnion[JobStatusReport]( + (*TrainJobStatusReport)(nil), + (*InferenceJobStatusReport)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&JobStatusReportTypeUnion, "type") + +type JobStatusReportBase struct{} + +func (d *JobStatusReportBase) Report() {} + +type TrainJobStatusReport struct { + serder.Metadata `union:"Train"` + JobStatusReportBase + Type string `json:"type"` + TaskName string `json:"taskName"` + TaskID string `json:"taskID"` + Status bool `json:"status"` + Message string `json:"message"` + ClusterID ClusterID `json:"clusterID"` + Output string `json:"output"` +} + +type InferenceJobStatusReport struct { + serder.Metadata `union:"Inference"` + JobStatusReportBase + Type string `json:"type"` + TaskName string `json:"taskName"` + TaskID string `json:"taskID"` + Status bool `json:"status"` + Message string `json:"message"` + URL string `json:"url"` +} diff --git a/sdks/uploader/models.go b/sdks/uploader/models.go index 2a92a89..9ad3f8d 100644 --- a/sdks/uploader/models.go +++ b/sdks/uploader/models.go @@ -21,7 +21,7 @@ type BlockChain struct { } func (BlockChain) TableName() string { - return "BlockChain" // 确保和数据库中的表名一致 + return "block_chain" // 确保和数据库中的表名一致 } type Binding struct { @@ -66,25 +66,26 @@ type BindingDetail struct { SSOId string `json:"ssoID"` Name string `json:"Name"` Info sch.DataBinding `json:"info"` - Packages []Package `json:"packages"` + Package Package `json:"package"` Status string `json:"status"` AccessLevel string `json:"accessLevel"` CreateTime time.Time `json:"createTime"` } func (Binding) TableName() string { - return "BindingData" // 确保和数据库中的表名一致 + return "bindings" // 确保和数据库中的表名一致 } type BindingCluster struct { BindingID DataID `gorm:"column:binding_id" json:"bindingID"` ClusterID ClusterID `gorm:"column:cluster_id" json:"clusterID"` Status string `gorm:"column:status" json:"status"` + Param string `gorm:"column:param" json:"Param"` JsonData string `gorm:"column:json_data" json:"jsonData"` } func (BindingCluster) TableName() string { - return "bindingCluster" // 确保和数据库中的表名一致 + return "binding_cluster" // 确保和数据库中的表名一致 } type Folder struct { @@ -107,37 +108,38 @@ type Cluster struct { } func (Cluster) TableName() string { - return "uploadedCluster" // 确保和数据库中的表名一致 + return "uploaded_cluster" // 确保和数据库中的表名一致 } type Package struct { + UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` + PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` + PackageName string `gorm:"column:package_name" json:"packageName"` + BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"` + DataType string `gorm:"column:data_type" json:"dataType"` + BindingID DataID `gorm:"column:binding_id" json:"bindingID"` + CreateTime time.Time `gorm:"column:create_time" json:"createTime"` + Objects []cdssdk.Object `gorm:"column:objects" json:"objects"` + UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"` + Versions []PackageCloneDAO `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"` + UploadPriority sch.UploadPriority `gorm:"column:upload_priority" json:"uploadPriority"` + BindingInfo sch.DataBinding `gorm:"column:binding_info" json:"bindingInfo"` + PackageType string `gorm:"column:package_type" json:"packageType"` +} + +type PackageDAO struct { UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` PackageName string `gorm:"column:package_name" json:"packageName"` BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"` DataType string `gorm:"column:data_type" json:"dataType"` - JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段 - BindingID DataID `gorm:"column:binding_id" json:"bindingID"` - CreateTime time.Time `gorm:"column:create_time" json:"createTime"` - Objects []cdssdk.Object `gorm:"column:objects" json:"objects"` - UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"` - Versions []PackageCloneDAO `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"` - //BlockChain []BlockChain `gorm:"foreignKey:package_id;references:package_id" json:"blockChains"` // 关联 BlockChain 数据 - UploadPriority sch.UploadPriority `gorm:"column:upload_priority" json:"uploadPriority"` -} - -type PackageDAO struct { - UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` - PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` - PackageName string `gorm:"column:package_name" json:"packageName"` - BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"` - DataType string `gorm:"column:data_type" json:"dataType"` - //JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段 BindingID DataID `gorm:"column:binding_id" json:"bindingID"` CreateTime time.Time `gorm:"column:create_time" json:"createTime"` UploadedCluster []Cluster `gorm:"foreignKey:package_id;references:package_id" json:"clusters"` // 关联 Cluster 数据 Versions []PackageCloneDAO `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"` UploadPriority string `gorm:"column:upload_priority" json:"uploadPriority"` + Param string `gorm:"column:param" json:"param"` + PackageType string `gorm:"column:package_type" json:"packageType"` } type PackageCloneDAO struct { @@ -155,7 +157,7 @@ type PackageCloneDAO struct { } func (PackageCloneDAO) TableName() string { - return "packageClone" // 确保和数据库中的表名一致 + return "package_clone" // 确保和数据库中的表名一致 } type PackageCloneParam struct { @@ -165,6 +167,7 @@ type PackageCloneParam struct { Description string `json:"description"` BootstrapObjectID cdssdk.ObjectID `json:"bootstrapObjectID"` ClusterID schsdk.ClusterID `json:"clusterID"` + Output string `json:"output"` ImageID schsdk.ImageID `json:"imageID"` } @@ -192,7 +195,7 @@ type ClusterMapping struct { } func (ClusterMapping) TableName() string { - return "ClusterMapping" + return "cluster_mapping" } type ScheduleTarget interface { diff --git a/sdks/uploader/uploader.go b/sdks/uploader/uploader.go index 54f17df..6cae186 100644 --- a/sdks/uploader/uploader.go +++ b/sdks/uploader/uploader.go @@ -2,152 +2,21 @@ package uploadersdk import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/types" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/serder" + "mime/multipart" "net/url" "strings" ) -//type DataScheduleReq struct { -// PackageID cdssdk.PackageID `json:"packageID"` -// DataType string `json:"dataType"` -// ScheduleTarget ScheduleTarget `json:"scheduleTarget"` -//} - -//type DataScheduleResp struct { -// Results []sch.DataScheduleResult `json:"data"` -//} - -//type TmpDataScheduleResult struct { -// Cluster sch.DataDetail `json:"cluster"` -// PackageID cdssdk.PackageID `json:"packageID"` -// Status bool `json:"status"` -// Msg string `json:"msg"` -//} - -//func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, error) { -// targetUrl, err := url.JoinPath(c.baseURL, "/dataSchedule") -// if err != nil { -// return nil, err -// } -// -// resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ -// Body: req, -// }) -// if err != nil { -// return nil, err -// } -// println(resp.Body) -// -// contType := resp.Header.Get("Content-Type") -// if strings.Contains(contType, http2.ContentTypeJSON) { -// var codeResp response[[]TmpDataScheduleResult] -// if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { -// return nil, fmt.Errorf("parsing response: %w", err) -// } -// -// if codeResp.Code == ResponseCodeOK { -// var results []sch.DataScheduleResult -// for _, tmpResult := range codeResp.Data { -// result := sch.DataScheduleResult{ -// PackageID: tmpResult.PackageID, -// Status: tmpResult.Status, -// Msg: tmpResult.Msg, -// Clusters: []sch.DataDetail{ -// tmpResult.Cluster, -// }, -// } -// results = append(results, result) -// } -// return results, nil -// } -// -// return nil, codeResp.ToError() -// } -// -// return nil, fmt.Errorf("unknow response content type: %s", contType) -//} - -type UploadReq struct { - DataType string `json:"dataType"` - Source UploadSource `json:"source"` - Target UploadTarget `json:"target"` - //StorageIDs []cdssdk.StorageID `json:"storageIDs"` -} - -type UploadSource interface { - Noop() -} - -var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource]( - (*PackageSource)(nil), - (*UrlSource)(nil), -) - -var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type") - -type PackageSource struct { - serder.Metadata `union:"jcs"` - UploadSourceBase - Type string `json:"type"` - PackageID cdssdk.PackageID `json:"packageID"` -} - -type UrlSource struct { - serder.Metadata `union:"url"` - UploadSourceBase - Type string `json:"type"` - Url string `json:"url"` -} - -type UploadSourceBase struct{} - -func (d *UploadSourceBase) Noop() {} - -type UploadTarget interface { - Noop() -} - -var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget]( - (*UrlTarget)(nil), - (*ApiTarget)(nil), -) - -var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type") - -type UrlTarget struct { - serder.Metadata `union:"url"` - UploadTargetBase - Type string `json:"type"` - ClusterID ClusterID `json:"clusterId"` - JCSUploadInfo cdsapi.ObjectUploadInfo `form:"JCSUploadInfo"` -} - -type ApiTarget struct { - serder.Metadata `union:"api"` - UploadTargetBase - Type string `json:"type"` - Clusters []ClusterID `json:"clusters"` -} - -type UploadTargetBase struct{} - -func (d *UploadTargetBase) Noop() {} - -type UploadResp struct { - PackageID cdssdk.PackageID `json:"packageID"` - ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` - ClusterID ClusterID `json:"clusterID"` - JsonData string `json:"jsonData"` - Status bool `json:"status"` - Message string `json:"message"` +type ObjectUploadReq struct { + Info cdsapi.ObjectUploadInfo `form:"info" binding:"required"` + Files []*multipart.FileHeader `form:"files"` } -func (c *Client) Upload(req UploadReq) (*UploadResp, error) { - targetUrl, err := url.JoinPath(c.baseURL, "/dataUpload") +func (c *Client) Upload(req ObjectUploadReq) (*cdsapi.ObjectUploadResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/object/upload") if err != nil { return nil, err } @@ -161,7 +30,7 @@ func (c *Client) Upload(req UploadReq) (*UploadResp, error) { contType := resp.Header.Get("Content-Type") if strings.Contains(contType, http2.ContentTypeJSON) { - var codeResp response[UploadResp] + var codeResp response[cdsapi.ObjectUploadResp] if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { return nil, fmt.Errorf("parsing response: %w", err) } diff --git a/utils/http2/http.go b/utils/http2/http.go index ab67e21..f57b06f 100644 --- a/utils/http2/http.go +++ b/utils/http2/http.go @@ -119,6 +119,27 @@ func PostJSON(url string, param RequestParam) (*http.Response, error) { return defaultClient.Do(req) } +func PutJSON(url string, param RequestParam) (*http.Response, error) { + req, err := http.NewRequest(http.MethodPut, url, nil) + if err != nil { + return nil, err + } + + if err = prepareQuery(req, param.Query); err != nil { + return nil, err + } + + if err = prepareHeader(req, param.Header); err != nil { + return nil, err + } + + if err = prepareJSONBody(req, param.Body); err != nil { + return nil, err + } + + return defaultClient.Do(req) +} + func PostForm(url string, param RequestParam) (*http.Response, error) { req, err := http.NewRequest(http.MethodPost, url, nil) if err != nil { diff --git a/utils/reflect2/assign.go b/utils/reflect2/assign.go new file mode 100644 index 0000000..2e8d49d --- /dev/null +++ b/utils/reflect2/assign.go @@ -0,0 +1,29 @@ +package reflect2 + +import "reflect" + +// MergeNonZero 将 src 中非零值字段复制到 dst 中 +func MergeNonZero(dst, src any) { + dstVal := reflect.ValueOf(dst).Elem() + srcVal := reflect.ValueOf(src).Elem() + + for i := 0; i < dstVal.NumField(); i++ { + dstField := dstVal.Field(i) + srcField := srcVal.Field(i) + + // 如果 src 字段是零值,则跳过 + if isZero(srcField) { + continue + } + + // 如果 dst 字段可设置,才设置 + if dstField.CanSet() { + dstField.Set(srcField) + } + } +} + +func isZero(v reflect.Value) bool { + zero := reflect.Zero(v.Type()) + return reflect.DeepEqual(v.Interface(), zero.Interface()) +}