Browse Source

新增超算任务类型

pull/52/head
JeshuaRen 8 months ago
parent
commit
4374600529
5 changed files with 685 additions and 0 deletions
  1. +63
    -0
      sdks/hpc/client.go
  2. +5
    -0
      sdks/hpc/config.go
  3. +71
    -0
      sdks/hpc/job.go
  4. +525
    -0
      sdks/hpc/models.go
  5. +21
    -0
      sdks/scheduler/models.go

+ 63
- 0
sdks/hpc/client.go View File

@@ -0,0 +1,63 @@
package hpc

import (
"fmt"

"gitlink.org.cn/cloudream/common/sdks"
)

// response is a generic JSON envelope: a numeric code, a text message read
// from the "message" key, and a payload of type T read from the "data" key.
type response[T any] struct {
Code int `json:"code"`
Message string `json:"message"`
Data T `json:"data"`
}

// respons2 is the same envelope as response except that the message is read
// from the "msg" JSON key instead of "message". Client.CreateJob decodes its
// reply into this type.
type respons2[T any] struct {
Code int `json:"code"`
Message string `json:"msg"`
Data T `json:"data"`
}

const (
// ResponseCodeOK is the envelope code that Client.CreateJob treats as success.
ResponseCodeOK int = 200
)

// ToError converts the envelope into a *sdks.CodeMessageError, rendering the
// numeric code as a decimal string and copying the message unchanged.
func (r *response[T]) ToError() *sdks.CodeMessageError {
	code := fmt.Sprintf("%d", r.Code)
	cmErr := sdks.CodeMessageError{
		Code:    code,
		Message: r.Message,
	}
	return &cmErr
}

// Client talks to the HPC service; baseURL is the prefix onto which request
// paths are joined (see CreateJob).
type Client struct {
baseURL string
}

// NewClient builds a Client whose base URL is taken from cfg.URL.
// cfg must be non-nil.
func NewClient(cfg *Config) *Client {
	cli := new(Client)
	cli.baseURL = cfg.URL
	return cli
}

// Pool hands out Clients. Acquire returns a Client (or an error) and Release
// gives a previously acquired Client back to the pool.
type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

// pool is the Pool implementation in this file; it only remembers the Config
// used to build each Client.
type pool struct {
cfg *Config
}

// NewPool returns a Pool that builds its Clients from cfg.
func NewPool(cfg *Config) Pool {
	p := pool{cfg: cfg}
	return &p
}
// Acquire creates a fresh Client from the pool's Config on every call.
// The returned error is always nil.
func (p *pool) Acquire() (*Client, error) {
	return NewClient(p.cfg), nil
}

// Release is a no-op: Acquire builds a new Client each time, so there is
// nothing to return to the pool.
func (*pool) Release(cli *Client) {}

+ 5
- 0
sdks/hpc/config.go View File

@@ -0,0 +1,5 @@
package hpc

// Config holds the settings for the HPC client. URL is read from the "url"
// JSON key and becomes the Client's base URL in NewClient.
type Config struct {
URL string `json:"url"`
}

+ 71
- 0
sdks/hpc/job.go View File

@@ -0,0 +1,71 @@
package hpc

import (
"fmt"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
"net/url"
"strings"
)

// CreateJobReq is the JSON body that Client.CreateJob posts to the service.
// Its fields mirror those of scheduler.HPCJobInfo (name, cluster, backend,
// app, operate type, parameters).
type CreateJobReq struct {
Name string `json:"name"`
// ClusterID is serialized as "clusterId" (lower-case "d").
ClusterID schsdk.ClusterID `json:"clusterId"`
Backend string `json:"backend"`
App string `json:"app"`
OperateType string `json:"operateType"`
Parameters HPCParameter `json:"parameters"`
}

// HPCParameter carries the job parameters embedded in CreateJobReq.
// NOTE(review): Ntasks and Nodes are strings rather than numbers — presumably
// this matches the server's schema; confirm.
type HPCParameter struct {
JobName string `json:"jobName"`
Partition string `json:"partition"`
Ntasks string `json:"ntasks"`
Nodes string `json:"nodes"`
InputFile string `json:"inputFile"`
}

// CreateJobResp is the "data" payload that Client.CreateJob returns on
// success: the backend name and the created job's info.
type CreateJobResp struct {
Backend string `json:"backend"`
JobInfo HPCJobInfo `json:"jobInfo"`
}

// HPCJobInfo describes a created job inside CreateJobResp: its job directory
// ("jobDir") and job ID ("jobId"). It is distinct from the scheduler package's
// type of the same name.
type HPCJobInfo struct {
JobDir string `json:"jobDir"`
JobID string `json:"jobId"`
}

// CreateJob submits an HPC job by POSTing req as JSON to the
// "/hpc/commitHpcTask" path under the client's base URL.
//
// token is sent verbatim as the value of the Authorization header.
//
// It returns the decoded "data" payload when the reply is JSON and its code
// equals ResponseCodeOK. An error is returned when the target URL cannot be
// built, the request fails, the reply is not JSON, the body cannot be decoded,
// or the reply carries any other code (the reply's message is used as the
// error text).
func (c *Client) CreateJob(req CreateJobReq, token string) (*CreateJobResp, error) {
	targetUrl, err := url.JoinPath(c.baseURL, "/hpc/commitHpcTask")
	if err != nil {
		return nil, err
	}

	resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
		Body: req,
		Header: map[string]string{
			"Authorization": token,
		},
	})
	if err != nil {
		return nil, err
	}
	// Close the body on every return path so the underlying connection is
	// released instead of leaked.
	defer resp.Body.Close()

	contType := resp.Header.Get("Content-Type")
	if !strings.Contains(contType, http2.ContentTypeJSON) {
		return nil, fmt.Errorf("unknow response content type: %s", contType)
	}

	var codeResp respons2[CreateJobResp]
	if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
		return nil, fmt.Errorf("parsing response: %w", err)
	}

	if codeResp.Code != ResponseCodeOK {
		return nil, fmt.Errorf("error: %s", codeResp.Message)
	}

	return &codeResp.Data, nil
}

+ 525
- 0
sdks/hpc/models.go View File

@@ -0,0 +1,525 @@
package hpc

import (
"gitlink.org.cn/cloudream/common/pkgs/types"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
"time"
)

// ResourceType is the string name of a kind of cluster resource (see the
// ResourceType* constants).
type ResourceType string

// Package-level string constants. Only the ResourceType* group is typed; the
// rest are untyped strings. The group comments below are inferred from names
// and values unless they point at a matching `union` tag in this file.
const (
// Typed resource kinds.
ResourceTypeCPU ResourceType = "CPU"
ResourceTypeNPU ResourceType = "NPU"
ResourceTypeGPU ResourceType = "GPU"
ResourceTypeMLU ResourceType = "MLU"
ResourceTypeStorage ResourceType = "STORAGE"
ResourceTypeMemory ResourceType = "MEMORY"

// Split is the "/" separator — presumably for paths; confirm at call sites.
Split = "/"

// Data kinds; "code", "dataset", "image" and "model" equal the union tags
// of the DataBinding variants.
CODE = "code"
DATASET = "dataset"
IMAGE = "image"
MODEL = "model"
RESULT = "result"

// Sort keys — presumably the accepted values of QueryData.OrderBy; confirm.
OrderByName = "name"
OrderBySize = "size"
OrderByTime = "time"

// Storage kinds.
StorageTypeURL = "url"
StorageTypeJCS = "jcs"

// Status values — presumably for an approval workflow; confirm.
RejectedStatus = "rejected"
PendingStatus = "pending"
ApprovedStatus = "approved"
RevokedStatus = "revoked"
CancelStatus = "cancel"
ExpiredStatus = "expired"

// Access levels; equal to the union tags of ApplyLevel, PrivateLevel and
// PublicLevel.
ApplyAccess = "apply"
PrivateAccess = "private"
PublicAccess = "public"

// Upload priority kinds; equal to the union tags of Preferences and
// SpecifyCluster.
PreferencePriority = "preference"
SpecifyClusterPriority = "specify"

// Generic outcome values.
FailedStatus = "failed"
SuccessStatus = "success"

// Operation names.
Query = "query"
Delete = "delete"

// Relationship kinds.
ChildrenType = "children"
ParentType = "parent"

// Platform names.
PlatformSugon = "sugon"
PlatformOpenI = "OpenI"
PlatformModelArts = "ModelArts"

// Identifier kinds — presumably the values of ImageBinding.IDType; confirm.
URL = "url"
ID = "id"

Startup = "startup"
)

// TaskID is an int64-based task identifier.
type TaskID int64
// DataID is an int64-based data identifier.
type DataID int64

// ClusterDetail describes a cluster: its ID, functional type, region, and
// resources.
type ClusterDetail struct {
// Cluster ID.
ClusterId schsdk.ClusterID `json:"clusterID"`
// Functional type of the cluster: cloud computing, intelligent (AI)
// computing, or supercomputing (HPC).
ClusterType string `json:"clusterType"`
// Cluster region: East China, South China, North China, Central China,
// Southwest, Northwest, or Northeast.
Region string `json:"region"`
// Resource types.
// NOTE(review): Resources2 is serialized under "resources1" and omitted when
// empty, while Resources owns the "resources" key — looks like a transitional
// arrangement; confirm.
Resources2 []ResourceData `json:"resources1,omitempty"`
//Resources2 []ResourceData `json:"resources"`
Resources []ClusterResource `json:"resources"`
}

// ClusterResource pairs one resource entry with a list of base resource
// entries, as used in ClusterDetail.Resources.
type ClusterResource struct {
Resource TmpResourceData `json:"resource"`
BaseResources []TmpResourceData `json:"baseResources"`
}

// TmpResourceData is a flat (non-union) resource record: a typed kind, a
// name, and total/available quantities expressed as float64 with a unit.
type TmpResourceData struct {
Type ResourceType `json:"type"`
Name string `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

// ResourceData is the marker interface of the resource-data union. Noop has
// no behavior; embedding ResourceDataBase satisfies it for pointer types.
type ResourceData interface {
Noop()
}

// ResourceDataTypeUnion lists the concrete types that make up the
// ResourceData union.
var ResourceDataTypeUnion = types.NewTypeUnion[ResourceData](
(*CPUResourceData)(nil),
(*NPUResourceData)(nil),
(*GPUResourceData)(nil),
(*MLUResourceData)(nil),
(*DCUResourceData)(nil),
(*GCUResourceData)(nil),
(*GPGPUResourceData)(nil),
(*StorageResourceData)(nil),
(*MemoryResourceData)(nil),
(*BalanceResourceData)(nil),
(*RateResourceData)(nil),
)
// Registers the union with serder as internally tagged on the "type" field —
// presumably so that field selects the concrete type when decoding; confirm
// against serder.
var _ = serder.UseTypeUnionInternallyTagged(&ResourceDataTypeUnion, "type")

// ResourceDataBase is an empty struct meant to be embedded in ResourceData
// variants so that their pointer types pick up the Noop marker method.
type ResourceDataBase struct {
}

// Noop does nothing; it exists only to satisfy the ResourceData interface.
func (*ResourceDataBase) Noop() {
}

// UnitValue is a quantity of type T together with the name of its unit.
type UnitValue[T any] struct {
Unit string `json:"unit"`
Value T `json:"value"`
}

// CPUResourceData is the ResourceData variant with union tag "CPU"; Total and
// Available are int64 quantities with a unit.
// NOTE(review): here Type is a plain string and Name is a ResourceType, the
// reverse of TmpResourceData — confirm this is intended.
type CPUResourceData struct {
serder.Metadata `union:"CPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

// NPUResourceData is the ResourceData variant with union tag "NPU"; Total and
// Available are int64 quantities with a unit.
type NPUResourceData struct {
serder.Metadata `union:"NPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

// GPUResourceData is the ResourceData variant with union tag "GPU"; Total and
// Available are int64 quantities with a unit.
type GPUResourceData struct {
serder.Metadata `union:"GPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

// MLUResourceData is the ResourceData variant with union tag "MLU"; Total and
// Available are int64 quantities with a unit.
type MLUResourceData struct {
serder.Metadata `union:"MLU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

// DCUResourceData is the ResourceData variant with union tag "DCU"; Total and
// Available are int64 quantities with a unit.
type DCUResourceData struct {
serder.Metadata `union:"DCU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

// GCUResourceData is the ResourceData variant with union tag "GCU"; Total and
// Available are int64 quantities with a unit.
type GCUResourceData struct {
serder.Metadata `union:"GCU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

// GPGPUResourceData is the ResourceData variant with union tag
// "ILUVATAR-GPGPU"; Total and Available are int64 quantities with a unit.
type GPGPUResourceData struct {
serder.Metadata `union:"ILUVATAR-GPGPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

// StorageResourceData is the ResourceData variant with union tag "STORAGE";
// Total and Available are float64 quantities with a unit.
type StorageResourceData struct {
serder.Metadata `union:"STORAGE"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

// MemoryResourceData is the ResourceData variant with union tag "MEMORY";
// Total and Available are float64 quantities with a unit.
type MemoryResourceData struct {
serder.Metadata `union:"MEMORY"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

// BalanceResourceData is the ResourceData variant with union tag "BALANCE";
// Total and Available are float64 quantities with a unit.
type BalanceResourceData struct {
serder.Metadata `union:"BALANCE"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

// RateResourceData is the ResourceData variant with union tag "RATE"; Total
// and Available are float64 quantities with a unit.
type RateResourceData struct {
serder.Metadata `union:"RATE"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

// ResourceRange holds, for one user and resource type, min/max ranges for
// GPU, CPU, memory and storage plus a GPU count. Units are not stated here.
type ResourceRange struct {
UserID cdssdk.UserID `json:"userID"`
Type ResourceType `json:"type"`
GPU Range `json:"gpu"`
GPUNumber int `json:"gpuNumber"`
CPU Range `json:"cpu"`
Memory Range `json:"memory"`
Storage Range `json:"storage"`
}

// Range is a pair of float64 bounds, serialized as "min" and "max".
type Range struct {
Min float64 `json:"min"`
Max float64 `json:"max"`
}

// ResourcePriority is the marker interface of the resource-priority union.
// Noop has no behavior; embedding ResourcePriorityBase satisfies it for
// pointer types.
type ResourcePriority interface {
Noop()
}

// ResourcePriorityBase is an empty struct embedded by the ResourcePriority
// variants; its pointer type carries the Noop marker method.
type ResourcePriorityBase struct {
}

// ResourcePriorityTypeUnion lists the concrete types that make up the
// ResourcePriority union.
var ResourcePriorityTypeUnion = types.NewTypeUnion[ResourcePriority](
(*RegionPriority)(nil),
(*ChipPriority)(nil),
(*BiasPriority)(nil),
)

// Registers the union with serder as internally tagged on the "type" field —
// presumably so that field selects the concrete type when decoding; confirm
// against serder.
var _ = serder.UseTypeUnionInternallyTagged(&ResourcePriorityTypeUnion, "type")

// Noop does nothing; it exists only to satisfy the ResourcePriority interface.
func (*ResourcePriorityBase) Noop() {
}

// RegionPriority is the ResourcePriority variant with union tag "region"; it
// carries a list of option strings.
type RegionPriority struct {
serder.Metadata `union:"region"`
ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"`
}

// ChipPriority is the ResourcePriority variant with union tag "chip"; it
// carries a list of option strings.
type ChipPriority struct {
serder.Metadata `union:"chip"`
ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"`
}

// BiasPriority is the ResourcePriority variant with union tag "bias"; it
// carries a list of option strings.
type BiasPriority struct {
serder.Metadata `union:"bias"`
ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"`
}

// TaskMessage is a status string paired with a human-readable message.
type TaskMessage struct {
Status string `json:"status"`
Message string `json:"message"`
}

// ReportMessage reports on a task: its name and ID, a boolean status, a
// message, the cluster involved, and an output string.
type ReportMessage struct {
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Status bool `json:"status"`
Message string `json:"message"`
ClusterID schsdk.ClusterID `json:"clusterID"`
Output string `json:"output"`
}

// UploadParams describes an upload: the data type name and an UploadInfo
// union value.
type UploadParams struct {
DataType string `json:"dataType"`
UploadInfo UploadInfo `json:"uploadInfo"`
}

// UploadInfo is the marker interface of the upload-info union. Noop has no
// behavior; embedding UploadInfoBase satisfies it for pointer types.
type UploadInfo interface {
Noop()
}

// UploadInfoTypeUnion lists the concrete types that make up the UploadInfo
// union.
var UploadInfoTypeUnion = types.NewTypeUnion[UploadInfo](
(*LocalUploadInfo)(nil),
(*RemoteUploadInfo)(nil),
)

// Registers the union with serder as internally tagged on the "type" field —
// presumably so that field selects the concrete type when decoding; confirm
// against serder.
var _ = serder.UseTypeUnionInternallyTagged(&UploadInfoTypeUnion, "type")

// LocalUploadInfo is the UploadInfo variant with union tag "local"; it
// carries a local path and a list of object IDs.
type LocalUploadInfo struct {
serder.Metadata `union:"local"`
UploadInfoBase
Type string `json:"type"`
LocalPath string `json:"localPath"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
}

// RemoteUploadInfo is the UploadInfo variant with union tag "url"; it carries
// a URL, a branch, a data name, and a cluster ID (serialized as "clusterID").
type RemoteUploadInfo struct {
serder.Metadata `union:"url"`
UploadInfoBase
Type string `json:"type"`
Url string `json:"url"`
Branch string `json:"branch"`
DataName string `json:"dataName"`
Cluster schsdk.ClusterID `json:"clusterID"`
}

// UploadInfoBase is an empty struct meant to be embedded in UploadInfo
// variants so that their pointer types pick up the Noop marker method.
type UploadInfoBase struct {
}

// Noop does nothing; it exists only to satisfy the UploadInfo interface.
func (*UploadInfoBase) Noop() {
}

// UploadPriority is the marker interface of the upload-priority union. Noop
// has no behavior; embedding UploadPriorityBase satisfies it for pointer
// types.
type UploadPriority interface {
Noop()
}

// UploadPriorityTypeUnion lists the concrete types that make up the
// UploadPriority union.
var UploadPriorityTypeUnion = types.NewTypeUnion[UploadPriority](
(*Preferences)(nil),
(*SpecifyCluster)(nil),
)

// Registers the union with serder as internally tagged on the "type" field —
// presumably so that field selects the concrete type when decoding; confirm
// against serder.
var _ = serder.UseTypeUnionInternallyTagged(&UploadPriorityTypeUnion, "type")

// Preferences is the UploadPriority variant with union tag "preference"; it
// carries a list of ResourcePriority values serialized as "priorities".
type Preferences struct {
serder.Metadata `union:"preference"`
UploadPriorityBase
Type string `json:"type"`
ResourcePriorities []ResourcePriority `json:"priorities"`
}

// SpecifyCluster is the UploadPriority variant with union tag "specify"; it
// carries an explicit list of cluster IDs.
type SpecifyCluster struct {
serder.Metadata `union:"specify"`
UploadPriorityBase
Type string `json:"type"`
Clusters []schsdk.ClusterID `json:"clusters"`
}

// UploadPriorityBase is an empty struct meant to be embedded in
// UploadPriority variants so that their pointer types pick up the Noop marker
// method.
type UploadPriorityBase struct {
}

// Noop does nothing; it exists only to satisfy the UploadPriority interface.
func (*UploadPriorityBase) Noop() {
}

// QueryData holds the parameters of a paged data query. Every field except
// Path carries a `binding:"required"` tag — presumably enforced by the HTTP
// framework's request binder; confirm.
type QueryData struct {
DataType string `json:"dataType" binding:"required"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
Path string `json:"path"`
PackageID cdssdk.PackageID `json:"packageID" binding:"required"`
CurrentPage int `json:"currentPage" binding:"required"`
PageSize int `json:"pageSize" binding:"required"`
OrderBy string `json:"orderBy" binding:"required"`
}

// DataBinding is the marker interface of the data-binding union. Noop has no
// behavior; embedding DataBindingBase satisfies it for pointer types.
type DataBinding interface {
Noop()
}

// DataBindingTypeUnion lists the concrete types that make up the DataBinding
// union.
var DataBindingTypeUnion = types.NewTypeUnion[DataBinding](
(*DatasetBinding)(nil),
(*ModelBinding)(nil),
(*CodeBinding)(nil),
(*ImageBinding)(nil),
)

// Registers the union with serder as internally tagged on the "type" field —
// presumably so that field selects the concrete type when decoding; confirm
// against serder.
var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type")

// DataBindingBase is an empty struct meant to be embedded in DataBinding
// variants so that their pointer types pick up the Noop marker method.
type DataBindingBase struct {
}

// Noop does nothing; it exists only to satisfy the DataBinding interface.
func (*DataBindingBase) Noop() {
}

// DatasetBinding is the DataBinding variant with union tag "dataset".
type DatasetBinding struct {
serder.Metadata `union:"dataset"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
PackageID cdssdk.PackageID `json:"packageID"`
RepositoryName string `json:"repositoryName"`
// ConsumptionPoints is serialized under the shorter key "points".
ConsumptionPoints float64 `json:"points"`
}

// ModelBinding is the DataBinding variant with union tag "model"; besides the
// common name/cluster/description fields it records model type, environment
// and version.
type ModelBinding struct {
serder.Metadata `union:"model"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
ModelType string `json:"modelType"`
Env string `json:"env"`
Version string `json:"version"`
PackageID cdssdk.PackageID `json:"packageID"`
RepositoryName string `json:"repositoryName"`
}

// CodeBinding is the DataBinding variant with union tag "code". Unlike the
// dataset and model variants it targets a single cluster (ClusterID).
type CodeBinding struct {
serder.Metadata `union:"code"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterID schsdk.ClusterID `json:"clusterID"`
Description string `json:"description"`
ImageID schsdk.ImageID `json:"imageID"`
BootstrapObjectID cdssdk.ObjectID `json:"bootstrapObjectID"`
PackageID cdssdk.PackageID `json:"packageID"`
Output string `json:"output"`
// When the cluster is OpenI, a branch must be provided.
Branch string `json:"branch"`
}

//type ImageBinding struct {
// serder.Metadata `union:"image"`
// DataBindingBase
// Type string `json:"type"`
// Name string `json:"name"`
// ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
// Description string `json:"description"`
// Architecture string `json:"architecture"`
// ResourceType string `json:"resourceType"`
// Tags []string `json:"tags"`
// PackageID cdssdk.PackageID `json:"packageID"`
//}

// ImageBinding is the DataBinding variant with union tag "image". It
// identifies an image by numeric ID, name, an ID-type string and a string
// image ID, for a single cluster.
type ImageBinding struct {
serder.Metadata `union:"image"`
DataBindingBase
Type string `json:"type"`
ID int64 `json:"id"`
Name string `json:"name"`
IDType string `json:"idType"`
ImageID string `json:"imageID"`
ClusterID schsdk.ClusterID `json:"clusterID"`
}

// Image is a gorm-mapped image record. ImageID is the primary key (column
// "ImageID"); ClusterImage holds the per-cluster rows joined on their
// image_id column and is serialized as "clusterImages".
type Image struct {
ImageID schsdk.ImageID `json:"imageID" gorm:"column:ImageID;primaryKey"`
Name string `json:"name" gorm:"column:Name"`
CreateTime time.Time `json:"createTime" gorm:"column:CreateTime"`
ClusterImage []ClusterImage `gorm:"foreignKey:image_id;references:ImageID" json:"clusterImages"`
}

// ClusterImage is a gorm-mapped row of table "clusterImage" linking an image
// to a cluster together with the cluster's own (origin) image type, ID and
// name. ClusterImageCard holds the related card rows joined on
// origin_image_id and is serialized as "cards".
type ClusterImage struct {
ImageID schsdk.ImageID `gorm:"column:image_id" json:"imageID"`
ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
OriginImageType string `gorm:"column:origin_image_type" json:"originImageType"`
OriginImageID string `gorm:"column:origin_image_id" json:"originImageID"`
OriginImageName string `gorm:"column:origin_image_name" json:"originImageName"`
ClusterImageCard []ClusterImageCard `gorm:"foreignKey:origin_image_id;references:origin_image_id" json:"cards"`
}

// TableName returns the database table name "clusterImage" for ClusterImage.
func (ClusterImage) TableName() string {
	const table = "clusterImage"
	return table
}

// ClusterImageCard is a gorm-mapped row of table "clusterImageCard" pairing
// an origin image ID with a card name.
type ClusterImageCard struct {
OriginImageID string `gorm:"column:origin_image_id" json:"originImageID"`
Card string `gorm:"column:card" json:"card"`
}

// TableName returns the database table name "clusterImageCard" for
// ClusterImageCard.
func (ClusterImageCard) TableName() string {
	const table = "clusterImageCard"
	return table
}

// QueryBindingFilters holds the status and name values used to filter a
// binding query.
type QueryBindingFilters struct {
Status string `json:"status"`
Name string `json:"name"`
}

// QueryBindingDataParam is the marker interface of the binding-query
// parameter union. Noop has no behavior; embedding QueryBindingDataParamBase
// satisfies it for pointer types.
type QueryBindingDataParam interface {
Noop()
}

// QueryBindingDataParamTypeUnion lists the concrete types that make up the
// QueryBindingDataParam union.
var QueryBindingDataParamTypeUnion = types.NewTypeUnion[QueryBindingDataParam](
(*PrivateLevel)(nil),
(*ApplyLevel)(nil),
(*PublicLevel)(nil),
)

// Registers the union with serder as internally tagged on the "type" field —
// presumably so that field selects the concrete type when decoding; confirm
// against serder.
var _ = serder.UseTypeUnionInternallyTagged(&QueryBindingDataParamTypeUnion, "type")

// QueryBindingDataParamBase is an empty struct meant to be embedded in
// QueryBindingDataParam variants so that their pointer types pick up the Noop
// marker method.
type QueryBindingDataParamBase struct {
}

// Noop does nothing; it exists only to satisfy the QueryBindingDataParam
// interface.
func (*QueryBindingDataParamBase) Noop() {
}

// PrivateLevel is the QueryBindingDataParam variant with union tag "private";
// it additionally requires a BindingID.
type PrivateLevel struct {
serder.Metadata `union:"private"`
QueryBindingDataParamBase
Type string `json:"type" binding:"required"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
BindingID int64 `json:"bindingID" binding:"required"`
Info DataBinding `json:"info"` // Optional; intended for fine-grained filtering; not implemented yet
}

// ApplyLevel is the QueryBindingDataParam variant with union tag "apply".
type ApplyLevel struct {
serder.Metadata `union:"apply"`
QueryBindingDataParamBase
Type string `json:"type" binding:"required"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
Info DataBinding `json:"info"` // Optional; intended for fine-grained filtering; not implemented yet
}

// PublicLevel is the QueryBindingDataParam variant with union tag "public".
type PublicLevel struct {
serder.Metadata `union:"public"`
QueryBindingDataParamBase
UserID cdssdk.UserID `json:"userID" binding:"required"`
Type string `json:"type" binding:"required"`
Info DataBinding `json:"info"` // Optional; intended for fine-grained filtering; not implemented yet
}

+ 21
- 0
sdks/scheduler/models.go View File

@@ -60,6 +60,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
(*FinetuningJobInfo)(nil),
(*DataPreprocessJobInfo)(nil),
(*PCMJobInfo)(nil),
(*HPCJobInfo)(nil),
)
var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type")

@@ -92,6 +93,26 @@ type PCMJobInfo struct {
JobResources JobResources `json:"jobResources"`
}

// HPCJobInfo is the JobInfo variant with union tag "HPC", registered in
// JobInfoTypeUnion. It names the job and carries the target cluster, backend,
// app, operate type and HPC parameters.
type HPCJobInfo struct {
serder.Metadata `union:"HPC"`
JobInfoBase
Type string `json:"type"`
Name string `json:"name"`
ClusterID ClusterID `json:"clusterID"`
Backend string `json:"backend"`
App string `json:"app"`
OperateType string `json:"operateType"`
Parameters HPCParameter `json:"parameters"`
}

// HPCParameter carries the parameters of an HPCJobInfo.
// NOTE(review): Ntasks and Nodes are strings rather than numbers — presumably
// this matches the HPC service's schema; confirm.
type HPCParameter struct {
JobName string `json:"jobName"`
Partition string `json:"partition"`
Ntasks string `json:"ntasks"`
Nodes string `json:"nodes"`
InputFile string `json:"inputFile"`
}

type JobResources struct {
//任务分配策略:负载均衡、积分优先、随机分配等,dataLocality, leastLoadFirst
ScheduleStrategy string `json:"scheduleStrategy"`


Loading…
Cancel
Save