Browse Source

接入存证和uploader服务

feature_wq
JeshuaRen 11 months ago
parent
commit
171f54a6b2
14 changed files with 874 additions and 89 deletions
  1. +34
    -0
      sdks/blockchain/blockchain.go
  2. +57
    -0
      sdks/blockchain/client.go
  3. +9
    -0
      sdks/blockchain/config.go
  4. +3
    -0
      sdks/blockchain/models.go
  5. +37
    -0
      sdks/blockchain/test.go
  6. +179
    -0
      sdks/blockchain/uploader.go
  7. +104
    -35
      sdks/pcmscheduler/jobset.go
  8. +70
    -52
      sdks/pcmscheduler/models.go
  9. +105
    -0
      sdks/scheduler/models.go
  10. +57
    -0
      sdks/uploader/client.go
  11. +5
    -0
      sdks/uploader/config.go
  12. +28
    -0
      sdks/uploader/models.go
  13. +182
    -0
      sdks/uploader/uploader.go
  14. +4
    -2
      utils/config/config.go

+ 34
- 0
sdks/blockchain/blockchain.go View File

@@ -0,0 +1,34 @@
package blockchain

import (
"gitlink.org.cn/cloudream/common/utils/http2"
"net/url"
)

type InvokeReq struct {
ContractAddress string `json:"contractAddress"`
FunctionName string `json:"functionName"`
MemberName string `json:"memberName"`
Type string `json:"type"`
Args []string `json:"args"`
}

func (c *Client) BlockChainInvoke(req InvokeReq) error {
targetUrl, err := url.JoinPath(c.baseURL, "/contract/invoke")
if err != nil {
return err
}

header := make(map[string]string)
header["Content-Type"] = http2.ContentTypeJSON

_, err = http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
Header: header,
})
if err != nil {
return err
}

return nil
}

+ 57
- 0
sdks/blockchain/client.go View File

@@ -0,0 +1,57 @@
package blockchain

import (
"fmt"

"gitlink.org.cn/cloudream/common/sdks"
)

type response[T any] struct {
Code int `json:"code"`
Message string `json:"message"`
Data T `json:"data"`
}

const (
ResponseCodeOK int = 200
)

func (r *response[T]) ToError() *sdks.CodeMessageError {
return &sdks.CodeMessageError{
Code: fmt.Sprintf("%d", r.Code),
Message: r.Message,
}
}

type Client struct {
baseURL string
}

func NewClient(cfg *Config) *Client {
return &Client{
baseURL: cfg.URL,
}
}

type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

type pool struct {
cfg *Config
}

func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return cli, nil
}

func (p *pool) Release(cli *Client) {

}

+ 9
- 0
sdks/blockchain/config.go View File

@@ -0,0 +1,9 @@
package blockchain

type Config struct {
URL string `json:"url"`
ContractAddress string `json:"contractAddress"`
FunctionName string `json:"functionName"`
MemberName string `json:"memberName"`
Type string `json:"type"`
}

+ 3
- 0
sdks/blockchain/models.go View File

@@ -0,0 +1,3 @@
package blockchain

type ClusterID string

+ 37
- 0
sdks/blockchain/test.go View File

@@ -0,0 +1,37 @@
package blockchain

import (
"fmt"
"io/ioutil"
"net/http"
"strings"
)

func main() {
url := "http://localhost:2006/contract/invoke"
method := "POST"
payload := strings.NewReader(`{` + " " + ` "contractAddress" : "0xc860ab27901b3c2b810165a6096c64d88763617f",` + " " + ` "functionName" : "storeEvidence",` + " " + ` "args" : ["3","touteng"],` + " " + ` "memberName" :"pcm",` + " " + ` "type": "2"` + " " + ` }`)
client := &http.Client{}
req, err := http.NewRequest(method, url, payload)
if err != nil {
fmt.Println(err)
return
}
req.Header.Add("User-Agent", "Apifox/1.0.0 (https://apifox.com)")
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Accept", "*/*")
req.Header.Add("Host", "localhost:2006")
req.Header.Add("Connection", "keep-alive")
res, err := client.Do(req)
if err != nil {
fmt.Println(err)
return
}
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(string(body))
}

+ 179
- 0
sdks/blockchain/uploader.go View File

@@ -0,0 +1,179 @@
package blockchain

import (
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/types"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
"net/url"
"strings"
"time"
)

type DataID int64

type Cluster struct {
DataID DataID `gorm:"column:dataID" json:"dataID"`
ClusterID schsdk.ClusterID `gorm:"column:clusterID" json:"clusterID"`
StorageID cdssdk.StorageID `gorm:"column:storageID" json:"storageID"`
}

func (Cluster) TableName() string {
return "UploadedCluster" // 确保和数据库中的表名一致
}

type UploadedData struct {
ID DataID `gorm:"column:id;primaryKey" json:"id"`
Name string `gorm:"column:name" json:"name"`
DataType string `gorm:"column:dataType" json:"dataType"`
PackageID cdssdk.PackageID `gorm:"column:packageID" json:"packageID"`
JsonData string `gorm:"column:jsonData" json:"jsonData"` // JSON 数据字段
UploadTime time.Time `gorm:"column:uploadTime" json:"uploadTime"`
UploadedCluster []Cluster `gorm:"foreignKey:dataID;references:id" json:"Clusters"` // 关联 Cluster 数据
}

type DataScheduleReq struct {
PackageID cdssdk.PackageID `json:"packageID"`
DataType string `json:"dataType"`
Clusters []Cluster `json:"clusters"`
}

type codeRepository struct {
RepositoryName string
ClusterID ClusterID
}

type DataScheduleResp struct {
Results []sch.DataScheduleResult `json:"results"`
}

func (c *Client) DataSchedule(req DataScheduleReq) (*DataScheduleResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/schedule")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[DataScheduleResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

type UploadReq struct {
Type string `json:"type"`
Source UploadSource `json:"source"`
Target UploadTarget `json:"target"`
StorageIDs []cdssdk.StorageID `json:"storageIDs"`
}

type UploadSource interface {
Noop()
}

var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource](
(*PackageSource)(nil),
(*UrlSource)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type")

type PackageSource struct {
serder.Metadata `union:"packageSource"`
UploadSourceBase
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type UrlSource struct {
serder.Metadata `union:"urlSource"`
UploadSourceBase
Type string `json:"type"`
Url string `json:"url"`
}

type UploadSourceBase struct{}

func (d *UploadSourceBase) Noop() {}

type UploadTarget interface {
Noop()
}

var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget](
(*UrlTarget)(nil),
(*ApiTarget)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type")

type UrlTarget struct {
serder.Metadata `union:"url"`
UploadTargetBase
Clusters []ClusterID `json:"clusters"`
}

type ApiTarget struct {
serder.Metadata `union:"api"`
UploadTargetBase
Clusters []ClusterID `json:"clusters"`
}

type UploadTargetBase struct{}

func (d *UploadTargetBase) Noop() {}

type UploadResp struct {
PackageID cdssdk.PackageID `json:"packageID"`
JsonData string `json:"jsonData"`
}

func (c *Client) Upload(req UploadReq) (*UploadResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/data/upload")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[UploadResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

+ 104
- 35
sdks/pcmscheduler/jobset.go View File

@@ -11,7 +11,12 @@ import (
) )


type GetClusterInfoReq struct { type GetClusterInfoReq struct {
IDs []ClusterID `json:"ids"`
IDs []schsdk.ClusterID `json:"clusterIDs"`
}

type GetClusterInfoResp struct {
Data []ClusterDetail `json:"data"`
TraceId string `json:"traceId"`
} }


func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) { func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) {
@@ -19,7 +24,8 @@ func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
resp, err := http2.GetJSON(targetUrl, http2.RequestParam{Body: req})

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{Body: req})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -42,8 +48,8 @@ func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error)
} }


type CreateJobReq struct { type CreateJobReq struct {
DataDistribute DataDistribute `json:"dataDistribute"`
Resources schsdk.JobResourcesInfo `json:"resources"`
JobResources schsdk.JobResources `json:"jobResources"`
DataDistribute DataDistribute `json:"dataDistribute"`
} }


type DataDistribute struct { type DataDistribute struct {
@@ -53,33 +59,33 @@ type DataDistribute struct {
Model []ModelDistribute `json:"model"` Model []ModelDistribute `json:"model"`
} }


type DataDetail struct {
ClusterID schsdk.ClusterID `json:"clusterID"`
JsonData string `json:"jsonData"`
}

type DatasetDistribute struct { type DatasetDistribute struct {
DataName string `json:"dataName"` DataName string `json:"dataName"`
PackageID cdssdk.PackageID `json:"packageID"` PackageID cdssdk.PackageID `json:"packageID"`
Clusters []ClusterID `json:"clusters"`
Clusters []DataDetail `json:"clusters"`
} }


type CodeDistribute struct { type CodeDistribute struct {
DataName string `json:"dataName"` DataName string `json:"dataName"`
PackageID cdssdk.PackageID `json:"packageID"` PackageID cdssdk.PackageID `json:"packageID"`
Clusters []ClusterID `json:"clusters"`
Clusters []DataDetail `json:"clusters"`
} }


type ImageDistribute struct { type ImageDistribute struct {
DataName string `json:"dataName"` DataName string `json:"dataName"`
PackageID cdssdk.PackageID `json:"packageID"` PackageID cdssdk.PackageID `json:"packageID"`
Clusters []ClusterID `json:"clusters"`
Clusters []DataDetail `json:"clusters"`
} }


type ModelDistribute struct { type ModelDistribute struct {
DataName string `json:"dataName"` DataName string `json:"dataName"`
PackageID cdssdk.PackageID `json:"packageID"` PackageID cdssdk.PackageID `json:"packageID"`
Clusters []ClusterID `json:"clusters"`
}

type Cluster struct {
ClusterID ClusterID `json:"clusterID"`
StorageID cdssdk.StorageID `json:"storageID"`
Clusters []DataDetail `json:"clusters"`
} }


type CreateJobResp struct { type CreateJobResp struct {
@@ -88,35 +94,40 @@ type CreateJobResp struct {
} }


type ScheduleData struct { type ScheduleData struct {
DataType string `json:"dataType"`
PackageID cdssdk.PackageID `json:"packageID"`
StorageType string `json:"storageType"`
ClusterIDs []ClusterID `json:"clusterIDs"`
DataType string `json:"dataType"`
PackageID cdssdk.PackageID `json:"packageID"`
StorageType string `json:"storageType"`
ClusterIDs []schsdk.ClusterID `json:"clusterIDs"`
} }


func (c *Client) CreateJob(req CreateJobReq) (CreateJobResp, error) {
func (c *Client) CreateJob(req CreateJobReq) (*CreateJobResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/submit")
if err != nil {
return nil, err
}


}
resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}


type DataScheduleReq struct {
PackageID cdssdk.PackageID `json:"packageID"`
StorageType string `json:"storageType"`
Clusters []Cluster `json:"clusters"`
}
contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[CreateJobResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}


type DataScheduleResp struct {
Results []DataScheduleResult `json:"results"`
}
if codeResp.Code == ResponseCodeOK {
return &codeResp.Data, nil
}


type DataScheduleResult struct {
ClusterID ClusterID `json:"clusterID"`
PackageID cdssdk.PackageID `json:"packageID"`
PackageFullPath string `json:"packageFullPath"`
Status bool `json:"status"`
Msg string `json:"msg"`
}
return nil, codeResp.ToError()
}


func (c *Client) DataSchedule(req DataScheduleReq) (DataScheduleResp, error) {
return nil, fmt.Errorf("unknow response content type: %s", contType)


} }


@@ -125,12 +136,47 @@ type RunJobReq struct {
ScheduledDatas []DataScheduleResults `json:"scheduledDatas"` ScheduledDatas []DataScheduleResults `json:"scheduledDatas"`
} }


type DataScheduleResult struct {
Clusters DataDetail `json:"clusters"`
PackageID cdssdk.PackageID `json:"packageID"`
PackageFullPath string `json:"packageFullPath"`
Status bool `json:"status"`
Msg string `json:"msg"`
}

type DataScheduleResults struct { type DataScheduleResults struct {
DataType string `json:"dataType"` DataType string `json:"dataType"`
Results []DataScheduleResult `json:"results"` Results []DataScheduleResult `json:"results"`
} }


func (c *Client) RunJob(req RunJobReq) error { func (c *Client) RunJob(req RunJobReq) error {
targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/submit")
if err != nil {
return err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[string]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return nil
}

return codeResp.ToError()
}

return fmt.Errorf("unknow response content type: %s", contType)


} }


@@ -140,5 +186,28 @@ type CancelJobReq struct {
} }


func (c *Client) CancelJob(req CancelJobReq) error { func (c *Client) CancelJob(req CancelJobReq) error {
targetUrl, err := url.JoinPath(c.baseURL, "/queryResources")
if err != nil {
return err
}
resp, err := http2.GetJSON(targetUrl, http2.RequestParam{Body: req})
if err != nil {
return err
}
contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {

var codeResp response[string]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return nil
}

return codeResp.ToError()
}


return fmt.Errorf("unknow response content type: %s", contType)
} }

+ 70
- 52
sdks/pcmscheduler/models.go View File

@@ -2,6 +2,7 @@ package sch


import ( import (
"gitlink.org.cn/cloudream/common/pkgs/types" "gitlink.org.cn/cloudream/common/pkgs/types"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/common/utils/serder"
) )


@@ -21,12 +22,26 @@ const (
MODEL = "model" MODEL = "model"
) )


type ClusterID int64
type TaskID int64 type TaskID int64


type ClusterDetail struct { type ClusterDetail struct {
ID ClusterID `json:"id"`
Resources []ResourceData `json:"resources"`
// 集群ID
ClusterId schsdk.ClusterID `json:"clusterID"`
// 集群功能类型:云算,智算,超算
ClusterType string `json:"clusterType"`
// 集群地区:华东地区、华南地区、华北地区、华中地区、西南地区、西北地区、东北地区
Region string `json:"region"`
// 资源类型
Resources2 []ResourceData `json:"resources1,omitempty"`
//Resources []ResourceData `json:"resources"`
Resources []TmpResourceData `json:"resources"`
}

type TmpResourceData struct {
Type ResourceType `json:"type"`
Name string `json:"name"`
Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"`
} }


type ResourceData interface { type ResourceData interface {
@@ -38,10 +53,13 @@ var ResourceDataTypeUnion = types.NewTypeUnion[ResourceData](
(*NPUResourceData)(nil), (*NPUResourceData)(nil),
(*GPUResourceData)(nil), (*GPUResourceData)(nil),
(*MLUResourceData)(nil), (*MLUResourceData)(nil),
(*DCUResourceData)(nil),
(*GCUResourceData)(nil),
(*GPGPUResourceData)(nil),
(*StorageResourceData)(nil), (*StorageResourceData)(nil),
(*MemoryResourceData)(nil), (*MemoryResourceData)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&ResourceDataTypeUnion, "name")
var _ = serder.UseTypeUnionInternallyTagged(&ResourceDataTypeUnion, "type")


type ResourceDataBase struct{} type ResourceDataBase struct{}


@@ -55,99 +73,84 @@ type UnitValue[T any] struct {
type CPUResourceData struct { type CPUResourceData struct {
serder.Metadata `union:"CPU"` serder.Metadata `union:"CPU"`
ResourceDataBase ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"` Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"` Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"` Available UnitValue[int64] `json:"available"`
} }


func NewCPUResourceData(total UnitValue[int64], available UnitValue[int64]) *CPUResourceData {
return &CPUResourceData{
Name: ResourceTypeCPU,
Total: total,
Available: available,
}
}

type NPUResourceData struct { type NPUResourceData struct {
serder.Metadata `union:"NPU"` serder.Metadata `union:"NPU"`
ResourceDataBase ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"` Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"` Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"` Available UnitValue[int64] `json:"available"`
} }


func NewNPUResourceData(total UnitValue[int64], available UnitValue[int64]) *NPUResourceData {
return &NPUResourceData{
Name: ResourceTypeNPU,
Total: total,
Available: available,
}
}

type GPUResourceData struct { type GPUResourceData struct {
serder.Metadata `union:"GPU"` serder.Metadata `union:"GPU"`
ResourceDataBase ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"` Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"` Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"` Available UnitValue[int64] `json:"available"`
} }


func NewGPUResourceData(total UnitValue[int64], available UnitValue[int64]) *GPUResourceData {
return &GPUResourceData{
Name: ResourceTypeGPU,
Total: total,
Available: available,
}
}

type MLUResourceData struct { type MLUResourceData struct {
serder.Metadata `union:"MLU"` serder.Metadata `union:"MLU"`
ResourceDataBase ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type DCUResourceData struct {
serder.Metadata `union:"DCU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"` Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"` Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"` Available UnitValue[int64] `json:"available"`
} }


func NewMLUResourceData(total UnitValue[int64], available UnitValue[int64]) *MLUResourceData {
return &MLUResourceData{
Name: ResourceTypeMLU,
Total: total,
Available: available,
}
type GCUResourceData struct {
serder.Metadata `union:"GCU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
}

type GPGPUResourceData struct {
serder.Metadata `union:"ILUVATAR-GPGPU"`
ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"`
Total UnitValue[int64] `json:"total"`
Available UnitValue[int64] `json:"available"`
} }


type StorageResourceData struct { type StorageResourceData struct {
serder.Metadata `union:"STORAGE"` serder.Metadata `union:"STORAGE"`
ResourceDataBase ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"` Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"` Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"` Available UnitValue[float64] `json:"available"`
} }


func NewStorageResourceData(total UnitValue[float64], available UnitValue[float64]) *StorageResourceData {
return &StorageResourceData{
Name: ResourceTypeStorage,
Total: total,
Available: available,
}
}

type MemoryResourceData struct { type MemoryResourceData struct {
serder.Metadata `union:"MEMORY"` serder.Metadata `union:"MEMORY"`
ResourceDataBase ResourceDataBase
Type string `json:"type"`
Name ResourceType `json:"name"` Name ResourceType `json:"name"`
Total UnitValue[float64] `json:"total"` Total UnitValue[float64] `json:"total"`
Available UnitValue[float64] `json:"available"` Available UnitValue[float64] `json:"available"`
} }


func NewMemoryResourceData(total UnitValue[float64], available UnitValue[float64]) *MemoryResourceData {
return &MemoryResourceData{
Name: ResourceTypeMemory,
Total: total,
Available: available,
}
}

type ResourcePriority interface { type ResourcePriority interface {
Noop() Noop()
} }
@@ -168,21 +171,31 @@ func (d *ResourcePriorityBase) Noop() {}
type RegionPriority struct { type RegionPriority struct {
serder.Metadata `union:"region"` serder.Metadata `union:"region"`
ResourcePriorityBase ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"` Options []string `json:"options"`
} }


type ChipPriority struct { type ChipPriority struct {
serder.Metadata `union:"chip"` serder.Metadata `union:"chip"`
ResourcePriorityBase ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"` Options []string `json:"options"`
} }


type BiasPriority struct { type BiasPriority struct {
serder.Metadata `union:"bias"` serder.Metadata `union:"bias"`
ResourcePriorityBase ResourcePriorityBase
Type string `json:"type"`
Options []string `json:"options"` Options []string `json:"options"`
} }


type UploadParams struct {
DataType string `json:"dataType"`
DataName string `json:"dataName"`
UploadInfo UploadInfo `json:"uploadInfo"`
UploadPriority UploadPriority `json:"uploadPriority"`
}

type UploadInfo interface { type UploadInfo interface {
Noop() Noop()
} }
@@ -197,13 +210,16 @@ var _ = serder.UseTypeUnionInternallyTagged(&UploadInfoTypeUnion, "type")
type LocalUploadInfo struct { type LocalUploadInfo struct {
serder.Metadata `union:"local"` serder.Metadata `union:"local"`
UploadInfoBase UploadInfoBase
Type string `json:"type"`
LocalPath string `json:"localPath"` LocalPath string `json:"localPath"`
} }


type RemoteUploadInfo struct { type RemoteUploadInfo struct {
serder.Metadata `union:"url"` serder.Metadata `union:"url"`
UploadInfoBase UploadInfoBase
Url string `json:"url"`
Type string `json:"type"`
Url string `json:"url"`
TargetClusters []schsdk.ClusterID `json:"targetClusters"`
} }


type UploadInfoBase struct{} type UploadInfoBase struct{}
@@ -224,13 +240,15 @@ var _ = serder.UseTypeUnionInternallyTagged(&UploadPriorityTypeUnion, "type")
type Preferences struct { type Preferences struct {
serder.Metadata `union:"preference"` serder.Metadata `union:"preference"`
UploadPriorityBase UploadPriorityBase
Type string `json:"type"`
ResourcePriorities []ResourcePriority `json:"priorities"` ResourcePriorities []ResourcePriority `json:"priorities"`
} }


type SpecifyCluster struct { type SpecifyCluster struct {
serder.Metadata `union:"specify"` serder.Metadata `union:"specify"`
UploadPriorityBase UploadPriorityBase
Clusters []ClusterID `json:"clusters"`
Type string `json:"type"`
Clusters []schsdk.ClusterID `json:"clusters"`
} }


type UploadPriorityBase struct{} type UploadPriorityBase struct{}


+ 105
- 0
sdks/scheduler/models.go View File

@@ -38,6 +38,8 @@ type ECSInstanceID string
type NodeID int64 type NodeID int64
type Address string type Address string


type ClusterID string

type JobSetInfo struct { type JobSetInfo struct {
Jobs []JobInfo `json:"jobs"` Jobs []JobInfo `json:"jobs"`
} }
@@ -54,6 +56,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
(*UpdateMultiInstanceJobInfo)(nil), (*UpdateMultiInstanceJobInfo)(nil),
(*FinetuningJobInfo)(nil), (*FinetuningJobInfo)(nil),
(*DataPreprocessJobInfo)(nil), (*DataPreprocessJobInfo)(nil),
(*PCMJobInfo)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type")


@@ -76,6 +79,98 @@ type NormalJobInfo struct {
ModelJobInfo ModelJobInfo `json:"modelJobInfo"` ModelJobInfo ModelJobInfo `json:"modelJobInfo"`
} }


type PCMJobInfo struct {
serder.Metadata `union:"PCM"`
JobInfoBase
Type string `json:"type"`
Files JobFilesInfo `json:"files"`
JobResources JobResources `json:"jobResources"`
}

type JobResources struct {
ScheduleStrategy string `json:"scheduleStrategy"` //任务分配策略:负载均衡、积分优先、随机分配等
Clusters []ClusterInfo `json:"clusters"`
}

type ClusterInfo struct {
ClusterID ClusterID `json:"clusterID"`
Resources []Resource `json:"resources"`
Runtime JobRuntimeInfo `json:"runtime"`
}

type Resource struct {
Resource []JobResource `json:"resource"`
}

type JobResource interface {
Noop()
}

var JobResourceTypeUnion = types.NewTypeUnion[JobResource](
(*CPU)(nil),
(*GPU)(nil),
(*NPU)(nil),
(*MLU)(nil),
(*DCU)(nil),
(*Memory)(nil),
(*PRICE)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&JobResourceTypeUnion, "type")

type JobResourceBase struct{}

func (d *JobResourceBase) Noop() {}

type CPU struct {
serder.Metadata `union:"CPU"`
JobResourceBase
Type string `json:"type"`
Number int64 `json:"number"`
}

type GPU struct {
serder.Metadata `union:"GPU"`
JobResourceBase
Type string `json:"type"`
Number int64 `json:"number"`
}

type NPU struct {
serder.Metadata `union:"NPU"`
JobResourceBase
Type string `json:"type"`
Number int64 `json:"number"`
}

type Memory struct {
serder.Metadata `union:"Memory"`
JobResourceBase
Type string `json:"type"`
Number int64 `json:"number"`
}

type DCU struct {
serder.Metadata `union:"DCU"`
JobResourceBase
Type string `json:"type"`
Number int64 `json:"number"`
}

type MLU struct {
serder.Metadata `union:"MLU"`
JobResourceBase
Type string `json:"type"`
Number int64 `json:"number"`
}

type PRICE struct {
serder.Metadata `union:"PRICE"`
JobResourceBase
Type string `json:"type"`
Number int64 `json:"number"`
}

// FinetuningJobInfo 模型微调 // FinetuningJobInfo 模型微调
type FinetuningJobInfo struct { type FinetuningJobInfo struct {
serder.Metadata `union:"Finetuning"` serder.Metadata `union:"Finetuning"`
@@ -154,6 +249,7 @@ type JobFilesInfo struct {
Dataset JobFileInfo `json:"dataset"` Dataset JobFileInfo `json:"dataset"`
Code JobFileInfo `json:"code"` Code JobFileInfo `json:"code"`
Image JobFileInfo `json:"image"` Image JobFileInfo `json:"image"`
Model JobFileInfo `json:"model"`
} }


type JobFileInfo interface { type JobFileInfo interface {
@@ -165,6 +261,7 @@ var FileInfoTypeUnion = types.NewTypeUnion[JobFileInfo](
(*LocalJobFileInfo)(nil), (*LocalJobFileInfo)(nil),
(*DataReturnJobFileInfo)(nil), (*DataReturnJobFileInfo)(nil),
(*ImageJobFileInfo)(nil), (*ImageJobFileInfo)(nil),
(*BindingJobFileInfo)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&FileInfoTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&FileInfoTypeUnion, "type")


@@ -172,6 +269,13 @@ type JobFileInfoBase struct{}


func (i *JobFileInfoBase) Noop() {} func (i *JobFileInfoBase) Noop() {}


type BindingJobFileInfo struct {
serder.Metadata `union:"Binding"`
JobFileInfoBase
Type string `json:"type"`
BindingID int64 `json:"bindingID"`
}

type PackageJobFileInfo struct { type PackageJobFileInfo struct {
serder.Metadata `union:"Package"` serder.Metadata `union:"Package"`
JobFileInfoBase JobFileInfoBase
@@ -203,6 +307,7 @@ type ImageJobFileInfo struct {
type JobRuntimeInfo struct { type JobRuntimeInfo struct {
Command string `json:"command"` Command string `json:"command"`
Envs []KVPair `json:"envs"` Envs []KVPair `json:"envs"`
Params []KVPair `json:"params"`
} }


type KVPair struct { type KVPair struct {


+ 57
- 0
sdks/uploader/client.go View File

@@ -0,0 +1,57 @@
package uploadersdk

import (
"fmt"

"gitlink.org.cn/cloudream/common/sdks"
)

type response[T any] struct {
Code int `json:"code"`
Message string `json:"message"`
Data T `json:"data"`
}

const (
ResponseCodeOK int = 200
)

func (r *response[T]) ToError() *sdks.CodeMessageError {
return &sdks.CodeMessageError{
Code: fmt.Sprintf("%d", r.Code),
Message: r.Message,
}
}

type Client struct {
baseURL string
}

func NewClient(cfg *Config) *Client {
return &Client{
baseURL: cfg.URL,
}
}

type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

type pool struct {
cfg *Config
}

func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return cli, nil
}

func (p *pool) Release(cli *Client) {

}

+ 5
- 0
sdks/uploader/config.go View File

@@ -0,0 +1,5 @@
package uploadersdk

type Config struct {
URL string `json:"url"`
}

+ 28
- 0
sdks/uploader/models.go View File

@@ -0,0 +1,28 @@
package uploadersdk

import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"

type ClusterID string

type BlockChain struct {
DataID DataID `gorm:"column:dataID" json:"dataID"`
BlockChainID string `gorm:"column:blockChainID" json:"blockChainID"`
FileName string `gorm:"column:fileName" json:"fileName"`
FileHash string `gorm:"column:fileHash" json:"fileHash"`
FileSize int64 `gorm:"column:fileSize" json:"fileSize"`
}

func (BlockChain) TableName() string {
return "BlockChain" // 确保和数据库中的表名一致
}

type BindingData struct {
ID DataID `gorm:"column:ID" json:"ID"`
UserID cdssdk.UserID `gorm:"column:userID" json:"userID"`
BindingName string `gorm:"column:bindingName" json:"bindingName"`
BindingType string `gorm:"column:bindingType" json:"bindingType"`
}

func (BindingData) TableName() string {
return "BindingData" // 确保和数据库中的表名一致
}

+ 182
- 0
sdks/uploader/uploader.go View File

@@ -0,0 +1,182 @@
package uploadersdk

import (
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/types"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
"net/url"
"strings"
"time"
)

type DataID int64

type Cluster struct {
DataID DataID `gorm:"column:dataID" json:"dataID"`
ClusterID schsdk.ClusterID `gorm:"column:clusterID" json:"clusterID"`
StorageID cdssdk.StorageID `gorm:"column:storageID" json:"storageID"`
}

func (Cluster) TableName() string {
return "UploadedCluster" // 确保和数据库中的表名一致
}

type UploadedData struct {
ID DataID `gorm:"column:id;primaryKey" json:"id"`
UserID cdssdk.UserID `gorm:"column:userID" json:"userID"`
Name string `gorm:"column:name" json:"name"`
DataType string `gorm:"column:dataType" json:"dataType"`
PackageID cdssdk.PackageID `gorm:"column:packageID" json:"packageID"`
JsonData string `gorm:"column:jsonData" json:"jsonData"` // JSON 数据字段
BindingID DataID `gorm:"column:bindingID" json:"bindingID"`
UploadTime time.Time `gorm:"column:uploadTime" json:"uploadTime"`
UploadedCluster []Cluster `gorm:"foreignKey:dataID;references:id" json:"clusters"` // 关联 Cluster 数据
BlockChain []BlockChain `gorm:"foreignKey:dataID;references:id" json:"blockChains"` // 关联 BlockChain 数据
}

type DataScheduleReq struct {
PackageID cdssdk.PackageID `json:"packageID"`
DataType string `json:"dataType"`
Clusters []Cluster `json:"clusters"`
}

type codeRepository struct {
RepositoryName string
ClusterID ClusterID
}

type DataScheduleResp struct {
Results []sch.DataScheduleResult `json:"results"`
}

func (c *Client) DataSchedule(req DataScheduleReq) (*DataScheduleResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/jobSet/schedule")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[DataScheduleResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

type UploadReq struct {
Type string `json:"type"`
Source UploadSource `json:"source"`
Target UploadTarget `json:"target"`
StorageIDs []cdssdk.StorageID `json:"storageIDs"`
}

type UploadSource interface {
Noop()
}

var UploadSourceTypeUnion = types.NewTypeUnion[UploadSource](
(*PackageSource)(nil),
(*UrlSource)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadSourceTypeUnion, "type")

type PackageSource struct {
serder.Metadata `union:"packageSource"`
UploadSourceBase
Type string `json:"type"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type UrlSource struct {
serder.Metadata `union:"urlSource"`
UploadSourceBase
Type string `json:"type"`
Url string `json:"url"`
}

type UploadSourceBase struct{}

func (d *UploadSourceBase) Noop() {}

type UploadTarget interface {
Noop()
}

var UploadTargetTypeUnion = types.NewTypeUnion[UploadTarget](
(*UrlTarget)(nil),
(*ApiTarget)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type")

type UrlTarget struct {
serder.Metadata `union:"url"`
UploadTargetBase
Clusters []ClusterID `json:"clusters"`
}

type ApiTarget struct {
serder.Metadata `union:"api"`
UploadTargetBase
Clusters []ClusterID `json:"clusters"`
}

type UploadTargetBase struct{}

func (d *UploadTargetBase) Noop() {}

type UploadResp struct {
PackageID cdssdk.PackageID `json:"packageID"`
JsonData string `json:"jsonData"`
}

func (c *Client) Upload(req UploadReq) (*UploadResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/data/upload")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[UploadResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
}

+ 4
- 2
utils/config/config.go View File

@@ -3,10 +3,9 @@ package config
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/imdario/mergo"
"os" "os"
"path/filepath" "path/filepath"

"github.com/imdario/mergo"
) )


// Load 从本地文件读取配置,加载配置文件 // Load 从本地文件读取配置,加载配置文件
@@ -32,6 +31,9 @@ func DefaultLoad(modeulName string, defCfg interface{}) error {
// TODO 可以考虑根据环境变量读取不同的配置 // TODO 可以考虑根据环境变量读取不同的配置
// filepath.Join用于将多个路径组合成一个路径 // filepath.Join用于将多个路径组合成一个路径
configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName)) configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName))

configFilePath = "D:\\Work\\Codes\\workspace\\workspace\\scheduler\\common\\assets\\confs\\middleware.json"

return Load(configFilePath, defCfg) return Load(configFilePath, defCfg)
} }




Loading…
Cancel
Save