Browse Source

优化任务提交等接口

feature_wq
JeshuaRen 9 months ago
parent
commit
d60ca055b7
8 changed files with 217 additions and 127 deletions
  1. +9
    -5
      sdks/blockchain/blockchain.go
  2. +6
    -0
      sdks/pcmscheduler/client.go
  3. +13
    -8
      sdks/pcmscheduler/jobset.go
  4. +18
    -6
      sdks/pcmscheduler/models.go
  5. +4
    -1
      sdks/scheduler/models.go
  6. +139
    -12
      sdks/uploader/models.go
  7. +26
    -95
      sdks/uploader/uploader.go
  8. +2
    -0
      utils/config/config.go

+ 9
- 5
sdks/blockchain/blockchain.go View File

@@ -16,35 +16,39 @@ type InvokeReq struct {
Args []string `json:"args"`
}

func (c *Client) BlockChainInvoke(req InvokeReq) error {
func (c *Client) BlockChainInvoke(req InvokeReq, token string) error {
targetUrl, err := url.JoinPath(c.baseURL, "/jcc-bcos/contract/invoke")
if err != nil {
return err
}

token, err := c.getToken()
if err != nil {
return err
}
//token, err := c.getToken()
//if err != nil {
// return err
//}

header := make(map[string]string)
header["Content-Type"] = http2.ContentTypeJSON
header["Authorization"] = token
println("token: " + token)

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
Header: header,
})
if err != nil {
println(err)
return err
}

var codeResp response[string]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
println(err)
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code != ResponseCodeOK {
println(codeResp.ToError().Message)
return codeResp.ToError()
}



+ 6
- 0
sdks/pcmscheduler/client.go View File

@@ -12,6 +12,12 @@ type response[T any] struct {
Data T `json:"data"`
}

type respons2[T any] struct {
Code int `json:"code"`
Message string `json:"msg"`
Data T `json:"data"`
}

const (
ResponseCodeOK int = 200
)


+ 13
- 8
sdks/pcmscheduler/jobset.go View File

@@ -48,6 +48,8 @@ func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error)
}

type CreateJobReq struct {
//Name string `json:"name"`
//Description string `json:"description"`
JobResources schsdk.JobResources `json:"jobResources"`
DataDistribute DataDistribute `json:"dataDistributes"`
}
@@ -123,7 +125,7 @@ func (c *Client) CreateJob(req CreateJobReq) (*CreateJobResp, error) {

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[CreateJobResp]
var codeResp respons2[CreateJobResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}
@@ -132,7 +134,9 @@ func (c *Client) CreateJob(req CreateJobReq) (*CreateJobResp, error) {
return &codeResp.Data, nil
}

return nil, codeResp.ToError()
//return nil, codeResp.ToError()
println(codeResp.Message)
return nil, err
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
@@ -145,10 +149,11 @@ type RunJobReq struct {
}

type DataScheduleResult struct {
Clusters DataDetail `json:"clusters"`
PackageID cdssdk.PackageID `json:"packageID"`
Status bool `json:"status"`
Msg string `json:"msg"`
Clusters []DataDetail `json:"clusters"`
PackageID cdssdk.PackageID `json:"packageID"`
PackageFullPath string `json:"packageFullPath"`
Status bool `json:"status"`
Msg string `json:"msg"`
}

type DataScheduleResults struct {
@@ -171,7 +176,7 @@ func (c *Client) RunJob(req RunJobReq) error {

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[string]
var codeResp respons2[string]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}
@@ -180,7 +185,7 @@ func (c *Client) RunJob(req RunJobReq) error {
return nil
}

return codeResp.ToError()
return fmt.Errorf("error: %s", codeResp.Message)
}

return fmt.Errorf("unknow response content type: %s", contType)


+ 18
- 6
sdks/pcmscheduler/models.go View File

@@ -32,6 +32,16 @@ const (
RejectedStatus = "rejected"
PendingStatus = "pending"
ApprovedStatus = "approved"
RevokedStatus = "revoked"
CancelStatus = "cancel"
ExpiredStatus = "expired"

ApplyAccess = "apply"
PrivateAccess = "private"
PublicAccess = "public"

PreferencePriority = "preference"
SpecifyClusterPriority = "specify"
)

type TaskID int64
@@ -269,10 +279,11 @@ type LocalUploadInfo struct {
type RemoteUploadInfo struct {
serder.Metadata `union:"url"`
UploadInfoBase
Type string `json:"type"`
Url string `json:"url"`
DataName string `json:"dataName"`
TargetClusters []schsdk.ClusterID `json:"targetClusters"`
Type string `json:"type"`
Url string `json:"url"`
DataName string `json:"dataName"`
Cluster schsdk.ClusterID `json:"clusterID"`
PackageID cdssdk.PackageID `json:"packageID"`
}

type UploadInfoBase struct{}
@@ -428,6 +439,7 @@ type ApplyLevel struct {
type PublicLevel struct {
serder.Metadata `union:"public"`
QueryBindingDataParamBase
Type string `json:"type" binding:"required"`
Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现
UserID cdssdk.UserID `json:"userID" binding:"required"`
Type string `json:"type" binding:"required"`
Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现
}

+ 4
- 1
sdks/scheduler/models.go View File

@@ -86,12 +86,15 @@ type PCMJobInfo struct {
serder.Metadata `union:"PCM"`
JobInfoBase
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Files JobFilesInfo `json:"files"`
JobResources JobResources `json:"jobResources"`
}

type JobResources struct {
ScheduleStrategy string `json:"scheduleStrategy"` //任务分配策略:负载均衡、积分优先、随机分配等
//任务分配策略:负载均衡、积分优先、随机分配等,dataLocality, leastLoadFirst
ScheduleStrategy string `json:"scheduleStrategy"`
Clusters []ClusterInfo `json:"clusters"`
}



+ 139
- 12
sdks/uploader/models.go View File

@@ -1,8 +1,11 @@
package uploadersdk

import (
"gitlink.org.cn/cloudream/common/pkgs/types"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/utils/serder"
"time"
)

@@ -22,11 +25,13 @@ func (BlockChain) TableName() string {
}

type Binding struct {
ID DataID `gorm:"column:id;primaryKey;autoIncrement" json:"ID"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
Name string `gorm:"column:name" json:"Name"`
DataType string `gorm:"column:data_type" json:"dataType"`
Content string `gorm:"column:content" json:"Content"`
ID DataID `gorm:"column:id;primaryKey;autoIncrement" json:"ID"`
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
Name string `gorm:"column:name" json:"Name"`
DataType string `gorm:"column:data_type" json:"dataType"`
Content string `gorm:"column:content" json:"Content"`
AccessLevel string `gorm:"column:access_level" json:"accessLevel"`
CreateTime time.Time `gorm:"column:created_at" json:"createTime"`
}

type BindingAccessData struct {
@@ -35,16 +40,19 @@ type BindingAccessData struct {
Name string `gorm:"column:name" json:"Name"`
DataType string `gorm:"column:data_type" json:"dataType"`
Content string `gorm:"column:content" json:"Content"`
ApplicantID cdssdk.UserID `json:"applicant_id"`
Status string `json:"status"`
AccessLevel string `gorm:"column:access_level" json:"accessLevel"`
ApplicantID cdssdk.UserID `gorm:"column:applicant_id" json:"applicantID"`
Status string `gorm:"column:status" json:"status"`
}

type BindingDetail struct {
ID DataID `json:"ID"`
Name string `gorm:"column:name" json:"Name"`
Info sch.DataBinding `json:"info"`
Packages []Package `json:"packages"`
Status string `json:"status"`
ID DataID `json:"ID"`
UserID cdssdk.UserID `json:"ownerID"`
Name string `json:"Name"`
Info sch.DataBinding `json:"info"`
Packages []Package `json:"packages"`
Status string `json:"status"`
AccessLevel string `json:"accessLevel"`
}

func (Binding) TableName() string {
@@ -60,3 +68,122 @@ type Folder struct {
func (Folder) TableName() string {
return "folders"
}

type DataID int64
type FolderID int64

type Cluster struct {
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"PackageID"`
ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
StorageID cdssdk.StorageID `gorm:"column:storage_id" json:"storageID"`
}

func (Cluster) TableName() string {
return "uploadedCluster" // 确保和数据库中的表名一致
}

type Package struct {
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
PackageName string `gorm:"column:package_name" json:"packageName"`
BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"`
DataType string `gorm:"column:data_type" json:"dataType"`
JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
Objects []cdssdk.Object `gorm:"column:objects" json:"objects"`
UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"`
Versions []PackageVersion `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"`
//BlockChain []BlockChain `gorm:"foreignKey:package_id;references:package_id" json:"blockChains"` // 关联 BlockChain 数据
UploadPriority sch.UploadPriority `gorm:"column:upload_priority" json:"uploadPriority"`
}

type PackageDAO struct {
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
PackageName string `gorm:"column:package_name" json:"packageName"`
BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"`
DataType string `gorm:"column:data_type" json:"dataType"`
JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
UploadedCluster []Cluster `gorm:"foreignKey:package_id;references:package_id" json:"clusters"` // 关联 Cluster 数据
Versions []PackageVersion `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"`
UploadPriority string `gorm:"column:upload_priority" json:"uploadPriority"`
}

type PackageVersion struct {
ParentPackageID cdssdk.PackageID `gorm:"column:parent_package_id" json:"parentPackageID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
PackageName string `gorm:"column:package_name" json:"packageName"`
Version int64 `gorm:"column:package_version" json:"version"`
}

func (PackageVersion) TableName() string {
return "packageVersion" // 确保和数据库中的表名一致
}

//type PackageScheduleType interface {
// Noop()
//}
//
//var PackageScheduleTypeUnion = types.NewTypeUnion[PackageScheduleType](
// (*PackagePreferencesSchedule)(nil),
// (*PackageSpecifyClusterSchedule)(nil),
//)
//
//var _ = serder.UseTypeUnionInternallyTagged(&PackageScheduleTypeUnion, "type")
//
//type PackageScheduleBase struct{}
//
//func (d *PackageScheduleBase) Noop() {}
//
//type PackagePreferencesSchedule struct {
// serder.Metadata `union:"region"`
// PackageScheduleBase
// Type string `json:"type"`
// a sch.Preferences
//}
//
//type PackageSpecifyClusterSchedule struct {
// serder.Metadata `union:"region"`
// PackageScheduleBase
// Type string `json:"type"`
//}

type ScheduleTarget interface {
Noop()
}

var DataScheduleTargetTypeUnion = types.NewTypeUnion[ScheduleTarget](
(*JCSScheduleTarget)(nil),
(*UrlScheduleTarget)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&DataScheduleTargetTypeUnion, "type")

type ScheduleTargetBase struct{}

func (d *ScheduleTargetBase) Noop() {}

type JCSScheduleTarget struct {
ScheduleTargetBase
UserID cdssdk.UserID `json:"userID"`
ScheduleStorages []ScheduleStorage `json:"scheduleStorages"`
}

type UrlScheduleTarget struct {
ScheduleTargetBase
ScheduleUrls []ScheduleUrl `json:"scheduleUrls"`
}

type ScheduleUrl struct {
ClusterID ClusterID `json:"clusterID"`
//RepositoryName string `json:"repositoryName"`
JsonData string `json:"jsonData"`
}

type ScheduleStorage struct {
StorageID cdssdk.StorageID `json:"storageID"`
RootPath string `json:"rootPath"`
}

+ 26
- 95
sdks/uploader/uploader.go View File

@@ -4,115 +4,31 @@ import (
"fmt"
"gitlink.org.cn/cloudream/common/pkgs/types"
sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
"gitlink.org.cn/cloudream/common/sdks/storage/cdsapi"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
"net/url"
"strings"
"time"
)

type DataID int64
type FolderID int64

type Cluster struct {
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"PackageID"`
ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"`
StorageID cdssdk.StorageID `gorm:"column:storage_id" json:"storageID"`
}

func (Cluster) TableName() string {
return "uploadedCluster" // 确保和数据库中的表名一致
}

type Package struct {
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
PackageName string `gorm:"column:package_name" json:"packageName"`
BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"`
DataType string `gorm:"column:data_type" json:"dataType"`
JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
Objects []cdssdk.Object `gorm:"column:objects" json:"objects"`
UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"`
Versions []PackageVersion `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"`
//BlockChain []BlockChain `gorm:"foreignKey:package_id;references:package_id" json:"blockChains"` // 关联 BlockChain 数据
}

type PackageDAO struct {
UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
PackageName string `gorm:"column:package_name" json:"packageName"`
BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"`
DataType string `gorm:"column:data_type" json:"dataType"`
JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段
BindingID DataID `gorm:"column:binding_id" json:"bindingID"`
CreateTime time.Time `gorm:"column:create_time" json:"createTime"`
UploadedCluster []Cluster `gorm:"foreignKey:package_id;references:package_id" json:"clusters"` // 关联 Cluster 数据
Versions []PackageVersion `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"`
}

type PackageVersion struct {
ParentPackageID cdssdk.PackageID `gorm:"column:parent_package_id" json:"parentPackageID"`
PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"`
PackageName string `gorm:"column:package_name" json:"packageName"`
Version int64 `gorm:"column:package_version" json:"version"`
}

func (PackageVersion) TableName() string {
return "packageVersion" // 确保和数据库中的表名一致
}

type DataScheduleReq struct {
PackageID cdssdk.PackageID `json:"packageID"`
DataType string `json:"dataType"`
ScheduleTarget ScheduleTarget `json:"scheduleTarget"`
}

type ScheduleTarget interface {
Noop()
}

var DataScheduleTargetTypeUnion = types.NewTypeUnion[ScheduleTarget](
(*JCSScheduleTarget)(nil),
(*UrlScheduleTarget)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&DataScheduleTargetTypeUnion, "type")

type ScheduleTargetBase struct{}

func (d *ScheduleTargetBase) Noop() {}

type JCSScheduleTarget struct {
ScheduleTargetBase
UserID cdssdk.UserID `json:"userID"`
ScheduleStorages []ScheduleStorage `json:"scheduleStorages"`
}

type UrlScheduleTarget struct {
ScheduleTargetBase
ScheduleUrls []ScheduleUrl `json:"scheduleUrls"`
}

type ScheduleUrl struct {
ClusterID ClusterID `json:"clusterID"`
//RepositoryName string `json:"repositoryName"`
JsonData string `json:"jsonData"`
}

type ScheduleStorage struct {
StorageID cdssdk.StorageID `json:"storageID"`
RootPath string `json:"rootPath"`
}

//type DataScheduleResp struct {
// Results []sch.DataScheduleResult `json:"data"`
//}

type TmpDataScheduleResult struct {
Cluster sch.DataDetail `json:"cluster"`
PackageID cdssdk.PackageID `json:"packageID"`
Status bool `json:"status"`
Msg string `json:"msg"`
}

func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/dataSchedule")
if err != nil {
@@ -125,17 +41,29 @@ func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, er
if err != nil {
return nil, err
}
println(resp.Body)

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[[]sch.DataScheduleResult]
var codeResp response[[]TmpDataScheduleResult]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {

return codeResp.Data, nil
var results []sch.DataScheduleResult
for _, tmpResult := range codeResp.Data {
result := sch.DataScheduleResult{
PackageID: tmpResult.PackageID,
Status: tmpResult.Status,
Msg: tmpResult.Msg,
Clusters: []sch.DataDetail{
tmpResult.Cluster,
},
}
results = append(results, result)
}
return results, nil
}

return nil, codeResp.ToError()
@@ -194,7 +122,7 @@ var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type")
type UrlTarget struct {
serder.Metadata `union:"url"`
UploadTargetBase
Clusters []ClusterID `json:"clusters"`
ClusterID ClusterID `json:"clusterId"`
JCSUploadInfo cdsapi.ObjectUploadInfo `form:"JCSUploadInfo"`
}

@@ -211,7 +139,10 @@ func (d *UploadTargetBase) Noop() {}
type UploadResp struct {
PackageID cdssdk.PackageID `json:"packageID"`
ObjectIDs []cdssdk.ObjectID `json:"objectIDs"`
ClusterID ClusterID `json:"clusterID"`
JsonData string `json:"jsonData"`
Status bool `json:"status"`
Message string `json:"message"`
}

func (c *Client) Upload(req UploadReq) (*UploadResp, error) {


+ 2
- 0
utils/config/config.go View File

@@ -33,6 +33,8 @@ func DefaultLoad(modeulName string, defCfg interface{}) error {
// filepath.Join用于将多个路径组合成一个路径
configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName))

configFilePath = "D:\\Work\\Codes\\workspace\\workspace\\scheduler\\common\\assets\\confs\\middleware.json"

return Load(configFilePath, defCfg)
}



Loading…
Cancel
Save