From 171f54a6b2f916247af7370f08f9e0182e41b992 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Wed, 18 Dec 2024 15:28:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A5=E5=85=A5=E5=AD=98=E8=AF=81=E5=92=8Cup?= =?UTF-8?q?loader=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/blockchain/blockchain.go | 34 +++++++ sdks/blockchain/client.go | 57 +++++++++++ sdks/blockchain/config.go | 9 ++ sdks/blockchain/models.go | 3 + sdks/blockchain/test.go | 37 +++++++ sdks/blockchain/uploader.go | 179 +++++++++++++++++++++++++++++++++ sdks/pcmscheduler/jobset.go | 139 +++++++++++++++++++------- sdks/pcmscheduler/models.go | 122 +++++++++++++---------- sdks/scheduler/models.go | 105 ++++++++++++++++++++ sdks/uploader/client.go | 57 +++++++++++ sdks/uploader/config.go | 5 + sdks/uploader/models.go | 28 ++++++ sdks/uploader/uploader.go | 182 ++++++++++++++++++++++++++++++++++ utils/config/config.go | 6 +- 14 files changed, 874 insertions(+), 89 deletions(-) create mode 100644 sdks/blockchain/blockchain.go create mode 100644 sdks/blockchain/client.go create mode 100644 sdks/blockchain/config.go create mode 100644 sdks/blockchain/models.go create mode 100644 sdks/blockchain/test.go create mode 100644 sdks/blockchain/uploader.go create mode 100644 sdks/uploader/client.go create mode 100644 sdks/uploader/config.go create mode 100644 sdks/uploader/models.go create mode 100644 sdks/uploader/uploader.go diff --git a/sdks/blockchain/blockchain.go b/sdks/blockchain/blockchain.go new file mode 100644 index 0000000..9a51461 --- /dev/null +++ b/sdks/blockchain/blockchain.go @@ -0,0 +1,34 @@ +package blockchain + +import ( + "gitlink.org.cn/cloudream/common/utils/http2" + "net/url" +) + +type InvokeReq struct { + ContractAddress string `json:"contractAddress"` + FunctionName string `json:"functionName"` + MemberName string `json:"memberName"` + Type string `json:"type"` + Args []string `json:"args"` +} + +func (c *Client) BlockChainInvoke(req InvokeReq) error { + targetUrl, err := url.JoinPath(c.baseURL, "/contract/invoke") + if err != nil { + return err + } + + header := make(map[string]string) + header["Content-Type"] = http2.ContentTypeJSON + + _, err = http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: header, + }) + if err != nil { + return err + } + + return nil +} diff --git a/sdks/blockchain/client.go b/sdks/blockchain/client.go new file mode 100644 index 0000000..7121aa0 --- /dev/null +++ b/sdks/blockchain/client.go @@ -0,0 +1,57 @@ +package blockchain + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/sdks" +) + +type response[T any] struct { + Code int `json:"code"` + Message string `json:"message"` + Data T `json:"data"` +} + +const ( + ResponseCodeOK int = 200 +) + +func (r *response[T]) ToError() *sdks.CodeMessageError { + return &sdks.CodeMessageError{ + Code: fmt.Sprintf("%d", r.Code), + Message: r.Message, + } +} + +type Client struct { + baseURL string +} + +func NewClient(cfg *Config) *Client { + return &Client{ + baseURL: cfg.URL, + } +} + +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) +} + +type pool struct { + cfg *Config +} + +func NewPool(cfg *Config) Pool { + return &pool{ + cfg: cfg, + } +} +func (p *pool) Acquire() (*Client, error) { + cli := NewClient(p.cfg) + return cli, nil +} + +func (p *pool) Release(cli *Client) { + +} diff --git a/sdks/blockchain/config.go b/sdks/blockchain/config.go new file mode 100644 index 0000000..59254f5 --- /dev/null +++ b/sdks/blockchain/config.go @@ -0,0 +1,9 @@ +package blockchain + +type Config struct { + URL string `json:"url"` + ContractAddress string `json:"contractAddress"` + FunctionName string `json:"functionName"` + MemberName string `json:"memberName"` + Type string `json:"type"` +} diff --git a/sdks/blockchain/models.go b/sdks/blockchain/models.go new file mode 100644 index 0000000..8da50d0 --- /dev/null +++ b/sdks/blockchain/models.go @@ -0,0 +1,3 @@ +package blockchain + +type ClusterID string diff --git a/sdks/blockchain/test.go b/sdks/blockchain/test.go new file mode 100644 index 0000000..8a85711 --- /dev/null +++ b/sdks/blockchain/test.go @@ -0,0 +1,37 @@ +package blockchain + +import ( + "fmt" + "io/ioutil" + "net/http" + "strings" +) + +func main() { + url := "http://localhost:2006/contract/invoke" + method := "POST" + payload := strings.NewReader(`{` + " " + ` "contractAddress" : "0xc860ab27901b3c2b810165a6096c64d88763617f",` + " " + ` "functionName" : "storeEvidence",` + " " + ` "args" : ["3","touteng"],` + " " + ` "memberName" :"pcm",` + " " + ` "type": "2"` + " " + ` }`) + client := &http.Client{} + req, err := http.NewRequest(method, url, payload) + if err != nil { + fmt.Println(err) + return + } + req.Header.Add("User-Agent", "Apifox/1.0.0 (https://apifox.com)") + req.Header.Add("Content-Type", "application/json") + req.Header.Add("Accept", "*/*") + req.Header.Add("Host", "localhost:2006") + req.Header.Add("Connection", "keep-alive") + res, err := client.Do(req) + if err != nil { + fmt.Println(err) + return + } + defer res.Body.Close() + body, err := ioutil.ReadAll(res.Body) + if err != nil { + fmt.Println(err) + return + } + fmt.Println(string(body)) +} diff --git a/sdks/blockchain/uploader.go b/sdks/blockchain/uploader.go new file mode 100644 index 0000000..14bd147 --- /dev/null +++ b/sdks/blockchain/uploader.go @@ -0,0 +1,179 @@ +package blockchain + +import ( + "fmt" + "gitlink.org.cn/cloudream/common/pkgs/types" + sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + "net/url" + "strings" + "time" +) + +type DataID int64 + +type Cluster struct { + DataID DataID `gorm:"column:dataID" json:"dataID"` + ClusterID schsdk.ClusterID `gorm:"column:clusterID" json:"clusterID"` + StorageID cdssdk.StorageID `gorm:"column:storageID" json:"storageID"` +} + +func (Cluster) TableName() string { + return "UploadedCluster" // 确保和数据库中的表名一致 +} + +type UploadedData struct { + ID DataID `gorm:"column:id;primaryKey" json:"id"` + Name string `gorm:"column:name" json:"name"` + DataType string `gorm:"column:dataType" json:"dataType"` + PackageID cdssdk.PackageID `gorm:"column:packageID" json:"packageID"` + JsonData string `gorm:"column:jsonData" json:"jsonData"` // JSON 数据字段 + UploadTime time.Time `gorm:"column:uploadTime" json:"uploadTime"` + UploadedCluster []Cluster `gorm:"foreignKey:dataID;references:id" json:"Clusters"` // 关联 Cluster 数据 +} + +type DataScheduleReq struct { + PackageID cdssdk.PackageID `json:"packageID"` + DataType string `json:"dataType"` + Clusters []Cluster `json:"clusters"` +} + +type codeRepository struct { + RepositoryName string + ClusterID ClusterID +} + +type DataScheduleResp struct { + Results []sch.DataScheduleResult `json:"results"` +} + +func (c *Client) DataSchedule(req DataScheduleReq) (*DataScheduleResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/schedule") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[DataScheduleResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +type UploadReq struct { + Type string `json:"type"` + Source UploadSource `json:"source"` + Target UploadTarget `json:"target"` + StorageIDs []cdssdk.StorageID `json:"storageIDs"` +} + +type UploadSource interface { + Noop() +} + +var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource]( + (*PackageSource)(nil), + (*UrlSource)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type") + +type PackageSource struct { + serder.Metadata `union:"packageSource"` + UploadSourceBase + Type string `json:"type"` + PackageID cdssdk.PackageID `json:"packageID"` +} + +type UrlSource struct { + serder.Metadata `union:"urlSource"` + UploadSourceBase + Type string `json:"type"` + Url string `json:"url"` +} + +type UploadSourceBase struct{} + +func (d *UploadSourceBase) Noop() {} + +type UploadTarget interface { + Noop() +} + +var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget]( + (*UrlTarget)(nil), + (*ApiTarget)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type") + +type UrlTarget struct { + serder.Metadata `union:"url"` + UploadTargetBase + Clusters []ClusterID `json:"clusters"` +} + +type ApiTarget struct { + serder.Metadata `union:"api"` + UploadTargetBase + Clusters []ClusterID `json:"clusters"` +} + +type UploadTargetBase struct{} + +func (d *UploadTargetBase) Noop() {} + +type UploadResp struct { + PackageID cdssdk.PackageID `json:"packageID"` + JsonData string `json:"jsonData"` +} + +func (c *Client) Upload(req UploadReq) (*UploadResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/data/upload") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[UploadResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} diff --git a/sdks/pcmscheduler/jobset.go b/sdks/pcmscheduler/jobset.go index ea4d69c..450be9a 100644 --- a/sdks/pcmscheduler/jobset.go +++ b/sdks/pcmscheduler/jobset.go @@ -11,7 +11,12 @@ import ( ) type GetClusterInfoReq struct { - IDs []ClusterID `json:"ids"` + IDs []schsdk.ClusterID `json:"clusterIDs"` +} + +type GetClusterInfoResp struct { + Data []ClusterDetail `json:"data"` + TraceId string `json:"traceId"` } func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) { @@ -19,7 +24,8 @@ func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) if err != nil { return nil, err } - resp, err := http2.GetJSON(targetUrl, http2.RequestParam{Body: req}) + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{Body: req}) if err != nil { return nil, err } @@ -42,8 +48,8 @@ func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) } type CreateJobReq struct { - DataDistribute DataDistribute `json:"dataDistribute"` - Resources schsdk.JobResourcesInfo `json:"resources"` + JobResources schsdk.JobResources `json:"jobResources"` + DataDistribute DataDistribute `json:"dataDistribute"` } type DataDistribute struct { @@ -53,33 +59,33 @@ type DataDistribute struct { Model []ModelDistribute `json:"model"` } +type DataDetail struct { + ClusterID schsdk.ClusterID `json:"clusterID"` + JsonData string `json:"jsonData"` +} + type DatasetDistribute struct { DataName string `json:"dataName"` PackageID cdssdk.PackageID `json:"packageID"` - Clusters []ClusterID `json:"clusters"` + Clusters []DataDetail `json:"clusters"` } type CodeDistribute struct { DataName string `json:"dataName"` PackageID cdssdk.PackageID `json:"packageID"` - Clusters []ClusterID `json:"clusters"` + Clusters []DataDetail `json:"clusters"` } type ImageDistribute struct { DataName string `json:"dataName"` PackageID cdssdk.PackageID `json:"packageID"` - Clusters []ClusterID `json:"clusters"` + Clusters []DataDetail `json:"clusters"` } type ModelDistribute struct { DataName string `json:"dataName"` PackageID cdssdk.PackageID `json:"packageID"` - Clusters []ClusterID `json:"clusters"` -} - -type Cluster struct { - ClusterID ClusterID `json:"clusterID"` - StorageID cdssdk.StorageID `json:"storageID"` + Clusters []DataDetail `json:"clusters"` } type CreateJobResp struct { @@ -88,35 +94,40 @@ type CreateJobResp struct { } type ScheduleData struct { - DataType string `json:"dataType"` - PackageID cdssdk.PackageID `json:"packageID"` - StorageType string `json:"storageType"` - ClusterIDs []ClusterID `json:"clusterIDs"` + DataType string `json:"dataType"` + PackageID cdssdk.PackageID `json:"packageID"` + StorageType string `json:"storageType"` + ClusterIDs []schsdk.ClusterID `json:"clusterIDs"` } -func (c *Client) CreateJob(req CreateJobReq) (CreateJobResp, error) { +func (c *Client) CreateJob(req CreateJobReq) (*CreateJobResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/submit") + if err != nil { + return nil, err + } -} + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } -type DataScheduleReq struct { - PackageID cdssdk.PackageID `json:"packageID"` - StorageType string `json:"storageType"` - Clusters []Cluster `json:"clusters"` -} + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[CreateJobResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } -type DataScheduleResp struct { - Results []DataScheduleResult `json:"results"` -} + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } -type DataScheduleResult struct { - ClusterID ClusterID `json:"clusterID"` - PackageID cdssdk.PackageID `json:"packageID"` - PackageFullPath string `json:"packageFullPath"` - Status bool `json:"status"` - Msg string `json:"msg"` -} + return nil, codeResp.ToError() + } -func (c *Client) DataSchedule(req DataScheduleReq) (DataScheduleResp, error) { + return nil, fmt.Errorf("unknow response content type: %s", contType) } @@ -125,12 +136,47 @@ type RunJobReq struct { ScheduledDatas []DataScheduleResults `json:"scheduledDatas"` } +type DataScheduleResult struct { + Clusters DataDetail `json:"clusters"` + PackageID cdssdk.PackageID `json:"packageID"` + PackageFullPath string `json:"packageFullPath"` + Status bool `json:"status"` + Msg string `json:"msg"` +} + type DataScheduleResults struct { DataType string `json:"dataType"` Results []DataScheduleResult `json:"results"` } func (c *Client) RunJob(req RunJobReq) error { + targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/submit") + if err != nil { + return err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + }) + if err != nil { + return err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[string] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return nil + } + + return codeResp.ToError() + } + + return fmt.Errorf("unknow response content type: %s", contType) } @@ -140,5 +186,28 @@ type CancelJobReq struct { } func (c *Client) CancelJob(req CancelJobReq) error { + targetUrl, err := url.JoinPath(c.baseURL, "/queryResources") + if err != nil { + return err + } + resp, err := http2.GetJSON(targetUrl, http2.RequestParam{Body: req}) + if err != nil { + return err + } + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + + var codeResp response[string] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return nil + } + + return codeResp.ToError() + } + return fmt.Errorf("unknow response content type: %s", contType) } diff --git a/sdks/pcmscheduler/models.go b/sdks/pcmscheduler/models.go index e9bda19..466d560 100644 --- a/sdks/pcmscheduler/models.go +++ b/sdks/pcmscheduler/models.go @@ -2,6 +2,7 @@ package sch import ( "gitlink.org.cn/cloudream/common/pkgs/types" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -21,12 +22,26 @@ const ( MODEL = "model" ) -type ClusterID int64 type TaskID int64 type ClusterDetail struct { - ID ClusterID `json:"id"` - Resources []ResourceData `json:"resources"` + // 集群ID + ClusterId schsdk.ClusterID `json:"clusterID"` + // 集群功能类型:云算,智算,超算 + ClusterType string `json:"clusterType"` + // 集群地区:华东地区、华南地区、华北地区、华中地区、西南地区、西北地区、东北地区 + Region string `json:"region"` + // 资源类型 + Resources2 []ResourceData `json:"resources1,omitempty"` + //Resources []ResourceData `json:"resources"` + Resources []TmpResourceData `json:"resources"` +} + +type TmpResourceData struct { + Type ResourceType `json:"type"` + Name string `json:"name"` + Total UnitValue[float64] `json:"total"` + Available UnitValue[float64] `json:"available"` } type ResourceData interface { @@ -38,10 +53,13 @@ var ResourceDataTypeUnion = types.NewTypeUnion[ResourceData]( (*NPUResourceData)(nil), (*GPUResourceData)(nil), (*MLUResourceData)(nil), + (*DCUResourceData)(nil), + (*GCUResourceData)(nil), + (*GPGPUResourceData)(nil), (*StorageResourceData)(nil), (*MemoryResourceData)(nil), ) -var _ = serder.UseTypeUnionInternallyTagged(&ResourceDataTypeUnion, "name") +var _ = serder.UseTypeUnionInternallyTagged(&ResourceDataTypeUnion, "type") type ResourceDataBase struct{} @@ -55,99 +73,84 @@ type UnitValue[T any] struct { type CPUResourceData struct { serder.Metadata `union:"CPU"` ResourceDataBase + Type string `json:"type"` Name ResourceType `json:"name"` Total UnitValue[int64] `json:"total"` Available UnitValue[int64] `json:"available"` } -func NewCPUResourceData(total UnitValue[int64], available UnitValue[int64]) *CPUResourceData { - return &CPUResourceData{ - Name: ResourceTypeCPU, - Total: total, - Available: available, - } -} - type NPUResourceData struct { serder.Metadata `union:"NPU"` ResourceDataBase + Type string `json:"type"` Name ResourceType `json:"name"` Total UnitValue[int64] `json:"total"` Available UnitValue[int64] `json:"available"` } -func NewNPUResourceData(total UnitValue[int64], available UnitValue[int64]) *NPUResourceData { - return &NPUResourceData{ - Name: ResourceTypeNPU, - Total: total, - Available: available, - } -} - type GPUResourceData struct { serder.Metadata `union:"GPU"` ResourceDataBase + Type string `json:"type"` Name ResourceType `json:"name"` Total UnitValue[int64] `json:"total"` Available UnitValue[int64] `json:"available"` } -func NewGPUResourceData(total UnitValue[int64], available UnitValue[int64]) *GPUResourceData { - return &GPUResourceData{ - Name: ResourceTypeGPU, - Total: total, - Available: available, - } -} - type MLUResourceData struct { serder.Metadata `union:"MLU"` ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type DCUResourceData struct { + serder.Metadata `union:"DCU"` + ResourceDataBase + Type string `json:"type"` Name ResourceType `json:"name"` Total UnitValue[int64] `json:"total"` Available UnitValue[int64] `json:"available"` } -func NewMLUResourceData(total UnitValue[int64], available UnitValue[int64]) *MLUResourceData { - return &MLUResourceData{ - Name: ResourceTypeMLU, - Total: total, - Available: available, - } +type GCUResourceData struct { + serder.Metadata `union:"GCU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` +} + +type GPGPUResourceData struct { + serder.Metadata `union:"ILUVATAR-GPGPU"` + ResourceDataBase + Type string `json:"type"` + Name ResourceType `json:"name"` + Total UnitValue[int64] `json:"total"` + Available UnitValue[int64] `json:"available"` } type StorageResourceData struct { serder.Metadata `union:"STORAGE"` ResourceDataBase + Type string `json:"type"` Name ResourceType `json:"name"` Total UnitValue[float64] `json:"total"` Available UnitValue[float64] `json:"available"` } -func NewStorageResourceData(total UnitValue[float64], available UnitValue[float64]) *StorageResourceData { - return &StorageResourceData{ - Name: ResourceTypeStorage, - Total: total, - Available: available, - } -} - type MemoryResourceData struct { serder.Metadata `union:"MEMORY"` ResourceDataBase + Type string `json:"type"` Name ResourceType `json:"name"` Total UnitValue[float64] `json:"total"` Available UnitValue[float64] `json:"available"` } -func NewMemoryResourceData(total UnitValue[float64], available UnitValue[float64]) *MemoryResourceData { - return &MemoryResourceData{ - Name: ResourceTypeMemory, - Total: total, - Available: available, - } -} - type ResourcePriority interface { Noop() } @@ -168,21 +171,31 @@ func (d *ResourcePriorityBase) Noop() {} type RegionPriority struct { serder.Metadata `union:"region"` ResourcePriorityBase + Type string `json:"type"` Options []string `json:"options"` } type ChipPriority struct { serder.Metadata `union:"chip"` ResourcePriorityBase + Type string `json:"type"` Options []string `json:"options"` } type BiasPriority struct { serder.Metadata `union:"bias"` ResourcePriorityBase + Type string `json:"type"` Options []string `json:"options"` } +type UploadParams struct { + DataType string `json:"dataType"` + DataName string `json:"dataName"` + UploadInfo UploadInfo `json:"uploadInfo"` + UploadPriority UploadPriority `json:"uploadPriority"` +} + type UploadInfo interface { Noop() } @@ -197,13 +210,16 @@ var _ = serder.UseTypeUnionInternallyTagged(&UploadInfoTypeUnion, "type") type LocalUploadInfo struct { serder.Metadata `union:"local"` UploadInfoBase + Type string `json:"type"` LocalPath string `json:"localPath"` } type RemoteUploadInfo struct { serder.Metadata `union:"url"` UploadInfoBase - Url string `json:"url"` + Type string `json:"type"` + Url string `json:"url"` + TargetClusters []schsdk.ClusterID `json:"targetClusters"` } type UploadInfoBase struct{} @@ -224,13 +240,15 @@ var _ = serder.UseTypeUnionInternallyTagged(&UploadPriorityTypeUnion, "type") type Preferences struct { serder.Metadata `union:"preference"` UploadPriorityBase + Type string `json:"type"` ResourcePriorities []ResourcePriority `json:"priorities"` } type SpecifyCluster struct { serder.Metadata `union:"specify"` UploadPriorityBase - Clusters []ClusterID `json:"clusters"` + Type string `json:"type"` + Clusters []schsdk.ClusterID `json:"clusters"` } type UploadPriorityBase struct{} diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index b6821e7..f8bccf4 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -38,6 +38,8 @@ type ECSInstanceID string type NodeID int64 type Address string +type ClusterID string + type JobSetInfo struct { Jobs []JobInfo `json:"jobs"` } @@ -54,6 +56,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*UpdateMultiInstanceJobInfo)(nil), (*FinetuningJobInfo)(nil), (*DataPreprocessJobInfo)(nil), + (*PCMJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -76,6 +79,98 @@ type NormalJobInfo struct { ModelJobInfo ModelJobInfo `json:"modelJobInfo"` } +type PCMJobInfo struct { + serder.Metadata `union:"PCM"` + JobInfoBase + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + JobResources JobResources `json:"jobResources"` +} + +type JobResources struct { + ScheduleStrategy string `json:"scheduleStrategy"` //任务分配策略:负载均衡、积分优先、随机分配等 + Clusters []ClusterInfo `json:"clusters"` +} + +type ClusterInfo struct { + ClusterID ClusterID `json:"clusterID"` + Resources []Resource `json:"resources"` + Runtime JobRuntimeInfo `json:"runtime"` +} + +type Resource struct { + Resource []JobResource `json:"resource"` +} + +type JobResource interface { + Noop() +} + +var JobResourceTypeUnion = types.NewTypeUnion[JobResource]( + (*CPU)(nil), + (*GPU)(nil), + (*NPU)(nil), + (*MLU)(nil), + (*DCU)(nil), + (*Memory)(nil), + (*PRICE)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&JobResourceTypeUnion, "type") + +type JobResourceBase struct{} + +func (d *JobResourceBase) Noop() {} + +type CPU struct { + serder.Metadata `union:"CPU"` + JobResourceBase + Type string `json:"type"` + Number int64 `json:"number"` +} + +type GPU struct { + serder.Metadata `union:"GPU"` + JobResourceBase + Type string `json:"type"` + Number int64 `json:"number"` +} + +type NPU struct { + serder.Metadata `union:"NPU"` + JobResourceBase + Type string `json:"type"` + Number int64 `json:"number"` +} + +type Memory struct { + serder.Metadata `union:"Memory"` + JobResourceBase + Type string `json:"type"` + Number int64 `json:"number"` +} + +type DCU struct { + serder.Metadata `union:"DCU"` + JobResourceBase + Type string `json:"type"` + Number int64 `json:"number"` +} + +type MLU struct { + serder.Metadata `union:"MLU"` + JobResourceBase + Type string `json:"type"` + Number int64 `json:"number"` +} + +type PRICE struct { + serder.Metadata `union:"PRICE"` + JobResourceBase + Type string `json:"type"` + Number int64 `json:"number"` +} + // FinetuningJobInfo 模型微调 type FinetuningJobInfo struct { serder.Metadata `union:"Finetuning"` @@ -154,6 +249,7 @@ type JobFilesInfo struct { Dataset JobFileInfo `json:"dataset"` Code JobFileInfo `json:"code"` Image JobFileInfo `json:"image"` + Model JobFileInfo `json:"model"` } type JobFileInfo interface { @@ -165,6 +261,7 @@ var FileInfoTypeUnion = types.NewTypeUnion[JobFileInfo]( (*LocalJobFileInfo)(nil), (*DataReturnJobFileInfo)(nil), (*ImageJobFileInfo)(nil), + (*BindingJobFileInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&FileInfoTypeUnion, "type") @@ -172,6 +269,13 @@ type JobFileInfoBase struct{} func (i *JobFileInfoBase) Noop() {} +type BindingJobFileInfo struct { + serder.Metadata `union:"Binding"` + JobFileInfoBase + Type string `json:"type"` + BindingID int64 `json:"bindingID"` +} + type PackageJobFileInfo struct { serder.Metadata `union:"Package"` JobFileInfoBase @@ -203,6 +307,7 @@ type ImageJobFileInfo struct { type JobRuntimeInfo struct { Command string `json:"command"` Envs []KVPair `json:"envs"` + Params []KVPair `json:"params"` } type KVPair struct { diff --git a/sdks/uploader/client.go b/sdks/uploader/client.go new file mode 100644 index 0000000..7b1f385 --- /dev/null +++ b/sdks/uploader/client.go @@ -0,0 +1,57 @@ +package uploadersdk + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/sdks" +) + +type response[T any] struct { + Code int `json:"code"` + Message string `json:"message"` + Data T `json:"data"` +} + +const ( + ResponseCodeOK int = 200 +) + +func (r *response[T]) ToError() *sdks.CodeMessageError { + return &sdks.CodeMessageError{ + Code: fmt.Sprintf("%d", r.Code), + Message: r.Message, + } +} + +type Client struct { + baseURL string +} + +func NewClient(cfg *Config) *Client { + return &Client{ + baseURL: cfg.URL, + } +} + +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) +} + +type pool struct { + cfg *Config +} + +func NewPool(cfg *Config) Pool { + return &pool{ + cfg: cfg, + } +} +func (p *pool) Acquire() (*Client, error) { + cli := NewClient(p.cfg) + return cli, nil +} + +func (p *pool) Release(cli *Client) { + +} diff --git a/sdks/uploader/config.go b/sdks/uploader/config.go new file mode 100644 index 0000000..71cf23d --- /dev/null +++ b/sdks/uploader/config.go @@ -0,0 +1,5 @@ +package uploadersdk + +type Config struct { + URL string `json:"url"` +} diff --git a/sdks/uploader/models.go b/sdks/uploader/models.go new file mode 100644 index 0000000..863cb1f --- /dev/null +++ b/sdks/uploader/models.go @@ -0,0 +1,28 @@ +package uploadersdk + +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + +type ClusterID string + +type BlockChain struct { + DataID DataID `gorm:"column:dataID" json:"dataID"` + BlockChainID string `gorm:"column:blockChainID" json:"blockChainID"` + FileName string `gorm:"column:fileName" json:"fileName"` + FileHash string `gorm:"column:fileHash" json:"fileHash"` + FileSize int64 `gorm:"column:fileSize" json:"fileSize"` +} + +func (BlockChain) TableName() string { + return "BlockChain" // 确保和数据库中的表名一致 +} + +type BindingData struct { + ID DataID `gorm:"column:ID" json:"ID"` + UserID cdssdk.UserID `gorm:"column:userID" json:"userID"` + BindingName string `gorm:"column:bindingName" json:"bindingName"` + BindingType string `gorm:"column:bindingType" json:"bindingType"` +} + +func (BindingData) TableName() string { + return "BindingData" // 确保和数据库中的表名一致 +} diff --git a/sdks/uploader/uploader.go b/sdks/uploader/uploader.go new file mode 100644 index 0000000..8c607d4 --- /dev/null +++ b/sdks/uploader/uploader.go @@ -0,0 +1,182 @@ +package uploadersdk + +import ( + "fmt" + "gitlink.org.cn/cloudream/common/pkgs/types" + sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + "net/url" + "strings" + "time" +) + +type DataID int64 + +type Cluster struct { + DataID DataID `gorm:"column:dataID" json:"dataID"` + ClusterID schsdk.ClusterID `gorm:"column:clusterID" json:"clusterID"` + StorageID cdssdk.StorageID `gorm:"column:storageID" json:"storageID"` +} + +func (Cluster) TableName() string { + return "UploadedCluster" // 确保和数据库中的表名一致 +} + +type UploadedData struct { + ID DataID `gorm:"column:id;primaryKey" json:"id"` + UserID cdssdk.UserID `gorm:"column:userID" json:"userID"` + Name string `gorm:"column:name" json:"name"` + DataType string `gorm:"column:dataType" json:"dataType"` + PackageID cdssdk.PackageID `gorm:"column:packageID" json:"packageID"` + JsonData string `gorm:"column:jsonData" json:"jsonData"` // JSON 数据字段 + BindingID DataID `gorm:"column:bindingID" json:"bindingID"` + UploadTime time.Time `gorm:"column:uploadTime" json:"uploadTime"` + UploadedCluster []Cluster `gorm:"foreignKey:dataID;references:id" json:"clusters"` // 关联 Cluster 数据 + BlockChain []BlockChain `gorm:"foreignKey:dataID;references:id" json:"blockChains"` // 关联 BlockChain 数据 +} + +type DataScheduleReq struct { + PackageID cdssdk.PackageID `json:"packageID"` + DataType string `json:"dataType"` + Clusters []Cluster `json:"clusters"` +} + +type codeRepository struct { + RepositoryName string + ClusterID ClusterID +} + +type DataScheduleResp struct { + Results []sch.DataScheduleResult `json:"results"` +} + +func (c *Client) DataSchedule(req DataScheduleReq) (*DataScheduleResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/schedule") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[DataScheduleResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +type UploadReq struct { + Type string `json:"type"` + Source UploadSource `json:"source"` + Target UploadTarget `json:"target"` + StorageIDs []cdssdk.StorageID `json:"storageIDs"` +} + +type UploadSource interface { + Noop() +} + +var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource]( + (*PackageSource)(nil), + (*UrlSource)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type") + +type PackageSource struct { + serder.Metadata `union:"packageSource"` + UploadSourceBase + Type string `json:"type"` + PackageID cdssdk.PackageID `json:"packageID"` +} + +type UrlSource struct { + serder.Metadata `union:"urlSource"` + UploadSourceBase + Type string `json:"type"` + Url string `json:"url"` +} + +type UploadSourceBase struct{} + +func (d *UploadSourceBase) Noop() {} + +type UploadTarget interface { + Noop() +} + +var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget]( + (*UrlTarget)(nil), + (*ApiTarget)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type") + +type UrlTarget struct { + serder.Metadata `union:"url"` + UploadTargetBase + Clusters []ClusterID `json:"clusters"` +} + +type ApiTarget struct { + serder.Metadata `union:"api"` + UploadTargetBase + Clusters []ClusterID `json:"clusters"` +} + +type UploadTargetBase struct{} + +func (d *UploadTargetBase) Noop() {} + +type UploadResp struct { + PackageID cdssdk.PackageID `json:"packageID"` + JsonData string `json:"jsonData"` +} + +func (c *Client) Upload(req UploadReq) (*UploadResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/data/upload") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp response[UploadResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} diff --git a/utils/config/config.go b/utils/config/config.go index d1bdb89..3c4fe38 100644 --- a/utils/config/config.go +++ b/utils/config/config.go @@ -3,10 +3,9 @@ package config import ( "encoding/json" "fmt" + "github.com/imdario/mergo" "os" "path/filepath" - - "github.com/imdario/mergo" ) // Load 从本地文件读取配置,加载配置文件 @@ -32,6 +31,9 @@ func DefaultLoad(modeulName string, defCfg interface{}) error { // TODO 可以考虑根据环境变量读取不同的配置 // filepath.Join用于将多个路径组合成一个路径 configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName)) + + configFilePath = "D:\\Work\\Codes\\workspace\\workspace\\scheduler\\common\\assets\\confs\\middleware.json" + return Load(configFilePath, defCfg) }