From d4df1102bce0ac359725c0b10d80bb95f1b2aaa3 Mon Sep 17 00:00:00 2001 From: jagger Date: Tue, 29 Jul 2025 19:37:29 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E4=BA=91=E7=AE=97=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=E5=8A=9F=E8=83=BD=E5=8F=8A=E7=9B=B8=E5=85=B3=E9=85=8D?= =?UTF-8?q?=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/cloud/client.go | 63 ++++++++++++++++++++++ sdks/cloud/config.go | 5 ++ sdks/cloud/job.go | 113 +++++++++++++++++++++++++++++++++++++++ sdks/cloud/models.go | 3 ++ sdks/scheduler/models.go | 33 ++++++++++++ 5 files changed, 217 insertions(+) create mode 100644 sdks/cloud/client.go create mode 100644 sdks/cloud/config.go create mode 100644 sdks/cloud/job.go create mode 100644 sdks/cloud/models.go diff --git a/sdks/cloud/client.go b/sdks/cloud/client.go new file mode 100644 index 0000000..a75e989 --- /dev/null +++ b/sdks/cloud/client.go @@ -0,0 +1,63 @@ +package cloud + +import ( + "fmt" + + "gitlink.org.cn/cloudream/common/sdks" +) + +type response[T any] struct { + Code int `json:"code"` + Message string `json:"message"` + Data T `json:"data"` +} + +type respons2[T any] struct { + Code int `json:"code"` + Message string `json:"msg"` + Data T `json:"data"` +} + +const ( + ResponseCodeOK int = 200 +) + +func (r *response[T]) ToError() *sdks.CodeMessageError { + return &sdks.CodeMessageError{ + Code: fmt.Sprintf("%d", r.Code), + Message: r.Message, + } +} + +type Client struct { + baseURL string +} + +func NewClient(cfg *Config) *Client { + return &Client{ + baseURL: cfg.URL, + } +} + +type Pool interface { + Acquire() (*Client, error) + Release(cli *Client) +} + +type pool struct { + cfg *Config +} + +func NewPool(cfg *Config) Pool { + return &pool{ + cfg: cfg, + } +} +func (p *pool) Acquire() (*Client, error) { + cli := NewClient(p.cfg) + return cli, nil +} + +func (p *pool) Release(cli *Client) { + +} diff --git a/sdks/cloud/config.go b/sdks/cloud/config.go new file mode 100644 index 0000000..42b6412 --- /dev/null +++ b/sdks/cloud/config.go @@ -0,0 +1,5 @@ +package cloud + +type Config struct { + URL string `json:"url"` +} diff --git a/sdks/cloud/job.go b/sdks/cloud/job.go new file mode 100644 index 0000000..b14dd31 --- /dev/null +++ b/sdks/cloud/job.go @@ -0,0 +1,113 @@ +package cloud + +import ( + "fmt" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + "gitlink.org.cn/cloudream/common/utils/http2" + "gitlink.org.cn/cloudream/common/utils/serder" + "net/url" + "strings" +) + +//type CreateParam struct { +// ClusterId string `json:"clusterId,omitempty"` +// ContainerGroupName string `json:"containerGroupName"` +// Name string `json:"name"` +// Image string `json:"image"` +// Cpu string `json:"cpu,omitempty"` +// Memory string `json:"memory,omitempty"` +// Port int32 `json:"port,omitempty"` +// NodePort int32 `json:"nodePort,omitempty"` +// MountPath string `json:"mountPath,omitempty"` +// Args []string `json:"args,omitempty"` +// Envs []struct { +// Name string `json:"name,omitempty"` +// Value string `json:"value,omitempty"` +// } `json:"envs,omitempty"` +// ContainerCreateParameter ContainerCreateParameter `json:"containerCreateParameter,omitempty"` +//} + +type CreateCloudJobReq struct { + ClusterId schsdk.ClusterID `json:"clusterId,omitempty"` + ContainerGroupName string `json:"containerGroupName"` + Name string `json:"name"` + Description string `json:"description"` + Image string `json:"image"` + Cpu string `json:"cpu,omitempty"` + Memory string `json:"memory,omitempty"` + Port int32 `json:"port,omitempty"` + NodePort int32 `json:"nodePort,omitempty"` + MountPath string `json:"mountPath,omitempty"` + Args []string `json:"args,omitempty"` + Envs []interface{} `json:"envs,omitempty"` +} + +// type T2 struct { +// Code int `json:"code"` +// Msg string `json:"msg"` +// Data struct { +// Command interface{} `json:"Command"` +// Args []string `json:"args"` +// ContainerPorts struct { +// NodePort int `json:"nodePort"` +// Port int `json:"port"` +// } `json:"containerPorts"` +// Image string `json:"image"` +// Limits struct { +// Cpu string `json:"cpu"` +// Memory string `json:"memory"` +// } `json:"limits"` +// Name string `json:"name"` +// } `json:"data"` +// TraceId string `json:"traceId"` +// } +type CreateCloudResp struct { + Command interface{} `json:"Command"` + Args []string `json:"args"` + ContainerPorts map[string]int64 `json:"containerPorts"` + Image string `json:"image"` + Limits map[string]string `json:"limits"` + Name string `json:"name"` + TaskID TaskID `json:"taskId"` +} + +func (c *Client) CreateJob(req CreateCloudJobReq, token string) (*CreateCloudResp, error) { + targetUrl, err := url.JoinPath(c.baseURL, "/cloud/container/create") + if err != nil { + return nil, err + } + + resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ + Body: req, + Header: map[string]string{ + "Authorization": token, + }, + }) + if err != nil { + return nil, err + } + + // 打印resp.Body内容 + //body, err := io.ReadAll(resp.Body) + //if err != nil { + // println("Error reading response body:", err) + //} + //println("Response Body:", string(body)) + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, http2.ContentTypeJSON) { + var codeResp respons2[CreateCloudResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == ResponseCodeOK { + return &codeResp.Data, nil + } + + return nil, fmt.Errorf("error: %s", codeResp.Message) + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) + +} diff --git a/sdks/cloud/models.go b/sdks/cloud/models.go new file mode 100644 index 0000000..ce883e5 --- /dev/null +++ b/sdks/cloud/models.go @@ -0,0 +1,3 @@ +package cloud + +type TaskID string diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index ad763d4..34bce78 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -81,6 +81,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*StopInferenceJobInfo)(nil), (*UploadJobInfo)(nil), (*BroadcastWaitInfo)(nil), + (*CloudJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -211,6 +212,24 @@ type Wait struct { WaitJobID string `json:"waitJobID"` } +type CloudJobInfo struct { + serder.Metadata `union:"CLOUD"` + JobInfoBase + Type string `json:"type"` + + ClusterID ClusterID `json:"clusterID,omitempty"` + ContainerGroupName string `json:"containerGroupName"` + Name string `json:"name"` + Image string `json:"image"` + Cpu string `json:"cpu,omitempty"` + Memory string `json:"memory,omitempty"` + Port int32 `json:"port,omitempty"` + NodePort int32 `json:"nodePort,omitempty"` + MountPath string `json:"mountPath,omitempty"` + Args []string `json:"args,omitempty"` + Envs []interface{} `json:"envs,omitempty"` +} + type NotifyJobInfo struct { serder.Metadata `union:"Notify"` JobInfoBase @@ -682,6 +701,7 @@ var JobOutputTypeUnion = types.NewTypeUnion[JobOutput]( (*PublicOutput)(nil), (*PCMInferenceJobOutput)(nil), (*BroadcastWaitOutput)(nil), + (*CloudJobOutput)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobOutputTypeUnion, "type") @@ -765,6 +785,19 @@ type DataReturnJobOutput struct { PackageID cdssdk.PackageID `json:"packageID"` } +type CloudJobOutput struct { + serder.Metadata `union:"CLOUD"` + JobOutputBase + Type string `json:"type"` + + Command interface{} `json:"Command"` + Args []string `json:"args"` + ContainerPorts map[string]interface{} `json:"containerPorts"` + Image string `json:"image"` + Limits map[string]interface{} `json:"limits"` + Name string `json:"name"` +} + type JobStatusReport interface { Report() }