Browse Source

Merge pull request '调度系统相关的参数信息' (#52) from feature_rzs into master

master
Sydonian 6 months ago
parent
commit
db9cab87bd
14 changed files with 1573 additions and 286 deletions
  1. +44
    -6
      sdks/blockchain/blockchain.go
  2. +63
    -0
      sdks/hpc/client.go
  3. +5
    -0
      sdks/hpc/config.go
  4. +83
    -0
      sdks/hpc/job.go
  5. +525
    -0
      sdks/hpc/models.go
  6. +24
    -0
      sdks/pcmscheduler/job.go
  7. +111
    -30
      sdks/pcmscheduler/jobset.go
  8. +134
    -39
      sdks/pcmscheduler/models.go
  9. +102
    -0
      sdks/scheduler/jobflow.go
  10. +367
    -20
      sdks/scheduler/models.go
  11. +58
    -52
      sdks/uploader/models.go
  12. +7
    -139
      sdks/uploader/uploader.go
  13. +21
    -0
      utils/http2/http.go
  14. +29
    -0
      utils/reflect2/assign.go

+ 44
- 6
sdks/blockchain/blockchain.go View File

@@ -6,6 +6,7 @@ import (
"gitlink.org.cn/cloudream/common/utils/serder"
"net/url"
"strings"
"time"
)

type InvokeReq struct {
@@ -14,6 +15,7 @@ type InvokeReq struct {
MemberName string `json:"memberName"`
Type string `json:"type"`
Args []string `json:"args"`
Amount int64 `json:"amount"`
}

func (c *Client) BlockChainInvoke(req InvokeReq, token string) error {
@@ -22,15 +24,9 @@ func (c *Client) BlockChainInvoke(req InvokeReq, token string) error {
return err
}

//token, err := c.getToken()
//if err != nil {
// return err
//}

header := make(map[string]string)
header["Content-Type"] = http2.ContentTypeJSON
header["Authorization"] = token
println("token: " + token)

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
@@ -55,6 +51,48 @@ func (c *Client) BlockChainInvoke(req InvokeReq, token string) error {
return nil
}

type UnPledgePointsReq struct {
PrivateKey string `form:"privateKey"`
BusinessCode string `form:"businessCode"`
EndTime time.Time `form:"endTime"`
}

func (c *Client) UnPledgePoints(req UnPledgePointsReq, token string) error {
targetUrl, err := url.JoinPath(c.baseURL, "/jcc-bcos/points/unPledgePoints")
if err != nil {
return err
}

header := make(map[string]string)
header["Content-Type"] = http2.ContentTypeJSON
header["Authorization"] = token

resp, err := http2.PutJSON(targetUrl, http2.RequestParam{
Query: req,
Header: header,
})
if err != nil {
println(err)
return err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[string]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == 200 {
return nil
}

return codeResp.ToError()
}

return nil
}

type TokenReq struct {
Username string `json:"username"`
Password string `json:"password"`


+ 63
- 0
sdks/hpc/client.go View File

@@ -0,0 +1,63 @@
package hpc

import (
"fmt"

"gitlink.org.cn/cloudream/common/sdks"
)

type response[T any] struct {
Code int `json:"code"`
Message string `json:"message"`
Data T `json:"data"`
}

type respons2[T any] struct {
Code int `json:"code"`
Message string `json:"msg"`
Data T `json:"data"`
}

const (
ResponseCodeOK int = 200
)

func (r *response[T]) ToError() *sdks.CodeMessageError {
return &sdks.CodeMessageError{
Code: fmt.Sprintf("%d", r.Code),
Message: r.Message,
}
}

type Client struct {
baseURL string
}

func NewClient(cfg *Config) *Client {
return &Client{
baseURL: cfg.URL,
}
}

type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

type pool struct {
cfg *Config
}

func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return cli, nil
}

func (p *pool) Release(cli *Client) {

}

+ 5
- 0
sdks/hpc/config.go View File

@@ -0,0 +1,5 @@
package hpc

type Config struct {
URL string `json:"url"`
}

+ 83
- 0
sdks/hpc/job.go View File

@@ -0,0 +1,83 @@
package hpc

import (
"fmt"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
"net/url"
"strings"
)

type CreateHPCJobReq struct {
Name string `json:"name"`
Description string `json:"description"`
ClusterID schsdk.ClusterID `json:"clusterId"`
Backend string `json:"backend"`
App string `json:"app"`
OperateType string `json:"operateType"`
ScriptContent string `json:"scriptContent"`
//Parameters HPCParameter `json:"parameters"`
Parameters map[string]string `json:"parameters"`
}

//type HPCParameter struct {
// JobName string `json:"jobName"`
// Partition string `json:"partition"`
// Ntasks string `json:"ntasks"`
// Nodes string `json:"nodes"`
// BamFile string `json:"bamFile"`
// InputFile string `json:"inputFile"`
//}

type CreateJobResp struct {
Backend string `json:"backend"`
JobInfo HPCJobInfo `json:"jobInfo"`
}

type HPCJobInfo struct {
JobDir string `json:"jobDir"`
JobID string `json:"jobId"`
TaskID string `json:"taskId"`
}

func (c *Client) CreateJob(req CreateHPCJobReq, token string) (*CreateJobResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/hpc/commitHpcTask")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
Header: map[string]string{
"Authorization": token,
},
})
if err != nil {
return nil, err
}

// 打印resp.Body内容
//body, err := io.ReadAll(resp.Body)
//if err != nil {
// println("Error reading response body:", err)
//}
//println("Response Body:", string(body))

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp respons2[CreateJobResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return &codeResp.Data, nil
}

return nil, fmt.Errorf("error: %s", codeResp.Message)
}

return nil, fmt.Errorf("unknow response content type: %s", contType)

}

+ 525
- 0
sdks/hpc/models.go View File

@@ -0,0 +1,525 @@
package hpc

import (
"gitlink.org.cn/cloudream/common/pkgs/types"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
"time"
)

type ResourceType string

const (
ResourceTypeCPU ResourceType = "CPU"
ResourceTypeNPU ResourceType = "NPU"
ResourceTypeGPU ResourceType = "GPU"
ResourceTypeMLU ResourceType = "MLU"
ResourceTypeStorage ResourceType = "STORAGE"
ResourceTypeMemory ResourceType = "MEMORY"

Split = "/"

CODE = "code"
DATASET = "dataset"
IMAGE = "image"
MODEL = "model"
RESULT = "result"

OrderByName = "name"
OrderBySize = "size"
OrderByTime = "time"

StorageTypeURL = "url"
StorageTypeJCS = "jcs"

RejectedStatus = "rejected"
PendingStatus = "pending"
ApprovedStatus = "approved"
RevokedStatus = "revoked"
CancelStatus = "cancel"
ExpiredStatus = "expired"

ApplyAccess = "apply"
PrivateAccess = "private"
PublicAccess = "public"

PreferencePriority = "preference"
SpecifyClusterPriority = "specify"

FailedStatus = "failed"
SuccessStatus = "success"

Query = "query"
Delete = "delete"

ChildrenType = "children"
ParentType = "parent"

PlatformSugon = "sugon"
PlatformOpenI = "OpenI"
PlatformModelArts = "ModelArts"

URL = "url"
ID = "id"

Startup = "startup"
)

type TaskID int64
type DataID int64

type ClusterDetail struct {
// 集群ID
ClusterId schsdk.ClusterID `json:"clusterID"`
// 集群功能类型:云算,智算,超算
ClusterType string `json:"clusterType"`
// 集群地区:华东地区、华南地区、华北地区、华中地区、西南地区、西北地区、东北地区
Region string `json:"region"`
// 资源类型
Resources2 []ResourceData `json:"resources1,omitempty"`
//Resources2 []ResourceData `json:"resources"`
Resources []ClusterResource `json:"resources"`
}

type ClusterResource struct {
Resource TmpResourceData `json:"resource"`
BaseResources []TmpResourceData `json:"baseResources"`
}

type TmpResourceData struct {
Type ResourceType `json:"type"`
Name string `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

type ResourceData interface {
Noop()
}

var ResourceDataTypeUnion = types.NewTypeUnion[ResourceData](
(*CPUResourceData)(nil),
(*NPUResourceData)(nil),
(*GPUResourceData)(nil),
(*MLUResourceData)(nil),
(*DCUResourceData)(nil),
(*GCUResourceData)(nil),
(*GPGPUResourceData)(nil),
(*StorageResourceData)(nil),
(*MemoryResourceData)(nil),
(*BalanceResourceData)(nil),
(*RateResourceData)(nil),
)
var _ = serder.UseTypeUnionInternallyTagged(&ResourceDataTypeUnion, "type")

type ResourceDataBase struct{}

func (d *ResourceDataBase) Noop() {}

type UnitValue[T any] struct {
Unit string `json:"unit"`
Value T `json:"value"`
}

type CPUResourceData struct {
serder.Metadata `union:"CPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type NPUResourceData struct {
serder.Metadata `union:"NPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type GPUResourceData struct {
serder.Metadata `union:"GPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type MLUResourceData struct {
serder.Metadata `union:"MLU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type DCUResourceData struct {
serder.Metadata `union:"DCU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type GCUResourceData struct {
serder.Metadata `union:"GCU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type GPGPUResourceData struct {
serder.Metadata `union:"ILUVATAR-GPGPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type StorageResourceData struct {
serder.Metadata `union:"STORAGE"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

type MemoryResourceData struct {
serder.Metadata `union:"MEMORY"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

type BalanceResourceData struct {
serder.Metadata `union:"BALANCE"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

type RateResourceData struct {
serder.Metadata `union:"RATE"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
}

type ResourceRange struct {
UserID cdssdk.UserID `json:"userID"`
Type ResourceType `json:"type"`
GPU Range `json:"gpu"`
GPUNumber int `json:"gpuNumber"`
CPU Range `json:"cpu"`
Memory Range `json:"memory"`
Storage Range `json:"storage"`
}

type Range struct {
Min float64 `json:"min"`
Max float64 `json:"max"`
}

type ResourcePriority interface {
Noop()
}

type ResourcePriorityBase struct {
}

var ResourcePriorityTypeUnion = types.NewTypeUnion[ResourcePriority](
(*RegionPriority)(nil),
(*ChipPriority)(nil),
(*BiasPriority)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&ResourcePriorityTypeUnion, "type")

func (d *ResourcePriorityBase) Noop() {}

type RegionPriority struct {
serder.Metadata `union:"region"`
ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"`
}

type ChipPriority struct {
serder.Metadata `union:"chip"`
ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"`
}

type BiasPriority struct {
serder.Metadata `union:"bias"`
ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"`
}

type TaskMessage struct {
Status string `json:"status"`
Message string `json:"message"`
}

type ReportMessage struct {
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Status bool `json:"status"`
Message string `json:"message"`
ClusterID schsdk.ClusterID `json:"clusterID"`
Output string `json:"output"`
}

type UploadParams struct {
DataType string `json:"dataType"`
UploadInfo UploadInfo `json:"uploadInfo"`
}

type UploadInfo interface {
Noop()
}

var UploadInfoTypeUnion = types.NewTypeUnion[UploadInfo](
(*LocalUploadInfo)(nil),
(*RemoteUploadInfo)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadInfoTypeUnion, "type")

type LocalUploadInfo struct {
serder.Metadata `union:"local"`
UploadInfoBase
Type string `json:"type"`
LocalPath string `json:"localPath"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
}

type RemoteUploadInfo struct {
serder.Metadata `union:"url"`
UploadInfoBase
Type string `json:"type"`
Url string `json:"url"`
Branch string `json:"branch"`
DataName string `json:"dataName"`
Cluster schsdk.ClusterID `json:"clusterID"`
}

type UploadInfoBase struct{}

func (d *UploadInfoBase) Noop() {}

type UploadPriority interface {
Noop()
}

var UploadPriorityTypeUnion = types.NewTypeUnion[UploadPriority](
(*Preferences)(nil),
(*SpecifyCluster)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadPriorityTypeUnion, "type")

type Preferences struct {
serder.Metadata `union:"preference"`
UploadPriorityBase
Type string `json:"type"`
ResourcePriorities []ResourcePriority `json:"priorities"`
}

type SpecifyCluster struct {
serder.Metadata `union:"specify"`
UploadPriorityBase
Type string `json:"type"`
Clusters []schsdk.ClusterID `json:"clusters"`
}

type UploadPriorityBase struct{}

func (d *UploadPriorityBase) Noop() {}

type QueryData struct {
DataType string `json:"dataType" binding:"required"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
Path string `json:"path"`
PackageID cdssdk.PackageID `json:"packageID" binding:"required"`
CurrentPage int `json:"currentPage" binding:"required"`
PageSize int `json:"pageSize" binding:"required"`
OrderBy string `json:"orderBy" binding:"required"`
}

type DataBinding interface {
Noop()
}

var DataBindingTypeUnion = types.NewTypeUnion[DataBinding](
(*DatasetBinding)(nil),
(*ModelBinding)(nil),
(*CodeBinding)(nil),
(*ImageBinding)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type")

type DataBindingBase struct{}

func (d *DataBindingBase) Noop() {}

type DatasetBinding struct {
serder.Metadata `union:"dataset"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
PackageID cdssdk.PackageID `json:"packageID"`
RepositoryName string `json:"repositoryName"`
ConsumptionPoints float64 `json:"points"`
}

type ModelBinding struct {
serder.Metadata `union:"model"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
ModelType string `json:"modelType"`
Env string `json:"env"`
Version string `json:"version"`
PackageID cdssdk.PackageID `json:"packageID"`
RepositoryName string `json:"repositoryName"`
}

type CodeBinding struct {
serder.Metadata `union:"code"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterID schsdk.ClusterID `json:"clusterID"`
Description string `json:"description"`
ImageID schsdk.ImageID `json:"imageID"`
BootstrapObjectID cdssdk.ObjectID `json:"bootstrapObjectID"`
PackageID cdssdk.PackageID `json:"packageID"`
Output string `json:"output"`
// 当集群为openi的时候,需要传入分支
Branch string `json:"branch"`
}

//type ImageBinding struct {
// serder.Metadata `union:"image"`
// DataBindingBase
// Type string `json:"type"`
// Name string `json:"name"`
// ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
// Description string `json:"description"`
// Architecture string `json:"architecture"`
// ResourceType string `json:"resourceType"`
// Tags []string `json:"tags"`
// PackageID cdssdk.PackageID `json:"packageID"`
//}

type ImageBinding struct {
serder.Metadata `union:"image"`
DataBindingBase
Type string `json:"type"`
ID int64 `json:"id"`
Name string `json:"name"`
IDType string `json:"idType"`
ImageID string `json:"imageID"`
ClusterID schsdk.ClusterID `json:"clusterID"`
}

type Image struct {
ImageID schsdk.ImageID `json:"imageID" gorm:"column:ImageID;primaryKey"`
Name string `json:"name" gorm:"column:Name"`
CreateTime time.Time `json:"createTime" gorm:"column:CreateTime"`
ClusterImage []ClusterImage `gorm:"foreignKey:image_id;references:ImageID" json:"clusterImages"`
}

type ClusterImage struct {
ImageID schsdk.ImageID `gorm:"column:image_id" json:"imageID"`
ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
OriginImageType string `gorm:"column:origin_image_type" json:"originImageType"`
OriginImageID string `gorm:"column:origin_image_id" json:"originImageID"`
OriginImageName string `gorm:"column:origin_image_name" json:"originImageName"`
ClusterImageCard []ClusterImageCard `gorm:"foreignKey:origin_image_id;references:origin_image_id" json:"cards"`
}

func (ClusterImage) TableName() string {
return "cluster_image"
}

type ClusterImageCard struct {
OriginImageID string `gorm:"column:origin_image_id" json:"originImageID"`
Card string `gorm:"column:card" json:"card"`
}

func (ClusterImageCard) TableName() string {
return "cluster_image_card"
}

type QueryBindingFilters struct {
Status string `json:"status"`
Name string `json:"name"`
}

type QueryBindingDataParam interface {
Noop()
}

var QueryBindingDataParamTypeUnion = types.NewTypeUnion[QueryBindingDataParam](
(*PrivateLevel)(nil),
(*ApplyLevel)(nil),
(*PublicLevel)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&QueryBindingDataParamTypeUnion, "type")

type QueryBindingDataParamBase struct{}

func (d *QueryBindingDataParamBase) Noop() {}

type PrivateLevel struct {
serder.Metadata `union:"private"`
QueryBindingDataParamBase
Type string `json:"type" binding:"required"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
BindingID int64 `json:"bindingID" binding:"required"`
Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现
}

type ApplyLevel struct {
serder.Metadata `union:"apply"`
QueryBindingDataParamBase
Type string `json:"type" binding:"required"`
UserID cdssdk.UserID `json:"userID" binding:"required"`
Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现
}

type PublicLevel struct {
serder.Metadata `union:"public"`
QueryBindingDataParamBase
UserID cdssdk.UserID `json:"userID" binding:"required"`
Type string `json:"type" binding:"required"`
Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现
}

+ 24
- 0
sdks/pcmscheduler/job.go View File

@@ -0,0 +1,24 @@
package sch

import (
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"time"
)

type PCMJob struct {
ID string `gorm:"column:id" json:"ID"`
JobType string `gorm:"column:job_type" json:"jobType"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
JobSetID schsdk.JobSetID `gorm:"column:jobset_id" json:"jobSetID"`
LocalJobID string `gorm:"column:local_job_id" json:"localJobID"`
Param string `gorm:"column:param" json:"param"`
Token string `gorm:"column:token" json:"token"`
CreatedAt time.Time `gorm:"column:created_at" json:"createdAt"`
}

type PCMJobDataReturn struct {
JobID string `gorm:"column:job_id" json:"jobID"`
ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
}

+ 111
- 30
sdks/pcmscheduler/jobset.go View File

@@ -14,22 +14,21 @@ type GetClusterInfoReq struct {
IDs []schsdk.ClusterID `json:"clusterIDs"`
}

//type GetClusterInfoResp struct {
// Data []ClusterDetail `json:"data"`
// TraceId string `json:"traceId"`
//}

func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/queryResources")
func (c *Client) GetClusterInfo(req GetClusterInfoReq, token string) ([]ClusterDetail, error) {
targetUrl, err := url.JoinPath(c.baseURL, "schedule/queryResources")
if err != nil {
return nil, err
}

//resp, err := http2.PostJSON(targetUrl, http2.RequestParam{Body: req})
resp, err := http2.PostJSON(targetUrl, http2.RequestParam{})
resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Header: map[string]string{
"Authorization": token,
},
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {

@@ -57,11 +56,20 @@ func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error)
return nil, fmt.Errorf("unknow response content type: %s", contType)
}

type CreateJobReq struct {
Name string `json:"name"`
Description string `json:"description"`
JobResources schsdk.JobResources `json:"jobResources"`
DataDistribute DataDistribute `json:"dataDistributes"`
type CreateInferenceJobResp struct {
TaskId string `json:"taskId"`
}

type CreateAIJobReq struct {
Name string `json:"name"`
Description string `json:"description"`
JobResources schsdk.JobResources `json:"jobResources"`
DataDistributes DataDistribute `json:"dataDistributes"`
}

type CommonJsonData struct {
ID string `json:"id"`
Name string `json:"name"`
}

type DataDistribute struct {
@@ -73,8 +81,9 @@ type DataDistribute struct {

type DataDetail struct {
ClusterID schsdk.ClusterID `json:"clusterID"`
StorageID cdssdk.StorageID `json:"storageID"`
JsonData string `json:"jsonData"`
//StorageID cdssdk.StorageID `json:"storageID"`
StorageID cdssdk.StorageID
JsonData string `json:"jsonData"`
}

type DatasetDistribute struct {
@@ -86,13 +95,15 @@ type DatasetDistribute struct {
type CodeDistribute struct {
DataName string `json:"dataName"`
PackageID cdssdk.PackageID `json:"packageID"`
Output string `json:"output"`
Clusters []DataDetail `json:"clusters"`
}

type ImageDistribute struct {
DataName string `json:"dataName"`
PackageID cdssdk.PackageID `json:"packageID"`
Clusters []DataDetail `json:"clusters"`
DataName string `json:"dataName"`
//PackageID cdssdk.PackageID `json:"packageID"`
ImageID schsdk.ImageID `json:"packageID"`
Clusters []DataDetail `json:"clusters"`
}

type ModelDistribute struct {
@@ -103,6 +114,7 @@ type ModelDistribute struct {

type CreateJobResp struct {
TaskID TaskID `json:"taskID"`
TaskName string `json:"taskName"`
ScheduleDatas []ScheduleData `json:"scheduleDatas"`
}

@@ -113,21 +125,87 @@ type ScheduleData struct {
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
}

func (c *Client) CreateJob(req CreateJobReq) (*CreateJobResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/createTask")
func (c *Client) CreateInferenceJob(req CreateAIJobReq, token string) (*CreateInferenceJobResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "inference/createTask")
if err != nil {
return nil, err
}

// 将req转换成json,并打印
//req2, err := serder.ObjectToJSONEx(req)
//if err != nil {
// return nil, fmt.Errorf("request to json: %w", err)
//}
//fmt.Println(string(req2))
resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
Header: map[string]string{
"Authorization": token,
},
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp respons2[CreateInferenceJobResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return &codeResp.Data, nil
}

return nil, fmt.Errorf("error: %s", codeResp.Message)
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

type StopInferenceJobReq struct {
TaskId string `json:"taskId"`
}

func (c *Client) StopInferenceJob(req StopInferenceJobReq, token string) error {
targetUrl, err := url.JoinPath(c.baseURL, "inference/createTask")
if err != nil {
return err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
Header: map[string]string{
"Authorization": token,
},
})
if err != nil {
return err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp respons2[CreateInferenceJobResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return nil
}

return fmt.Errorf("error: %s", codeResp.Message)
}

return fmt.Errorf("unknow response content type: %s", contType)
}

func (c *Client) CreateJob(req CreateAIJobReq, token string) (*CreateJobResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "schedule/createTask")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
Header: map[string]string{
"Authorization": token,
},
})
if err != nil {
return nil, err
@@ -169,14 +247,17 @@ type DataScheduleResults struct {
Results []DataScheduleResult `json:"results"`
}

func (c *Client) RunJob(req RunJobReq) error {
targetUrl, err := url.JoinPath(c.baseURL, "runTask")
func (c *Client) RunJob(req RunJobReq, token string) error {
targetUrl, err := url.JoinPath(c.baseURL, "schedule/runTask")
if err != nil {
return err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
Header: map[string]string{
"Authorization": token,
},
})
if err != nil {
return err
@@ -206,7 +287,7 @@ type CancelJobReq struct {
}

func (c *Client) CancelJob(req CancelJobReq) error {
targetUrl, err := url.JoinPath(c.baseURL, "/queryResources")
targetUrl, err := url.JoinPath(c.baseURL, "schedule/queryResources")
if err != nil {
return err
}


+ 134
- 39
sdks/pcmscheduler/models.go View File

@@ -2,6 +2,7 @@ package sch

import (
"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/sdks/hpc"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
@@ -26,6 +27,11 @@ const (
DATASET = "dataset"
IMAGE = "image"
MODEL = "model"
RESULT = "result"

PackageTypeNormal = "normal"
PackageTypeNull = "null"
Null = "null"

OrderByName = "name"
OrderBySize = "size"
@@ -48,8 +54,11 @@ const (
PreferencePriority = "preference"
SpecifyClusterPriority = "specify"

FailedStatus = "failed"
SuccessStatus = "success"
FailedStatus = "failed"
SuccessStatus = "success"
SucceededStatus = "succeeded"
UploadingStatus = "uploading"
RunningStatus = "running"

Query = "query"
Delete = "delete"
@@ -58,11 +67,36 @@ const (
ParentType = "parent"

PlatformSugon = "sugon"
PlatformOpenI = "openi"
PlatformModelArts = "modelarts"
PlatformOpenI = "OpenI"
PlatformModelArts = "ModelArts"

PlatformAI = "AI" // 智算
PlatformCloud = "CLOUD" // 云算
PlatformCloudInference = "PCM_Inference"
PlatformHPC = "HPCSlurm" //超算

URL = "url"
ID = "id"

Startup = "startup"

Schedule = "schedule"

BlockChainJobCreatePrefix = "job_create_"

Complete = "Complete"

NodeTypeBinding = "binding"
NodeTypeUpload = "upload"
NodeTypeDataReturn = "data_return"
NodeTypeHPCCreate = "hpc_create"
NodeTypeInference = "inference"
NodeTypeAICreate = "ai_job_create"
NodeTypeAIJobRun = "ai_job_run"
)

type TaskID int64
type DataID int64

type ClusterDetail struct {
// 集群ID
@@ -227,6 +261,47 @@ type ResourceRange struct {
Ids []string `json:"ids"`
}

type JobInfo struct {
TaskID string `json:"taskID"`
JobSubmitInfo JobSubmitInfo `json:"jobSubmitInfo"`
ResultFiles []ResultFile `json:"resultFiles"`
}

type ResultFile struct {
ClusterID schsdk.ClusterID `json:"clusterID"`
Objects []cdssdk.Object `json:"objects"`
}

type JobSubmitInfo interface {
Noop()
}

type JobSubmitInfoBase struct {
}

var JobSubmitInfoTypeUnion = types.NewTypeUnion[JobSubmitInfo](
(*PCMJobSubmitInfo)(nil),
(*HPCJobSubmitInfo)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&JobSubmitInfoTypeUnion, "type")

func (d *JobSubmitInfoBase) Noop() {}

type PCMJobSubmitInfo struct {
serder.Metadata `union:"pcm"`
JobSubmitInfoBase
Type string `json:"type"`
Info CreateAIJobReq `json:"info"`
}

type HPCJobSubmitInfo struct {
serder.Metadata `union:"hpc"`
JobSubmitInfoBase
Type string `json:"type"`
Info hpc.CreateHPCJobReq `json:"info"`
}

type Range struct {
Min float64 `json:"min"`
Max float64 `json:"max"`
@@ -275,10 +350,18 @@ type TaskMessage struct {
Message string `json:"message"`
}

type ReportMessage struct {
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Status bool `json:"status"`
Message string `json:"message"`
ClusterID schsdk.ClusterID `json:"clusterID"`
Output string `json:"output"`
}

type UploadParams struct {
DataType string `json:"dataType"`
UploadInfo UploadInfo `json:"uploadInfo"`
//UploadPriority UploadPriority `json:"uploadPriority"`
}

type UploadInfo interface {
@@ -348,9 +431,10 @@ type QueryData struct {
UserID cdssdk.UserID `json:"userID" binding:"required"`
Path string `json:"path"`
PackageID cdssdk.PackageID `json:"packageID" binding:"required"`
CurrentPage int `json:"currentPage" binding:"required"`
PageSize int `json:"pageSize" binding:"required"`
CurrentPage int `json:"currentPage"`
PageSize int `json:"pageSize"`
OrderBy string `json:"orderBy" binding:"required"`
PackageName string `json:"packageName"`
}

type DataBinding interface {
@@ -366,46 +450,56 @@ var DataBindingTypeUnion = types.NewTypeUnion[DataBinding](

var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type")

type DataBindingBase struct{}
type DataBindingBase struct {
RootPath string `json:"rootPath"`
}

func (d *DataBindingBase) Noop() {}

type DatasetBinding struct {
serder.Metadata `union:"dataset"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
PackageID cdssdk.PackageID `json:"packageID"`
Type string `json:"type"`
Name string `json:"name"`
OperateType string `json:"operateType"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
PackageID cdssdk.PackageID `json:"packageID"`
RepositoryName string `json:"repositoryName"`
ConsumptionPoints int64 `json:"points"`
}

type ModelBinding struct {
serder.Metadata `union:"model"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
ModelType string `json:"modelType"`
Env string `json:"env"`
Version string `json:"version"`
PackageID cdssdk.PackageID `json:"packageID"`
Type string `json:"type"`
Name string `json:"name"`
OperateType string `json:"operateType"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
ModelType string `json:"modelType"`
Env string `json:"env"`
Version string `json:"version"`
PackageID cdssdk.PackageID `json:"packageID"`
RepositoryName string `json:"repositoryName"`
}

type CodeBinding struct {
serder.Metadata `union:"code"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
Description string `json:"description"`
ImageID int64 `json:"imageID"`
ObjectID cdssdk.ObjectID `json:"objectID"`
FilePath string `json:"filePath"`
PackageID cdssdk.PackageID `json:"packageID"`
Type string `json:"type"`
Name string `json:"name"`
OperateType string `json:"operateType"`
ClusterID schsdk.ClusterID `json:"clusterID"`
Description string `json:"description"`
ImageID schsdk.ImageID `json:"imageID"`
BootstrapObjectID cdssdk.ObjectID `json:"bootstrapObjectID"`
PackageID cdssdk.PackageID `json:"packageID"`
Output string `json:"output"`
// 当集群为openi的时候,需要传入分支
Branch string `json:"branch"`
}

//type ImageBinding struct {
@@ -424,12 +518,13 @@ type CodeBinding struct {
type ImageBinding struct {
serder.Metadata `union:"image"`
DataBindingBase
Type string `json:"type"`
ID int64 `json:"id"`
Name string `json:"name"`
IDType string `json:"idType"`
ImageID string `json:"imageID"`
ClusterID schsdk.ClusterID `json:"clusterID"`
Type string `json:"type"`
ID int64 `json:"id"`
OperateType string `json:"operateType"`
Name string `json:"name"`
IDType string `json:"idType"`
ImageID string `json:"imageID"`
ClusterID schsdk.ClusterID `json:"clusterID"`
}

type Image struct {
@@ -449,7 +544,7 @@ type ClusterImage struct {
}

func (ClusterImage) TableName() string {
return "clusterImage"
return "cluster_image"
}

type ClusterImageCard struct {
@@ -458,7 +553,7 @@ type ClusterImageCard struct {
}

func (ClusterImageCard) TableName() string {
return "clusterImageCard"
return "cluster_image_card"
}

type QueryBindingFilters struct {


+ 102
- 0
sdks/scheduler/jobflow.go View File

@@ -0,0 +1,102 @@
package schsdk

import (
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"time"
)

type FlowData struct {
Nodes []Node `json:"nodes"`
Edges []Edge `json:"edges"`
}

type Node struct {
ID string `json:"id"`
Type string `json:"type"`
X float64 `json:"x"`
Y float64 `json:"y"`
Properties JobInfo `json:"properties"`
Text *Text `json:"text,omitempty"` // 有些节点没有 text 字段
}

type Edge struct {
ID string `json:"id"`
Type string `json:"type"`
Properties interface{} `json:"properties"` // 为空对象 {}
SourceNodeID string `json:"sourceNodeId"`
TargetNodeID string `json:"targetNodeId"`
SourceAnchorID string `json:"sourceAnchorId"`
TargetAnchorID string `json:"targetAnchorId"`
StartPoint Point `json:"startPoint"`
EndPoint Point `json:"endPoint"`
PointsList []Point `json:"pointsList"`
Text *Text `json:"text,omitempty"` // 有些 edge 有文字标签
}

type Point struct {
X float64 `json:"x"`
Y float64 `json:"y"`
}

type Text struct {
X float64 `json:"x"`
Y float64 `json:"y"`
Value string `json:"value"`
}

type JobFlowDAO struct {
ID int64 `gorm:"column:id;primaryKey;autoIncrement" json:"id"`
UserID cdssdk.UserID `gorm:"column:user_id;not null" json:"userID"`
Name string `gorm:"column:name;size:255;not null" json:"name"`
Description string `gorm:"column:description;size:255" json:"description"`
Content string `gorm:"column:content;type:text" json:"content"`
Status string `gorm:"column:status;type:enum('pending','running','failed','success');default:'pending'" json:"status"`
JobType string `gorm:"column:job_type;size:255" json:"jobType"`
UpdatedAt time.Time `gorm:"column:update_at;autoUpdateTime" json:"updatedAt"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"createdAt"`
}

type JobFlow struct {
ID int64 `json:"id"`
UserID cdssdk.UserID `json:"userID"`
Name string `json:"name"`
Description string `json:"description"`
Content FlowData `json:"content"`
Status string `json:"status"`
JobType string `json:"jobType"`
UpdatedAt time.Time `json:"updatedAt"`
CreatedAt time.Time `json:"createdAt"`
}

type JobFlowRunStatus struct {
RunID JobSetID `gorm:"column:run_id;primaryKey" json:"runID"`
NodeType string `gorm:"column:node_type;size:100" json:"nodeType"`
NodeID string `gorm:"column:node_id;size:255" json:"nodeID"`
Status string `gorm:"column:status;type:enum('pending','running','fail','success')" json:"status"`
RunOutput string `gorm:"column:run_output;type:text" json:"runOutput"`
RunLog string `gorm:"column:run_log;type:text" json:"runLog"`
}

type JobFlowRunDAO struct {
ID JobSetID `gorm:"column:id;primaryKey;" json:"runID"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
Name string `gorm:"column:name;size:255;not null" json:"name"`
Description string `gorm:"column:description;size:255" json:"description"`
Content string `gorm:"column:content;type:text" json:"content"`
Status string `gorm:"column:status;type:enum('running','fail','success')" json:"status"`
Token string `gorm:"column:token;size:255" json:"token"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"createdAt"`
FinishAt *time.Time `gorm:"column:finish_at" json:"finishedAt"`
}

type JobFlowRun struct {
ID JobSetID `gorm:"column:id;primaryKey;" json:"runID"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
Name string `gorm:"column:name;size:255;not null" json:"name"`
Description string `gorm:"column:description;size:255" json:"description"`
Content FlowData `gorm:"column:content;type:text" json:"content"`
Status string `gorm:"column:status;type:enum('running','fail','success')" json:"status"`
Token string `gorm:"column:token;size:255" json:"token"`
CreatedAt time.Time `gorm:"column:created_at;autoCreateTime" json:"createdAt"`
FinishAt *time.Time `gorm:"column:finish_at" json:"finishedAt"`
}

+ 367
- 20
sdks/scheduler/models.go View File

@@ -8,10 +8,12 @@ import (

const (
JobTypeNormal = "Normal"
JobTypePCM = "PCM"
JobTypeResource = "Resource"
JobTypeInstance = "Instance"
JobTypeFinetuning = "Finetuning"
JobTypeDataPreprocess = "DataPreprocess"
JobTypeDataReturn = "DataReturn"

FileInfoTypePackage = "Package"
FileInfoTypeLocalFile = "LocalFile"
@@ -24,12 +26,18 @@ const (
MemoryUtilization = "MemoryUtilization"
GPUUtilization = "GPUUtilization"
CPUUtilization = "CPUUtilization"

MethodPost = "POST"
MethodGet = "GET"
CodeSuccess = 200
)

type JobID string

type JobSetID string

type DataID int64

type ImageID int64

// 计算中心ID
@@ -49,6 +57,9 @@ type JobSetInfo struct {

type JobInfo interface {
GetLocalJobID() string
GetTargetLocalJobIDs() []string
SetTargetLocalJob(info TargetJobInfo)
GetTargetInputParams(targetID string) map[string]string
}

var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
@@ -59,18 +70,59 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
(*UpdateMultiInstanceJobInfo)(nil),
(*FinetuningJobInfo)(nil),
(*DataPreprocessJobInfo)(nil),
(*PCMJobInfo)(nil),
(*AIJobInfo)(nil),
(*HPCJobInfo)(nil),
(*BindingJobInfo)(nil),
(*PCMInferenceJobInfo)(nil),
(*CompleteJobInfo)(nil),
(*StartJobInfo)(nil),
(*NotifyJobInfo)(nil),
(*StopInferenceJobInfo)(nil),
(*UploadJobInfo)(nil),
)
var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type")

type JobInfoBase struct {
LocalJobID string `json:"localJobID"`
LocalJobID string `json:"localJobID"`
TargetJob []TargetJobInfo `json:"targetJob"`
}

type TargetJobInfo struct {
TargetJobID string `json:"targetJobID"`
InputParams map[string]string `json:"inputParams"`
}

func (i *JobInfoBase) GetLocalJobID() string {
return i.LocalJobID
}

func (i *JobInfoBase) GetTargetInputParams(targetID string) map[string]string {
for _, v := range i.TargetJob {
if v.TargetJobID == targetID {
return v.InputParams
}
}
return nil
}

func (i *JobInfoBase) GetTargetLocalJobIDs() []string {
var IDs []string
for _, v := range i.TargetJob {
IDs = append(IDs, v.TargetJobID)
}
return IDs
}

func (i *JobInfoBase) SetTargetLocalJob(info TargetJobInfo) {
for _, target := range i.TargetJob {
// 已经存在,则不用再添加
if target.TargetJobID == info.TargetJobID {
return
}
}
i.TargetJob = append(i.TargetJob, info)
}

type NormalJobInfo struct {
serder.Metadata `union:"Normal"`
JobInfoBase
@@ -82,14 +134,190 @@ type NormalJobInfo struct {
ModelJobInfo ModelJobInfo `json:"modelJobInfo"`
}

type PCMJobInfo struct {
serder.Metadata `union:"PCM"`
type PCMInferenceJobInfo struct {
serder.Metadata `union:"PCM_Inference"`
JobInfoBase
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Files JobFilesInfo `json:"files"`
JobResources JobResources `json:"jobResources"`
BindingID DataID `json:"bindingID"`
ResourceChoice ResourceChoice `json:"resourceChoice"`
}

type StopInferenceJobInfo struct {
serder.Metadata `union:"StopInference"`
JobInfoBase
Type string `json:"type"`
Url string `json:"url"`
}

type AIJobInfo struct {
serder.Metadata `union:"AI"`
JobInfoBase
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Files JobFilesInfo `json:"files"`
JobResources JobResources `json:"jobResources"`
ResourceChoice ResourceChoice `json:"resourceChoice"`
}

type CompleteJobInfo struct {
serder.Metadata `union:"Complete"`
JobInfoBase
Type string `json:"type"`
}

type StartJobInfo struct {
serder.Metadata `union:"Start"`
JobInfoBase
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Files JobFilesInfo `json:"files"`
JobResources JobResources `json:"jobResources"`
Type string `json:"type"`
}

type UploadJobInfo struct {
serder.Metadata `union:"Upload"`
JobInfoBase
Type string `json:"type"`
DataType string `json:"dataType"`
}

type NotifyJobInfo struct {
serder.Metadata `union:"Notify"`
JobInfoBase
Type string `json:"type"`
RequestType string `json:"requestType"`
Url string `json:"url"`
Body any `json:"body"`
Headers map[string]string `json:"headers"`
}

type ResourceChoice struct {
Type string `json:"type"`
ResourceScopes []ResourceScope `json:"resourceScopes"`
}

type ResourceScope struct {
Name string `json:"name"`
Min float64 `json:"min"`
Max float64 `json:"max"`
}

type BindingJobInfo struct {
serder.Metadata `union:"Binding"`
JobInfoBase
Type string `json:"type"`
Info DataBinding `json:"info"`
Name string `json:"name"` // 临时使用
}

type DataBinding interface {
Noop()
}

var DataBindingTypeUnion = types.NewTypeUnion[DataBinding](
(*ModelBinding)(nil),
(*DatasetBinding)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type")

type DataBindingBase struct{}

func (d *DataBindingBase) Noop() {}

type ModelBinding struct {
serder.Metadata `union:"model"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
ClusterIDs []ClusterID `json:"clusterIDs"`
Category string `json:"category"`
ModelType string `json:"modelType"`
Env string `json:"env"`
Version string `json:"version"`
RepositoryName string `json:"repositoryName"`
}

type DatasetBinding struct {
serder.Metadata `union:"dataset"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
OperateType string `json:"operateType"`
ClusterIDs []ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
PackageID cdssdk.PackageID `json:"packageID"`
RepositoryName string `json:"repositoryName"`
ConsumptionPoints int64 `json:"points"`
}

type HPCJobInfo struct {
serder.Metadata `union:"HPC"`
JobInfoBase
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
ClusterID ClusterID `json:"clusterID"`
Backend string `json:"backend"`
App string `json:"app"`
OperateType string `json:"operateType"`
ScriptContent string `json:"scriptContent"`
Parameters HPCParameter `json:"parameters"`
}

type HPCParameter struct {
JobName string `json:"jobName"`
JobDir string `json:"jobDir"`
Partition string `json:"partition"`
Ntasks string `json:"ntasks"`
Nodes string `json:"nodes"`
BamFile string `json:"bamFile"`
HashType string `json:"hashType"`
AttackMode string `json:"attackMode"`
HashInput string `json:"hashInput"`
Mask string `json:"mask"`
Dictionary string `json:"dictionary"`
Dictionary2 string `json:"dictionary2"`
HPCBindingFiles []HPCBindingFile `json:"hpcBindingFiles"`
}

type HPCBindingFile struct {
ParamName string `json:"paramName"`
Resource HPCFile `json:"resource"`
}

type HPCFile interface {
Noop()
}

var HPCFileTypeUnion = types.NewTypeUnion[HPCFile](
(*HPCObject)(nil),
(*HPCPath)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&HPCFileTypeUnion, "type")

type HPCFileBase struct{}

func (d *HPCFileBase) Noop() {}

type HPCObject struct {
serder.Metadata `union:"object"`
HPCFileBase
Type string `json:"type"`
ObjectID cdssdk.ObjectID `json:"objectID"`
}

type HPCPath struct {
serder.Metadata `union:"path"`
HPCFileBase
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
Path string `json:"path"`
}

type JobResources struct {
@@ -99,9 +327,11 @@ type JobResources struct {
}

type ClusterInfo struct {
ClusterID ClusterID `json:"clusterID"`
Resources []JobResource `json:"resources"`
Runtime PCMJobRuntimeInfo `json:"runtime"`
ClusterID ClusterID `json:"clusterID"`
Resources []JobResource `json:"resources"`
//Files JobFilesInfo `json:"files"`
Code JobFileInfo `json:"code"`
Runtime PCMJobRuntimeInfo `json:"runtime"`
}

type PCMJobRuntimeInfo struct {
@@ -115,7 +345,7 @@ type PCMJobRuntimeInfo struct {
//}

type JobResource interface {
Noop2()
Noop()
}

var JobResourceTypeUnion = types.NewTypeUnion[JobResource](
@@ -124,20 +354,30 @@ var JobResourceTypeUnion = types.NewTypeUnion[JobResource](
(*NPU)(nil),
(*MLU)(nil),
(*DCU)(nil),
(*Memory)(nil),
(*MEMORY)(nil),
(*PRICE)(nil),
(*STORAGE)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&JobResourceTypeUnion, "type")

type JobResourceBase struct{}

func (d *JobResourceBase) Noop2() {}
func (d *JobResourceBase) Noop() {}

type CPU struct {
serder.Metadata `union:"CPU"`
JobResourceBase
Type string `json:"type"`
Name string `json:"name"`
Number int64 `json:"number"`
}

type STORAGE struct {
serder.Metadata `union:"STORAGE"`
JobResourceBase
Type string `json:"type"`
Name string `json:"name"`
Number int64 `json:"number"`
}

@@ -145,6 +385,7 @@ type GPU struct {
serder.Metadata `union:"GPU"`
JobResourceBase
Type string `json:"type"`
Name string `json:"name"`
Number int64 `json:"number"`
}

@@ -152,13 +393,15 @@ type NPU struct {
serder.Metadata `union:"NPU"`
JobResourceBase
Type string `json:"type"`
Name string `json:"name"`
Number int64 `json:"number"`
}

type Memory struct {
serder.Metadata `union:"Memory"`
type MEMORY struct {
serder.Metadata `union:"MEMORY"`
JobResourceBase
Type string `json:"type"`
Name string `json:"name"`
Number int64 `json:"number"`
}

@@ -166,6 +409,7 @@ type DCU struct {
serder.Metadata `union:"DCU"`
JobResourceBase
Type string `json:"type"`
Name string `json:"name"`
Number int64 `json:"number"`
}

@@ -173,6 +417,7 @@ type MLU struct {
serder.Metadata `union:"MLU"`
JobResourceBase
Type string `json:"type"`
Name string `json:"name"`
Number int64 `json:"number"`
}

@@ -180,6 +425,7 @@ type PRICE struct {
serder.Metadata `union:"PRICE"`
JobResourceBase
Type string `json:"type"`
Name string `json:"name"`
Number int64 `json:"number"`
}

@@ -209,9 +455,10 @@ type DataPreprocessJobInfo struct {
type DataReturnJobInfo struct {
serder.Metadata `union:"DataReturn"`
JobInfoBase
Type string `json:"type"`
BucketID cdssdk.BucketID `json:"bucketID"`
TargetLocalJobID string `json:"targetLocalJobID"`
Type string `json:"type"`
BucketID cdssdk.BucketID `json:"bucketID"`
TargetLocalJobID string `json:"targetLocalJobID"`
ReportMessage TrainJobStatusReport `json:"report"`
}

// MultiInstanceJobInfo 多实例(推理任务)
@@ -286,6 +533,8 @@ type BindingJobFileInfo struct {
JobFileInfoBase
Type string `json:"type"`
BindingID int64 `json:"bindingID"`
// 用于参数回显
BindingName string `json:"bindingName"`
}

type PackageJobFileInfo struct {
@@ -314,6 +563,8 @@ type ImageJobFileInfo struct {
JobFileInfoBase
Type string `json:"type"`
ImageID ImageID `json:"imageID"`
// 用于参数回显
ImageName string `json:"imageName"`
}

type JobRuntimeInfo struct {
@@ -422,3 +673,99 @@ type InferencePlatform struct {
SimilarityThreshold string `json:"similarityThreshold"`
EntriesPerFile string `json:"entriesPerFile"`
}

type JobOutput interface {
Output2()
}

var JobOutputTypeUnion = types.NewTypeUnion[JobOutput](
(*AIJobOutput)(nil),
(*BindingJobOutput)(nil),
(*UploadJobOutput)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&JobOutputTypeUnion, "type")

type JobOutputBase struct{}

func (d *JobOutputBase) Output2() {}

type PublicOutput struct {
serder.Metadata `union:"object"`
JobOutputBase
Type string `json:"type"`
}

type HPCOutput struct {
serder.Metadata `union:"HPCSlurm"`
JobOutputBase
Type string `json:"type"`
Output string `json:"output"`
}

type AIJobOutput struct {
serder.Metadata `union:"AI"`
JobOutputBase
Type string `json:"type"`
Output string `json:"output"`
}

type BindingJobOutput struct {
serder.Metadata `union:"binding"`
JobOutputBase
Type string `json:"type"`
BindingID DataID `json:"bindingID"`
}

type UploadJobOutput struct {
serder.Metadata `union:"upload"`
JobOutputBase
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type DataReturnJobOutput struct {
serder.Metadata `union:"DataReturn"`
JobOutputBase
Type string `json:"type"`
ReportMessage TrainJobStatusReport `json:"report"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type JobStatusReport interface {
Report()
}

var JobStatusReportTypeUnion = types.NewTypeUnion[JobStatusReport](
(*TrainJobStatusReport)(nil),
(*InferenceJobStatusReport)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&JobStatusReportTypeUnion, "type")

type JobStatusReportBase struct{}

func (d *JobStatusReportBase) Report() {}

type TrainJobStatusReport struct {
serder.Metadata `union:"Train"`
JobStatusReportBase
Type string `json:"type"`
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Status bool `json:"status"`
Message string `json:"message"`
ClusterID ClusterID `json:"clusterID"`
Output string `json:"output"`
}

type InferenceJobStatusReport struct {
serder.Metadata `union:"Inference"`
JobStatusReportBase
Type string `json:"type"`
TaskName string `json:"taskName"`
TaskID string `json:"taskID"`
Status bool `json:"status"`
Message string `json:"message"`
URL string `json:"url"`
}

+ 58
- 52
sdks/uploader/models.go View File

@@ -21,18 +21,28 @@ type BlockChain struct {
}

func (BlockChain) TableName() string {
return "BlockChain" // 确保和数据库中的表名一致
return "block_chain" // 确保和数据库中的表名一致
}

type Binding struct {
ID DataID `gorm:"column:id;primaryKey;autoIncrement" json:"ID"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
Name string `gorm:"column:name" json:"Name"`
DataType string `gorm:"column:data_type" json:"dataType"`
//JsonData string `gorm:"column:json_data" json:"jsonData"`
Content string `gorm:"column:content" json:"Content"`
AccessLevel string `gorm:"column:access_level" json:"accessLevel"`
CreateTime time.Time `gorm:"column:created_at" json:"createTime"`
ID DataID `gorm:"column:id;primaryKey;autoIncrement" json:"ID"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
Name string `gorm:"column:name" json:"Name"`
DataType string `gorm:"column:data_type" json:"dataType"`
Content string `gorm:"column:content" json:"Content"`
AccessLevel string `gorm:"column:access_level" json:"accessLevel"`
CreateTime time.Time `gorm:"column:created_at" json:"createTime"`
}

type BindingDAO struct {
ID DataID `gorm:"column:id;primaryKey;autoIncrement" json:"ID"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
Name string `gorm:"column:name" json:"Name"`
DataType string `gorm:"column:data_type" json:"dataType"`
Content string `gorm:"column:content" json:"Content"`
AccessLevel string `gorm:"column:access_level" json:"accessLevel"`
CreateTime time.Time `gorm:"column:created_at" json:"createTime"`
BindingCluster []BindingCluster `gorm:"foreignKey:binding_id;references:id" json:"bindingCluster"`
}

type BindingAccessData struct {
@@ -56,23 +66,28 @@ type BindingDetail struct {
SSOId string `json:"ssoID"`
Name string `json:"Name"`
Info sch.DataBinding `json:"info"`
Packages []Package `json:"packages"`
Package Package `json:"package"`
Status string `json:"status"`
AccessLevel string `json:"accessLevel"`
CreateTime time.Time `json:"createTime"`
}

func (Binding) TableName() string {
return "BindingData" // 确保和数据库中的表名一致
return "bindings" // 确保和数据库中的表名一致
}

type BindingCluster struct {
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
ClusterID ClusterID `gorm:"column:cluster_id" json:"clusterID"`
Status string `gorm:"column:status" json:"status"`
Param string `gorm:"column:param" json:"Param"`
JsonData string `gorm:"column:json_data" json:"jsonData"`
}

func (BindingCluster) TableName() string {
return "binding_cluster" // 确保和数据库中的表名一致
}

type Folder struct {
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
Path string `gorm:"column:path_name" json:"path"`
@@ -93,23 +108,23 @@ type Cluster struct {
}

func (Cluster) TableName() string {
return "uploadedCluster" // 确保和数据库中的表名一致
return "uploaded_cluster" // 确保和数据库中的表名一致
}

type Package struct {
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
PackageName string `gorm:"column:package_name" json:"packageName"`
BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"`
DataType string `gorm:"column:data_type" json:"dataType"`
JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
Objects []cdssdk.Object `gorm:"column:objects" json:"objects"`
UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"`
Versions []PackageCloneDAO `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"`
//BlockChain []BlockChain `gorm:"foreignKey:package_id;references:package_id" json:"blockChains"` // 关联 BlockChain 数据
UploadPriority sch.UploadPriority `gorm:"column:upload_priority" json:"uploadPriority"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
PackageName string `gorm:"column:package_name" json:"packageName"`
BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"`
DataType string `gorm:"column:data_type" json:"dataType"`
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
Objects []cdssdk.Object `gorm:"column:objects" json:"objects"`
UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"`
Versions []PackageCloneDAO `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"`
UploadPriority sch.UploadPriority `gorm:"column:upload_priority" json:"uploadPriority"`
BindingInfo sch.DataBinding `gorm:"column:binding_info" json:"bindingInfo"`
PackageType string `gorm:"column:package_type" json:"packageType"`
}

type PackageDAO struct {
@@ -118,12 +133,13 @@ type PackageDAO struct {
PackageName string `gorm:"column:package_name" json:"packageName"`
BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"`
DataType string `gorm:"column:data_type" json:"dataType"`
JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
UploadedCluster []Cluster `gorm:"foreignKey:package_id;references:package_id" json:"clusters"` // 关联 Cluster 数据
Versions []PackageCloneDAO `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"`
UploadPriority string `gorm:"column:upload_priority" json:"uploadPriority"`
Param string `gorm:"column:param" json:"param"`
PackageType string `gorm:"column:package_type" json:"packageType"`
}

type PackageCloneDAO struct {
@@ -134,25 +150,25 @@ type PackageCloneDAO struct {
Description string `gorm:"column:description" json:"description"`
BootstrapObjectID cdssdk.ObjectID `gorm:"column:bootstrap_object_id" json:"bootstrapObjectID"`
ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
ParentImageID schsdk.ImageID `gorm:"column:parent_image_id" json:"parentImageID"`
ImageID string `gorm:"column:image_id" json:"imageID"`
CreateTime time.Time `gorm:"column:created_at" json:"createTime"`
//ParentImageID schsdk.ImageID `gorm:"column:parent_image_id" json:"parentImageID"`
ImageID schsdk.ImageID `gorm:"column:image_id" json:"imageID"`
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:created_at" json:"createTime"`
}

func (PackageCloneDAO) TableName() string {
return "packageClone" // 确保和数据库中的表名一致
return "package_clone" // 确保和数据库中的表名一致
}

type PackageCloneParam struct {
PackageID cdssdk.PackageID `json:"packageID" binding:"required"`
PackageName string `json:"packageName" binding:"required"`
//BucketID cdssdk.BucketID `json:"bucketID" binding:"required"`
PackageID cdssdk.PackageID `json:"packageID" binding:"required"`
PackageName string `json:"packageName" binding:"required"`
Name string `json:"name"`
Description string `json:"description"`
BootstrapObjectID cdssdk.ObjectID `json:"bootstrapObjectID"`
ClusterID schsdk.ClusterID `json:"clusterID"`
ParentImageID schsdk.ImageID `json:"parentImageID"`
ImageID string `json:"imageID"`
Output string `json:"output"`
ImageID schsdk.ImageID `json:"imageID"`
}

type PackageCloneVO struct {
@@ -163,10 +179,11 @@ type PackageCloneVO struct {
Description string `gorm:"column:description" json:"description"`
BootstrapObjectID cdssdk.ObjectID `gorm:"column:bootstrap_object_id" json:"bootstrapObjectID"`
ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
ParentImageID schsdk.ImageID `gorm:"column:parent_image_id" json:"parentImageID"`
ImageID string `gorm:"column:image_id" json:"imageID"`
CreateTime time.Time `gorm:"column:created_at" json:"createTime"`
ClusterMapping ClusterMapping `gorm:"foreignKey:cluster_id;references:cluster_id" json:"cluster"`
//ParentImageID schsdk.ImageID `gorm:"column:parent_image_id" json:"parentImageID"`
ImageID string `gorm:"column:image_id" json:"imageID"`
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:created_at" json:"createTime"`
ClusterMapping ClusterMapping `gorm:"foreignKey:cluster_id;references:cluster_id" json:"cluster"`
}

type ClusterMapping struct {
@@ -178,19 +195,8 @@ type ClusterMapping struct {
}

func (ClusterMapping) TableName() string {
return "ClusterMapping"
}

//type PackageCloneClusterDAO struct {
// ID DataID `gorm:"column:id" json:"ID"`
// ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
// ClusterName string `gorm:"column:cluster_name" json:"clusterName"`
//}
//
//type PackageCloneCluster struct {
// ClusterID schsdk.ClusterID `json:"clusterID"`
// ClusterName string `json:"clusterName"`
//}
return "cluster_mapping"
}

type ScheduleTarget interface {
Noop()


+ 7
- 139
sdks/uploader/uploader.go View File

@@ -2,153 +2,21 @@ package uploadersdk

import (
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/types"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
"mime/multipart"
"net/url"
"strings"
)

type DataScheduleReq struct {
PackageID cdssdk.PackageID `json:"packageID"`
DataType string `json:"dataType"`
ScheduleTarget ScheduleTarget `json:"scheduleTarget"`
type ObjectUploadReq struct {
Info cdsapi.ObjectUploadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"`
}

//type DataScheduleResp struct {
// Results []sch.DataScheduleResult `json:"data"`
//}

type TmpDataScheduleResult struct {
Cluster sch.DataDetail `json:"cluster"`
PackageID cdssdk.PackageID `json:"packageID"`
Status bool `json:"status"`
Msg string `json:"msg"`
}

func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/dataSchedule")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}
println(resp.Body)

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[[]TmpDataScheduleResult]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
var results []sch.DataScheduleResult
for _, tmpResult := range codeResp.Data {
result := sch.DataScheduleResult{
PackageID: tmpResult.PackageID,
Status: tmpResult.Status,
Msg: tmpResult.Msg,
Clusters: []sch.DataDetail{
tmpResult.Cluster,
},
}
results = append(results, result)
}
return results, nil
}

return nil, codeResp.ToError()
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

type UploadReq struct {
DataType string `json:"dataType"`
Source UploadSource `json:"source"`
Target UploadTarget `json:"target"`
//StorageIDs []cdssdk.StorageID `json:"storageIDs"`
}

type UploadSource interface {
Noop()
}

var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource](
(*PackageSource)(nil),
(*UrlSource)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type")

type PackageSource struct {
serder.Metadata `union:"jcs"`
UploadSourceBase
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type UrlSource struct {
serder.Metadata `union:"url"`
UploadSourceBase
Type string `json:"type"`
Url string `json:"url"`
}

type UploadSourceBase struct{}

func (d *UploadSourceBase) Noop() {}

type UploadTarget interface {
Noop()
}

var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget](
(*UrlTarget)(nil),
(*ApiTarget)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type")

type UrlTarget struct {
serder.Metadata `union:"url"`
UploadTargetBase
Type string `json:"type"`
ClusterID ClusterID `json:"clusterId"`
JCSUploadInfo cdsapi.ObjectUploadInfo `form:"JCSUploadInfo"`
}

type ApiTarget struct {
serder.Metadata `union:"api"`
UploadTargetBase
Type string `json:"type"`
Clusters []ClusterID `json:"clusters"`
}

type UploadTargetBase struct{}

func (d *UploadTargetBase) Noop() {}

type UploadResp struct {
PackageID cdssdk.PackageID `json:"packageID"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
ClusterID ClusterID `json:"clusterID"`
JsonData string `json:"jsonData"`
Status bool `json:"status"`
Message string `json:"message"`
}

func (c *Client) Upload(req UploadReq) (*UploadResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/dataUpload")
func (c *Client) Upload(req ObjectUploadReq) (*cdsapi.ObjectUploadResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/object/upload")
if err != nil {
return nil, err
}
@@ -162,7 +30,7 @@ func (c *Client) Upload(req UploadReq) (*UploadResp, error) {

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[UploadResp]
var codeResp response[cdsapi.ObjectUploadResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}


+ 21
- 0
utils/http2/http.go View File

@@ -119,6 +119,27 @@ func PostJSON(url string, param RequestParam) (*http.Response, error) {
return defaultClient.Do(req)
}

func PutJSON(url string, param RequestParam) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPut, url, nil)
if err != nil {
return nil, err
}

if err = prepareQuery(req, param.Query); err != nil {
return nil, err
}

if err = prepareHeader(req, param.Header); err != nil {
return nil, err
}

if err = prepareJSONBody(req, param.Body); err != nil {
return nil, err
}

return defaultClient.Do(req)
}

func PostForm(url string, param RequestParam) (*http.Response, error) {
req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {


+ 29
- 0
utils/reflect2/assign.go View File

@@ -0,0 +1,29 @@
package reflect2

import "reflect"

// MergeNonZero 将 src 中非零值字段复制到 dst 中
func MergeNonZero(dst, src any) {
dstVal := reflect.ValueOf(dst).Elem()
srcVal := reflect.ValueOf(src).Elem()

for i := 0; i < dstVal.NumField(); i++ {
dstField := dstVal.Field(i)
srcField := srcVal.Field(i)

// 如果 src 字段是零值,则跳过
if isZero(srcField) {
continue
}

// 如果 dst 字段可设置,才设置
if dstField.CanSet() {
dstField.Set(srcField)
}
}
}

func isZero(v reflect.Value) bool {
zero := reflect.Zero(v.Type())
return reflect.DeepEqual(v.Interface(), zero.Interface())
}

Loading…
Cancel
Save