diff --git a/sdks/blockchain/blockchain.go b/sdks/blockchain/blockchain.go index b85b859..a7cb793 100644 --- a/sdks/blockchain/blockchain.go +++ b/sdks/blockchain/blockchain.go @@ -16,35 +16,39 @@ type InvokeReq struct { Args []string `json:"args"` } -func (c *Client) BlockChainInvoke(req InvokeReq) error { +func (c *Client) BlockChainInvoke(req InvokeReq, token string) error { targetUrl, err := url.JoinPath(c.baseURL, "/jcc-bcos/contract/invoke") if err != nil { return err } - token, err := c.getToken() - if err != nil { - return err - } + //token, err := c.getToken() + //if err != nil { + // return err + //} header := make(map[string]string) header["Content-Type"] = http2.ContentTypeJSON header["Authorization"] = token + println("token: " + token) resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ Body: req, Header: header, }) if err != nil { + println(err) return err } var codeResp response[string] if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + println(err) return fmt.Errorf("parsing response: %w", err) } if codeResp.Code != ResponseCodeOK { + println(codeResp.ToError().Message) return codeResp.ToError() } diff --git a/sdks/pcmscheduler/client.go b/sdks/pcmscheduler/client.go index a058dc7..fde07fc 100644 --- a/sdks/pcmscheduler/client.go +++ b/sdks/pcmscheduler/client.go @@ -12,6 +12,12 @@ type response[T any] struct { Data T `json:"data"` } +type respons2[T any] struct { + Code int `json:"code"` + Message string `json:"msg"` + Data T `json:"data"` +} + const ( ResponseCodeOK int = 200 ) diff --git a/sdks/pcmscheduler/jobset.go b/sdks/pcmscheduler/jobset.go index f06eca1..0248ff0 100644 --- a/sdks/pcmscheduler/jobset.go +++ b/sdks/pcmscheduler/jobset.go @@ -48,6 +48,8 @@ func (c *Client) GetClusterInfo(req GetClusterInfoReq) ([]ClusterDetail, error) } type CreateJobReq struct { + //Name string `json:"name"` + //Description string `json:"description"` JobResources schsdk.JobResources `json:"jobResources"` DataDistribute DataDistribute `json:"dataDistributes"` } @@ -123,7 +125,7 @@ func (c *Client) CreateJob(req CreateJobReq) (*CreateJobResp, error) { contType := resp.Header.Get("Content-Type") if strings.Contains(contType, http2.ContentTypeJSON) { - var codeResp response[CreateJobResp] + var codeResp respons2[CreateJobResp] if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { return nil, fmt.Errorf("parsing response: %w", err) } @@ -132,7 +134,9 @@ func (c *Client) CreateJob(req CreateJobReq) (*CreateJobResp, error) { return &codeResp.Data, nil } - return nil, codeResp.ToError() + //return nil, codeResp.ToError() + println(codeResp.Message) + return nil, err } return nil, fmt.Errorf("unknow response content type: %s", contType) @@ -145,10 +149,11 @@ type RunJobReq struct { } type DataScheduleResult struct { - Clusters DataDetail `json:"clusters"` - PackageID cdssdk.PackageID `json:"packageID"` - Status bool `json:"status"` - Msg string `json:"msg"` + Clusters []DataDetail `json:"clusters"` + PackageID cdssdk.PackageID `json:"packageID"` + PackageFullPath string `json:"packageFullPath"` + Status bool `json:"status"` + Msg string `json:"msg"` } type DataScheduleResults struct { @@ -171,7 +176,7 @@ func (c *Client) RunJob(req RunJobReq) error { contType := resp.Header.Get("Content-Type") if strings.Contains(contType, http2.ContentTypeJSON) { - var codeResp response[string] + var codeResp respons2[string] if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { return fmt.Errorf("parsing response: %w", err) } @@ -180,7 +185,7 @@ func (c *Client) RunJob(req RunJobReq) error { return nil } - return codeResp.ToError() + return fmt.Errorf("error: %s", codeResp.Message) } return fmt.Errorf("unknow response content type: %s", contType) diff --git a/sdks/pcmscheduler/models.go b/sdks/pcmscheduler/models.go index fefa7c8..0a09be7 100644 --- a/sdks/pcmscheduler/models.go +++ b/sdks/pcmscheduler/models.go @@ -32,6 +32,16 @@ const ( RejectedStatus = "rejected" PendingStatus = "pending" ApprovedStatus = "approved" + RevokedStatus = "revoked" + CancelStatus = "cancel" + ExpiredStatus = "expired" + + ApplyAccess = "apply" + PrivateAccess = "private" + PublicAccess = "public" + + PreferencePriority = "preference" + SpecifyClusterPriority = "specify" ) type TaskID int64 @@ -269,10 +279,11 @@ type LocalUploadInfo struct { type RemoteUploadInfo struct { serder.Metadata `union:"url"` UploadInfoBase - Type string `json:"type"` - Url string `json:"url"` - DataName string `json:"dataName"` - TargetClusters []schsdk.ClusterID `json:"targetClusters"` + Type string `json:"type"` + Url string `json:"url"` + DataName string `json:"dataName"` + Cluster schsdk.ClusterID `json:"clusterID"` + PackageID cdssdk.PackageID `json:"packageID"` } type UploadInfoBase struct{} @@ -428,6 +439,7 @@ type ApplyLevel struct { type PublicLevel struct { serder.Metadata `union:"public"` QueryBindingDataParamBase - Type string `json:"type" binding:"required"` - Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现 + UserID cdssdk.UserID `json:"userID" binding:"required"` + Type string `json:"type" binding:"required"` + Info DataBinding `json:"info"` // 可选,用于精细筛选,功能暂未实现 } diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 10bd51b..906796d 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -86,12 +86,15 @@ type PCMJobInfo struct { serder.Metadata `union:"PCM"` JobInfoBase Type string `json:"type"` + Name string `json:"name"` + Description string `json:"description"` Files JobFilesInfo `json:"files"` JobResources JobResources `json:"jobResources"` } type JobResources struct { - ScheduleStrategy string `json:"scheduleStrategy"` //任务分配策略:负载均衡、积分优先、随机分配等 + //任务分配策略:负载均衡、积分优先、随机分配等,dataLocality, leastLoadFirst + ScheduleStrategy string `json:"scheduleStrategy"` Clusters []ClusterInfo `json:"clusters"` } diff --git a/sdks/uploader/models.go b/sdks/uploader/models.go index 33e51e2..82c7ee1 100644 --- a/sdks/uploader/models.go +++ b/sdks/uploader/models.go @@ -1,8 +1,11 @@ package uploadersdk import ( + "gitlink.org.cn/cloudream/common/pkgs/types" sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/utils/serder" "time" ) @@ -22,11 +25,13 @@ func (BlockChain) TableName() string { } type Binding struct { - ID DataID `gorm:"column:id;primaryKey;autoIncrement" json:"ID"` - UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` - Name string `gorm:"column:name" json:"Name"` - DataType string `gorm:"column:data_type" json:"dataType"` - Content string `gorm:"column:content" json:"Content"` + ID DataID `gorm:"column:id;primaryKey;autoIncrement" json:"ID"` + UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` + Name string `gorm:"column:name" json:"Name"` + DataType string `gorm:"column:data_type" json:"dataType"` + Content string `gorm:"column:content" json:"Content"` + AccessLevel string `gorm:"column:access_level" json:"accessLevel"` + CreateTime time.Time `gorm:"column:created_at" json:"createTime"` } type BindingAccessData struct { @@ -35,16 +40,19 @@ type BindingAccessData struct { Name string `gorm:"column:name" json:"Name"` DataType string `gorm:"column:data_type" json:"dataType"` Content string `gorm:"column:content" json:"Content"` - ApplicantID cdssdk.UserID `json:"applicant_id"` - Status string `json:"status"` + AccessLevel string `gorm:"column:access_level" json:"accessLevel"` + ApplicantID cdssdk.UserID `gorm:"column:applicant_id" json:"applicantID"` + Status string `gorm:"column:status" json:"status"` } type BindingDetail struct { - ID DataID `json:"ID"` - Name string `gorm:"column:name" json:"Name"` - Info sch.DataBinding `json:"info"` - Packages []Package `json:"packages"` - Status string `json:"status"` + ID DataID `json:"ID"` + UserID cdssdk.UserID `json:"ownerID"` + Name string `json:"Name"` + Info sch.DataBinding `json:"info"` + Packages []Package `json:"packages"` + Status string `json:"status"` + AccessLevel string `json:"accessLevel"` } func (Binding) TableName() string { @@ -60,3 +68,122 @@ type Folder struct { func (Folder) TableName() string { return "folders" } + +type DataID int64 +type FolderID int64 + +type Cluster struct { + PackageID cdssdk.PackageID `gorm:"column:package_id" json:"PackageID"` + ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"` + StorageID cdssdk.StorageID `gorm:"column:storage_id" json:"storageID"` +} + +func (Cluster) TableName() string { + return "uploadedCluster" // 确保和数据库中的表名一致 +} + +type Package struct { + UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` + PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` + PackageName string `gorm:"column:package_name" json:"packageName"` + BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"` + DataType string `gorm:"column:data_type" json:"dataType"` + JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段 + BindingID DataID `gorm:"column:binding_id" json:"bindingID"` + CreateTime time.Time `gorm:"column:create_time" json:"createTime"` + Objects []cdssdk.Object `gorm:"column:objects" json:"objects"` + UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"` + Versions []PackageVersion `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"` + //BlockChain []BlockChain `gorm:"foreignKey:package_id;references:package_id" json:"blockChains"` // 关联 BlockChain 数据 + UploadPriority sch.UploadPriority `gorm:"column:upload_priority" json:"uploadPriority"` +} + +type PackageDAO struct { + UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` + PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` + PackageName string `gorm:"column:package_name" json:"packageName"` + BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"` + DataType string `gorm:"column:data_type" json:"dataType"` + JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段 + BindingID DataID `gorm:"column:binding_id" json:"bindingID"` + CreateTime time.Time `gorm:"column:create_time" json:"createTime"` + UploadedCluster []Cluster `gorm:"foreignKey:package_id;references:package_id" json:"clusters"` // 关联 Cluster 数据 + Versions []PackageVersion `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"` + UploadPriority string `gorm:"column:upload_priority" json:"uploadPriority"` +} + +type PackageVersion struct { + ParentPackageID cdssdk.PackageID `gorm:"column:parent_package_id" json:"parentPackageID"` + PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` + PackageName string `gorm:"column:package_name" json:"packageName"` + Version int64 `gorm:"column:package_version" json:"version"` +} + +func (PackageVersion) TableName() string { + return "packageVersion" // 确保和数据库中的表名一致 +} + +//type PackageScheduleType interface { +// Noop() +//} +// +//var PackageScheduleTypeUnion = types.NewTypeUnion[PackageScheduleType]( +// (*PackagePreferencesSchedule)(nil), +// (*PackageSpecifyClusterSchedule)(nil), +//) +// +//var _ = serder.UseTypeUnionInternallyTagged(&PackageScheduleTypeUnion, "type") +// +//type PackageScheduleBase struct{} +// +//func (d *PackageScheduleBase) Noop() {} +// +//type PackagePreferencesSchedule struct { +// serder.Metadata `union:"region"` +// PackageScheduleBase +// Type string `json:"type"` +// a sch.Preferences +//} +// +//type PackageSpecifyClusterSchedule struct { +// serder.Metadata `union:"region"` +// PackageScheduleBase +// Type string `json:"type"` +//} + +type ScheduleTarget interface { + Noop() +} + +var DataScheduleTargetTypeUnion = types.NewTypeUnion[ScheduleTarget]( + (*JCSScheduleTarget)(nil), + (*UrlScheduleTarget)(nil), +) + +var _ = serder.UseTypeUnionInternallyTagged(&DataScheduleTargetTypeUnion, "type") + +type ScheduleTargetBase struct{} + +func (d *ScheduleTargetBase) Noop() {} + +type JCSScheduleTarget struct { + ScheduleTargetBase + UserID cdssdk.UserID `json:"userID"` + ScheduleStorages []ScheduleStorage `json:"scheduleStorages"` +} + +type UrlScheduleTarget struct { + ScheduleTargetBase + ScheduleUrls []ScheduleUrl `json:"scheduleUrls"` +} + +type ScheduleUrl struct { + ClusterID ClusterID `json:"clusterID"` + //RepositoryName string `json:"repositoryName"` + JsonData string `json:"jsonData"` +} + +type ScheduleStorage struct { + StorageID cdssdk.StorageID `json:"storageID"` + RootPath string `json:"rootPath"` +} diff --git a/sdks/uploader/uploader.go b/sdks/uploader/uploader.go index f9951c2..a421d7b 100644 --- a/sdks/uploader/uploader.go +++ b/sdks/uploader/uploader.go @@ -4,115 +4,31 @@ import ( "fmt" "gitlink.org.cn/cloudream/common/pkgs/types" sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler" - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" "gitlink.org.cn/cloudream/common/utils/http2" "gitlink.org.cn/cloudream/common/utils/serder" "net/url" "strings" - "time" ) -type DataID int64 -type FolderID int64 - -type Cluster struct { - PackageID cdssdk.PackageID `gorm:"column:package_id" json:"PackageID"` - ClusterID schsdk.ClusterID `gorm:"column:cluster_id" json:"clusterID"` - StorageID cdssdk.StorageID `gorm:"column:storage_id" json:"storageID"` -} - -func (Cluster) TableName() string { - return "uploadedCluster" // 确保和数据库中的表名一致 -} - -type Package struct { - UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` - PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` - PackageName string `gorm:"column:package_name" json:"packageName"` - BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"` - DataType string `gorm:"column:data_type" json:"dataType"` - JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段 - BindingID DataID `gorm:"column:binding_id" json:"bindingID"` - CreateTime time.Time `gorm:"column:create_time" json:"createTime"` - Objects []cdssdk.Object `gorm:"column:objects" json:"objects"` - UploadedCluster []Cluster `gorm:"column:uploadedCluster" json:"uploadedCluster"` - Versions []PackageVersion `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"` - //BlockChain []BlockChain `gorm:"foreignKey:package_id;references:package_id" json:"blockChains"` // 关联 BlockChain 数据 -} - -type PackageDAO struct { - UserID cdssdk.UserID `gorm:"column:user_id" json:"userID"` - PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` - PackageName string `gorm:"column:package_name" json:"packageName"` - BucketID cdssdk.BucketID `gorm:"column:bucket_id" json:"bucketID"` - DataType string `gorm:"column:data_type" json:"dataType"` - JsonData string `gorm:"column:json_data" json:"jsonData"` // JSON 数据字段 - BindingID DataID `gorm:"column:binding_id" json:"bindingID"` - CreateTime time.Time `gorm:"column:create_time" json:"createTime"` - UploadedCluster []Cluster `gorm:"foreignKey:package_id;references:package_id" json:"clusters"` // 关联 Cluster 数据 - Versions []PackageVersion `gorm:"foreignKey:parent_package_id;references:package_id" json:"versions"` -} - -type PackageVersion struct { - ParentPackageID cdssdk.PackageID `gorm:"column:parent_package_id" json:"parentPackageID"` - PackageID cdssdk.PackageID `gorm:"column:package_id" json:"packageID"` - PackageName string `gorm:"column:package_name" json:"packageName"` - Version int64 `gorm:"column:package_version" json:"version"` -} - -func (PackageVersion) TableName() string { - return "packageVersion" // 确保和数据库中的表名一致 -} - type DataScheduleReq struct { PackageID cdssdk.PackageID `json:"packageID"` DataType string `json:"dataType"` ScheduleTarget ScheduleTarget `json:"scheduleTarget"` } -type ScheduleTarget interface { - Noop() -} - -var DataScheduleTargetTypeUnion = types.NewTypeUnion[ScheduleTarget]( - (*JCSScheduleTarget)(nil), - (*UrlScheduleTarget)(nil), -) - -var _ = serder.UseTypeUnionInternallyTagged(&DataScheduleTargetTypeUnion, "type") - -type ScheduleTargetBase struct{} - -func (d *ScheduleTargetBase) Noop() {} - -type JCSScheduleTarget struct { - ScheduleTargetBase - UserID cdssdk.UserID `json:"userID"` - ScheduleStorages []ScheduleStorage `json:"scheduleStorages"` -} - -type UrlScheduleTarget struct { - ScheduleTargetBase - ScheduleUrls []ScheduleUrl `json:"scheduleUrls"` -} - -type ScheduleUrl struct { - ClusterID ClusterID `json:"clusterID"` - //RepositoryName string `json:"repositoryName"` - JsonData string `json:"jsonData"` -} - -type ScheduleStorage struct { - StorageID cdssdk.StorageID `json:"storageID"` - RootPath string `json:"rootPath"` -} - //type DataScheduleResp struct { // Results []sch.DataScheduleResult `json:"data"` //} +type TmpDataScheduleResult struct { + Cluster sch.DataDetail `json:"cluster"` + PackageID cdssdk.PackageID `json:"packageID"` + Status bool `json:"status"` + Msg string `json:"msg"` +} + func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, error) { targetUrl, err := url.JoinPath(c.baseURL, "/dataSchedule") if err != nil { @@ -125,17 +41,29 @@ func (c *Client) DataSchedule(req DataScheduleReq) ([]sch.DataScheduleResult, er if err != nil { return nil, err } + println(resp.Body) contType := resp.Header.Get("Content-Type") if strings.Contains(contType, http2.ContentTypeJSON) { - var codeResp response[[]sch.DataScheduleResult] + var codeResp response[[]TmpDataScheduleResult] if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { return nil, fmt.Errorf("parsing response: %w", err) } if codeResp.Code == ResponseCodeOK { - - return codeResp.Data, nil + var results []sch.DataScheduleResult + for _, tmpResult := range codeResp.Data { + result := sch.DataScheduleResult{ + PackageID: tmpResult.PackageID, + Status: tmpResult.Status, + Msg: tmpResult.Msg, + Clusters: []sch.DataDetail{ + tmpResult.Cluster, + }, + } + results = append(results, result) + } + return results, nil } return nil, codeResp.ToError() @@ -194,7 +122,7 @@ var _ = serder.UseTypeUnionInternallyTagged(&UploadTargetTypeUnion, "type") type UrlTarget struct { serder.Metadata `union:"url"` UploadTargetBase - Clusters []ClusterID `json:"clusters"` + ClusterID ClusterID `json:"clusterId"` JCSUploadInfo cdsapi.ObjectUploadInfo `form:"JCSUploadInfo"` } @@ -211,7 +139,10 @@ func (d *UploadTargetBase) Noop() {} type UploadResp struct { PackageID cdssdk.PackageID `json:"packageID"` ObjectIDs []cdssdk.ObjectID `json:"objectIDs"` + ClusterID ClusterID `json:"clusterID"` JsonData string `json:"jsonData"` + Status bool `json:"status"` + Message string `json:"message"` } func (c *Client) Upload(req UploadReq) (*UploadResp, error) { diff --git a/utils/config/config.go b/utils/config/config.go index d0f930b..889094b 100644 --- a/utils/config/config.go +++ b/utils/config/config.go @@ -33,6 +33,8 @@ func DefaultLoad(modeulName string, defCfg interface{}) error { // filepath.Join用于将多个路径组合成一个路径 configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName)) + configFilePath = "D:\\Work\\Codes\\workspace\\workspace\\scheduler\\common\\assets\\confs\\middleware.json" + return Load(configFilePath, defCfg) }