Browse Source

新增数据上传节点

pull/52/head
JeshuaRen 6 months ago
parent
commit
921a874200
3 changed files with 67 additions and 138 deletions
  1. +31
    -0
      sdks/scheduler/models.go
  2. +7
    -138
      sdks/uploader/uploader.go
  3. +29
    -0
      utils/reflect2/assign.go

+ 31
- 0
sdks/scheduler/models.go View File

@@ -78,6 +78,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
(*StartJobInfo)(nil), (*StartJobInfo)(nil),
(*NotifyJobInfo)(nil), (*NotifyJobInfo)(nil),
(*StopInferenceJobInfo)(nil), (*StopInferenceJobInfo)(nil),
(*UploadJobInfo)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type")


@@ -175,6 +176,13 @@ type StartJobInfo struct {
Type string `json:"type"` Type string `json:"type"`
} }


type UploadJobInfo struct {
serder.Metadata `union:"Upload"`
JobInfoBase
Type string `json:"type"`
DataType string `json:"dataType"`
}

type NotifyJobInfo struct { type NotifyJobInfo struct {
serder.Metadata `union:"Notify"` serder.Metadata `union:"Notify"`
JobInfoBase JobInfoBase
@@ -210,6 +218,7 @@ type DataBinding interface {


var DataBindingTypeUnion = types.NewTypeUnion[DataBinding]( var DataBindingTypeUnion = types.NewTypeUnion[DataBinding](
(*ModelBinding)(nil), (*ModelBinding)(nil),
(*DatasetBinding)(nil),
) )


var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&DataBindingTypeUnion, "type")
@@ -232,6 +241,20 @@ type ModelBinding struct {
RepositoryName string `json:"repositoryName"` RepositoryName string `json:"repositoryName"`
} }


type DatasetBinding struct {
serder.Metadata `union:"dataset"`
DataBindingBase
Type string `json:"type"`
Name string `json:"name"`
OperateType string `json:"operateType"`
ClusterIDs []ClusterID `json:"clusterIDs"`
Description string `json:"description"`
Category string `json:"category"`
PackageID cdssdk.PackageID `json:"packageID"`
RepositoryName string `json:"repositoryName"`
ConsumptionPoints int64 `json:"points"`
}

type HPCJobInfo struct { type HPCJobInfo struct {
serder.Metadata `union:"HPC"` serder.Metadata `union:"HPC"`
JobInfoBase JobInfoBase
@@ -658,6 +681,7 @@ type JobOutput interface {
var JobOutputTypeUnion = types.NewTypeUnion[JobOutput]( var JobOutputTypeUnion = types.NewTypeUnion[JobOutput](
(*AIJobOutput)(nil), (*AIJobOutput)(nil),
(*BindingJobOutput)(nil), (*BindingJobOutput)(nil),
(*UploadJobOutput)(nil),
) )


var _ = serder.UseTypeUnionInternallyTagged(&JobOutputTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&JobOutputTypeUnion, "type")
@@ -693,6 +717,13 @@ type BindingJobOutput struct {
BindingID DataID `json:"bindingID"` BindingID DataID `json:"bindingID"`
} }


type UploadJobOutput struct {
serder.Metadata `union:"upload"`
JobOutputBase
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type DataReturnJobOutput struct { type DataReturnJobOutput struct {
serder.Metadata `union:"DataReturn"` serder.Metadata `union:"DataReturn"`
JobOutputBase JobOutputBase


+ 7
- 138
sdks/uploader/uploader.go View File

@@ -2,152 +2,21 @@ package uploadersdk


import ( import (
"fmt" "fmt"
"gitlink.org.cn/cloudream/common/pkgs/types"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
"gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
"mime/multipart"
"net/url" "net/url"
"strings" "strings"
) )


//type DataScheduleReq struct {
// PackageID cdssdk.PackageID `json:"packageID"`
// DataType string `json:"dataType"`
// ScheduleTarget ScheduleTarget `json:"scheduleTarget"`
//}

//type DataScheduleResp struct {
// Results []sch.DataScheduleResult `json:"data"`
//}

//type TmpDataScheduleResult struct {
// Cluster sch.DataDetail `json:"cluster"`
// PackageID cdssdk.PackageID `json:"packageID"`
// Status bool `json:"status"`
// Msg string `json:"msg"`
//}

//func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, error) {
// targetUrl, err := url.JoinPath(c.baseURL, "/dataSchedule")
// if err != nil {
// return nil, err
// }
//
// resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
// Body: req,
// })
// if err != nil {
// return nil, err
// }
// println(resp.Body)
//
// contType := resp.Header.Get("Content-Type")
// if strings.Contains(contType, http2.ContentTypeJSON) {
// var codeResp response[[]TmpDataScheduleResult]
// if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
// return nil, fmt.Errorf("parsing response: %w", err)
// }
//
// if codeResp.Code == ResponseCodeOK {
// var results []sch.DataScheduleResult
// for _, tmpResult := range codeResp.Data {
// result := sch.DataScheduleResult{
// PackageID: tmpResult.PackageID,
// Status: tmpResult.Status,
// Msg: tmpResult.Msg,
// Clusters: []sch.DataDetail{
// tmpResult.Cluster,
// },
// }
// results = append(results, result)
// }
// return results, nil
// }
//
// return nil, codeResp.ToError()
// }
//
// return nil, fmt.Errorf("unknow response content type: %s", contType)
//}

type UploadReq struct {
DataType string `json:"dataType"`
Source UploadSource `json:"source"`
Target UploadTarget `json:"target"`
//StorageIDs []cdssdk.StorageID `json:"storageIDs"`
}

type UploadSource interface {
Noop()
}

var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource](
(*PackageSource)(nil),
(*UrlSource)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type")

type PackageSource struct {
serder.Metadata `union:"jcs"`
UploadSourceBase
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type UrlSource struct {
serder.Metadata `union:"url"`
UploadSourceBase
Type string `json:"type"`
Url string `json:"url"`
}

type UploadSourceBase struct{}

func (d *UploadSourceBase) Noop() {}

type UploadTarget interface {
Noop()
}

var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget](
(*UrlTarget)(nil),
(*ApiTarget)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type")

type UrlTarget struct {
serder.Metadata `union:"url"`
UploadTargetBase
Type string `json:"type"`
ClusterID ClusterID `json:"clusterId"`
JCSUploadInfo cdsapi.ObjectUploadInfo `form:"JCSUploadInfo"`
}

type ApiTarget struct {
serder.Metadata `union:"api"`
UploadTargetBase
Type string `json:"type"`
Clusters []ClusterID `json:"clusters"`
}

type UploadTargetBase struct{}

func (d *UploadTargetBase) Noop() {}

type UploadResp struct {
PackageID cdssdk.PackageID `json:"packageID"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
ClusterID ClusterID `json:"clusterID"`
JsonData string `json:"jsonData"`
Status bool `json:"status"`
Message string `json:"message"`
type ObjectUploadReq struct {
Info cdsapi.ObjectUploadInfo `form:"info" binding:"required"`
Files []*multipart.FileHeader `form:"files"`
} }


func (c *Client) Upload(req UploadReq) (*UploadResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/dataUpload")
func (c *Client) Upload(req ObjectUploadReq) (*cdsapi.ObjectUploadResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/object/upload")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -161,7 +30,7 @@ func (c *Client) Upload(req UploadReq) (*UploadResp, error) {


contType := resp.Header.Get("Content-Type") contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) { if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[UploadResp]
var codeResp response[cdsapi.ObjectUploadResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err) return nil, fmt.Errorf("parsing response: %w", err)
} }


+ 29
- 0
utils/reflect2/assign.go View File

@@ -0,0 +1,29 @@
package reflect2

import "reflect"

// MergeNonZero 将 src 中非零值字段复制到 dst 中
func MergeNonZero(dst, src any) {
dstVal := reflect.ValueOf(dst).Elem()
srcVal := reflect.ValueOf(src).Elem()

for i := 0; i < dstVal.NumField(); i++ {
dstField := dstVal.Field(i)
srcField := srcVal.Field(i)

// 如果 src 字段是零值,则跳过
if isZero(srcField) {
continue
}

// 如果 dst 字段可设置,才设置
if dstField.CanSet() {
dstField.Set(srcField)
}
}
}

func isZero(v reflect.Value) bool {
zero := reflect.Zero(v.Type())
return reflect.DeepEqual(v.Interface(), zero.Interface())
}

Loading…
Cancel
Save