Browse Source

Merge pull request '新增云算创建功能及相关配置' (#58) from feature_zj into feature_rzs

pull/60/head
JeshuaRen 3 months ago
parent
commit
13063ef198
5 changed files with 217 additions and 0 deletions
  1. +63
    -0
      sdks/cloud/client.go
  2. +5
    -0
      sdks/cloud/config.go
  3. +113
    -0
      sdks/cloud/job.go
  4. +3
    -0
      sdks/cloud/models.go
  5. +33
    -0
      sdks/scheduler/models.go

+ 63
- 0
sdks/cloud/client.go View File

@@ -0,0 +1,63 @@
package cloud

import (
"fmt"

"gitlink.org.cn/cloudream/common/sdks"
)

type response[T any] struct {
Code int `json:"code"`
Message string `json:"message"`
Data T `json:"data"`
}

type respons2[T any] struct {
Code int `json:"code"`
Message string `json:"msg"`
Data T `json:"data"`
}

const (
ResponseCodeOK int = 200
)

func (r *response[T]) ToError() *sdks.CodeMessageError {
return &sdks.CodeMessageError{
Code: fmt.Sprintf("%d", r.Code),
Message: r.Message,
}
}

type Client struct {
baseURL string
}

func NewClient(cfg *Config) *Client {
return &Client{
baseURL: cfg.URL,
}
}

type Pool interface {
Acquire() (*Client, error)
Release(cli *Client)
}

type pool struct {
cfg *Config
}

func NewPool(cfg *Config) Pool {
return &pool{
cfg: cfg,
}
}
func (p *pool) Acquire() (*Client, error) {
cli := NewClient(p.cfg)
return cli, nil
}

func (p *pool) Release(cli *Client) {

}

+ 5
- 0
sdks/cloud/config.go View File

@@ -0,0 +1,5 @@
package cloud

type Config struct {
URL string `json:"url"`
}

+ 113
- 0
sdks/cloud/job.go View File

@@ -0,0 +1,113 @@
package cloud

import (
"fmt"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
"net/url"
"strings"
)

//type CreateParam struct {
// ClusterId string `json:"clusterId,omitempty"`
// ContainerGroupName string `json:"containerGroupName"`
// Name string `json:"name"`
// Image string `json:"image"`
// Cpu string `json:"cpu,omitempty"`
// Memory string `json:"memory,omitempty"`
// Port int32 `json:"port,omitempty"`
// NodePort int32 `json:"nodePort,omitempty"`
// MountPath string `json:"mountPath,omitempty"`
// Args []string `json:"args,omitempty"`
// Envs []struct {
// Name string `json:"name,omitempty"`
// Value string `json:"value,omitempty"`
// } `json:"envs,omitempty"`
// ContainerCreateParameter ContainerCreateParameter `json:"containerCreateParameter,omitempty"`
//}

type CreateCloudJobReq struct {
ClusterId schsdk.ClusterID `json:"clusterId,omitempty"`
ContainerGroupName string `json:"containerGroupName"`
Name string `json:"name"`
Description string `json:"description"`
Image string `json:"image"`
Cpu string `json:"cpu,omitempty"`
Memory string `json:"memory,omitempty"`
Port int32 `json:"port,omitempty"`
NodePort int32 `json:"nodePort,omitempty"`
MountPath string `json:"mountPath,omitempty"`
Args []string `json:"args,omitempty"`
Envs []interface{} `json:"envs,omitempty"`
}

// type T2 struct {
// Code int `json:"code"`
// Msg string `json:"msg"`
// Data struct {
// Command interface{} `json:"Command"`
// Args []string `json:"args"`
// ContainerPorts struct {
// NodePort int `json:"nodePort"`
// Port int `json:"port"`
// } `json:"containerPorts"`
// Image string `json:"image"`
// Limits struct {
// Cpu string `json:"cpu"`
// Memory string `json:"memory"`
// } `json:"limits"`
// Name string `json:"name"`
// } `json:"data"`
// TraceId string `json:"traceId"`
// }
type CreateCloudResp struct {
Command interface{} `json:"Command"`
Args []string `json:"args"`
ContainerPorts map[string]int64 `json:"containerPorts"`
Image string `json:"image"`
Limits map[string]string `json:"limits"`
Name string `json:"name"`
TaskID TaskID `json:"taskId"`
}

func (c *Client) CreateJob(req CreateCloudJobReq, token string) (*CreateCloudResp, error) {
targetUrl, err := url.JoinPath(c.baseURL, "/cloud/container/create")
if err != nil {
return nil, err
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
Header: map[string]string{
"Authorization": token,
},
})
if err != nil {
return nil, err
}

// 打印resp.Body内容
//body, err := io.ReadAll(resp.Body)
//if err != nil {
// println("Error reading response body:", err)
//}
//println("Response Body:", string(body))

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp respons2[CreateCloudResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == ResponseCodeOK {
return &codeResp.Data, nil
}

return nil, fmt.Errorf("error: %s", codeResp.Message)
}

return nil, fmt.Errorf("unknow response content type: %s", contType)

}

+ 3
- 0
sdks/cloud/models.go View File

@@ -0,0 +1,3 @@
package cloud

type TaskID string

+ 33
- 0
sdks/scheduler/models.go View File

@@ -81,6 +81,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo](
(*StopInferenceJobInfo)(nil), (*StopInferenceJobInfo)(nil),
(*UploadJobInfo)(nil), (*UploadJobInfo)(nil),
(*BroadcastWaitInfo)(nil), (*BroadcastWaitInfo)(nil),
(*CloudJobInfo)(nil),
) )
var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type")


@@ -211,6 +212,24 @@ type Wait struct {
WaitJobID string `json:"waitJobID"` WaitJobID string `json:"waitJobID"`
} }


type CloudJobInfo struct {
serder.Metadata `union:"CLOUD"`
JobInfoBase
Type string `json:"type"`

ClusterID ClusterID `json:"clusterID,omitempty"`
ContainerGroupName string `json:"containerGroupName"`
Name string `json:"name"`
Image string `json:"image"`
Cpu string `json:"cpu,omitempty"`
Memory string `json:"memory,omitempty"`
Port int32 `json:"port,omitempty"`
NodePort int32 `json:"nodePort,omitempty"`
MountPath string `json:"mountPath,omitempty"`
Args []string `json:"args,omitempty"`
Envs []interface{} `json:"envs,omitempty"`
}

type NotifyJobInfo struct { type NotifyJobInfo struct {
serder.Metadata `union:"Notify"` serder.Metadata `union:"Notify"`
JobInfoBase JobInfoBase
@@ -682,6 +701,7 @@ var JobOutputTypeUnion = types.NewTypeUnion[JobOutput](
(*PublicOutput)(nil), (*PublicOutput)(nil),
(*PCMInferenceJobOutput)(nil), (*PCMInferenceJobOutput)(nil),
(*BroadcastWaitOutput)(nil), (*BroadcastWaitOutput)(nil),
(*CloudJobOutput)(nil),
) )


var _ = serder.UseTypeUnionInternallyTagged(&JobOutputTypeUnion, "type") var _ = serder.UseTypeUnionInternallyTagged(&JobOutputTypeUnion, "type")
@@ -765,6 +785,19 @@ type DataReturnJobOutput struct {
PackageID cdssdk.PackageID `json:"packageID"` PackageID cdssdk.PackageID `json:"packageID"`
} }


type CloudJobOutput struct {
serder.Metadata `union:"CLOUD"`
JobOutputBase
Type string `json:"type"`

Command interface{} `json:"Command"`
Args []string `json:"args"`
ContainerPorts map[string]interface{} `json:"containerPorts"`
Image string `json:"image"`
Limits map[string]interface{} `json:"limits"`
Name string `json:"name"`
}

type JobStatusReport interface { type JobStatusReport interface {
Report() Report()
} }


Loading…
Cancel
Save