diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 2073cf4..8262d75 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -78,6 +78,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*StartJobInfo)(nil), (*NotifyJobInfo)(nil), (*StopInferenceJobInfo)(nil), + (*UploadJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -175,6 +176,13 @@ type StartJobInfo struct { Type string `json:"type"` } +type UploadJobInfo struct { + serder.Metadata `union:"Upload"` + JobInfoBase + Type string `json:"type"` + DataType string `json:"dataType"` +} + type NotifyJobInfo struct { serder.Metadata `union:"Notify"` JobInfoBase @@ -210,6 +218,7 @@ type DataBinding interface { var DataBindingTypeUnion = types.NewTypeUnion[DataBinding]( (*ModelBinding)(nil), + (*DatasetBinding)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type") @@ -232,6 +241,20 @@ type ModelBinding struct { RepositoryName string `json:"repositoryName"` } +type DatasetBinding struct { + serder.Metadata `union:"dataset"` + DataBindingBase + Type string `json:"type"` + Name string `json:"name"` + OperateType string `json:"operateType"` + ClusterIDs []ClusterID `json:"clusterIDs"` + Description string `json:"description"` + Category string `json:"category"` + PackageID cdssdk.PackageID `json:"packageID"` + RepositoryName string `json:"repositoryName"` + ConsumptionPoints int64 `json:"points"` +} + type HPCJobInfo struct { serder.Metadata `union:"HPC"` JobInfoBase @@ -658,6 +681,7 @@ type JobOutput interface { var JobOutputTypeUnion = types.NewTypeUnion[JobOutput]( (*AIJobOutput)(nil), (*BindingJobOutput)(nil), + (*UploadJobOutput)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobOutputTypeUnion, "type") @@ -693,6 +717,13 @@ type BindingJobOutput struct { BindingID DataID `json:"bindingID"` } +type UploadJobOutput struct { + serder.Metadata `union:"upload"` + JobOutputBase + Type string `json:"type"` + PackageID cdssdk.PackageID `json:"packageID"` +} + type DataReturnJobOutput struct { serder.Metadata `union:"DataReturn"` JobOutputBase diff --git a/sdks/uploader/uploader.go b/sdks/uploader/uploader.go index 54f17df..6cae186 100644 --- a/sdks/uploader/uploader.go +++ b/sdks/uploader/uploader.go @@ -2,152 +2,21 @@ package uploadersdk import ( "fmt" - "gitlink.org.cn/cloudream/common/pkgs/types" - cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/serder" + "mime/multipart" "net/url" "strings" ) -//type DataScheduleReq struct { -// PackageID cdssdk.PackageID `json:"packageID"` -// DataType string `json:"dataType"` -// ScheduleTarget ScheduleTarget `json:"scheduleTarget"` -//} - -//type DataScheduleResp struct { -// Results []sch.DataScheduleResult `json:"data"` -//} - -//type TmpDataScheduleResult struct { -// Cluster sch.DataDetail `json:"cluster"` -// PackageID cdssdk.PackageID `json:"packageID"` -// Status bool `json:"status"` -// Msg string `json:"msg"` -//} - -//func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, error) { -// targetUrl, err := url.JoinPath(c.baseURL, "/dataSchedule") -// if err != nil { -// return nil, err -// } -// -// resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ -// Body: req, -// }) -// if err != nil { -// return nil, err -// } -// println(resp.Body) -// -// contType := resp.Header.Get("Content-Type") -// if strings.Contains(contType, http2.ContentTypeJSON) { -// var codeResp response[[]TmpDataScheduleResult] -// if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { -// return nil, fmt.Errorf("parsing response: %w", err) -// } -// -// if codeResp.Code == ResponseCodeOK { -// var results []sch.DataScheduleResult -// for _, tmpResult := range codeResp.Data { -// result := sch.DataScheduleResult{ -// PackageID: tmpResult.PackageID, -// Status: tmpResult.Status, -// Msg: tmpResult.Msg, -// Clusters: []sch.DataDetail{ -// tmpResult.Cluster, -// }, -// } -// results = append(results, result) -// } -// return results, nil -// } -// -// return nil, codeResp.ToError() -// } -// -// return nil, fmt.Errorf("unknow response content type: %s", contType) -//} - -type UploadReq struct { - DataType string `json:"dataType"` - Source UploadSource `json:"source"` - Target UploadTarget `json:"target"` - //StorageIDs []cdssdk.StorageID `json:"storageIDs"` -} - -type UploadSource interface { - Noop() -} - -var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource]( - (*PackageSource)(nil), - (*UrlSource)(nil), -) - -var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type") - -type PackageSource struct { - serder.Metadata `union:"jcs"` - UploadSourceBase - Type string `json:"type"` - PackageID cdssdk.PackageID `json:"packageID"` -} - -type UrlSource struct { - serder.Metadata `union:"url"` - UploadSourceBase - Type string `json:"type"` - Url string `json:"url"` -} - -type UploadSourceBase struct{} - -func (d *UploadSourceBase) Noop() {} - -type UploadTarget interface { - Noop() -} - -var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget]( - (*UrlTarget)(nil), - (*ApiTarget)(nil), -) - -var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type") - -type UrlTarget struct { - serder.Metadata `union:"url"` - UploadTargetBase - Type string `json:"type"` - ClusterID ClusterID `json:"clusterId"` - JCSUploadInfo cdsapi.ObjectUploadInfo `form:"JCSUploadInfo"` -} - -type ApiTarget struct { - serder.Metadata `union:"api"` - UploadTargetBase - Type string `json:"type"` - Clusters []ClusterID `json:"clusters"` -} - -type UploadTargetBase struct{} - -func (d *UploadTargetBase) Noop() {} - -type UploadResp struct { - PackageID cdssdk.PackageID `json:"packageID"` - ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` - ClusterID ClusterID `json:"clusterID"` - JsonData string `json:"jsonData"` - Status bool `json:"status"` - Message string `json:"message"` +type ObjectUploadReq struct { + Info cdsapi.ObjectUploadInfo `form:"info" binding:"required"` + Files []*multipart.FileHeader `form:"files"` } -func (c *Client) Upload(req UploadReq) (*UploadResp, error) { - targetUrl, err := url.JoinPath(c.baseURL, "/dataUpload") +func (c *Client) Upload(req ObjectUploadReq) (*cdsapi.ObjectUploadResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/object/upload") if err != nil { return nil, err } @@ -161,7 +30,7 @@ func (c *Client) Upload(req UploadReq) (*UploadResp, error) { contType := resp.Header.Get("Content-Type") if strings.Contains(contType, http2.ContentTypeJSON) { - var codeResp response[UploadResp] + var codeResp response[cdsapi.ObjectUploadResp] if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { return nil, fmt.Errorf("parsing response: %w", err) } diff --git a/utils/reflect2/assign.go b/utils/reflect2/assign.go new file mode 100644 index 0000000..2e8d49d --- /dev/null +++ b/utils/reflect2/assign.go @@ -0,0 +1,29 @@ +package reflect2 + +import "reflect" + +// MergeNonZero 将 src 中非零值字段复制到 dst 中 +func MergeNonZero(dst, src any) { + dstVal := reflect.ValueOf(dst).Elem() + srcVal := reflect.ValueOf(src).Elem() + + for i := 0; i < dstVal.NumField(); i++ { + dstField := dstVal.Field(i) + srcField := srcVal.Field(i) + + // 如果 src 字段是零值,则跳过 + if isZero(srcField) { + continue + } + + // 如果 dst 字段可设置,才设置 + if dstField.CanSet() { + dstField.Set(srcField) + } + } +} + +func isZero(v reflect.Value) bool { + zero := reflect.Zero(v.Type()) + return reflect.DeepEqual(v.Interface(), zero.Interface()) +}