Browse Source

调整pcm的接口

pull/30/head
Sydonian 2 years ago
parent
commit
1ae4788174
9 changed files with 403 additions and 133 deletions
  1. +34
    -0
      sdks/pcm/models.go
  2. +198
    -111
      sdks/pcm/pcm.go
  3. +58
    -0
      sdks/pcm/pcm_test.go
  4. +7
    -3
      sdks/scheduler/models.go
  5. +6
    -9
      sdks/unifyops/models.go
  6. +3
    -7
      sdks/unifyops/unifyops.go
  7. +19
    -3
      utils/http/http.go
  8. +43
    -0
      utils/serder/types.go
  9. +35
    -0
      utils/serder/types_test.go

+ 34
- 0
sdks/pcm/models.go View File

@@ -0,0 +1,34 @@
package pcmsdk

import schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"

type Participant struct {
ID schsdk.SlwNodeID `json:"id"`
Name string `json:"name"`
Type string `json:"type"`
}

type Image struct {
ImageID schsdk.SlwNodeImageID `json:"imageID"`
ImageName string `json:"imageName"`
ImageStatus string `json:"imageStatus"`
}

type ResourceID string

type Resource struct {
ParticipantID schsdk.SlwNodeID `json:"participantID"`
ParticipantName string `json:"participantName"`
SpecName string `json:"specName"`
SpecID ResourceID `json:"specId"`
SpecPrice float64 `json:"specPrice"`
}

type TaskID string

const (
TaskStatusPending = "Pending"
TaskStatusRunning = "Running"
TaskStatusSuccess = "Success"
TaskStatuFailed = "Failed"
)

+ 198
- 111
sdks/pcm/pcm.go View File

@@ -4,27 +4,29 @@ import (
"fmt"
"net/url"
"strings"
"time"

"gitlink.org.cn/cloudream/common/sdks"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
)

const CORRECT_CODE int = 200
const CodeOK int = 200

type UploadImageReq struct {
SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"`
ImagePath string `json:"imagePath"`
}

type UploadImageResp struct {
Result string `json:"result"`
ImageID uopsdk.SlwNodeImageID `json:"imageID"`
ImageID schsdk.SlwNodeImageID `json:"imageID"`
}

// TODO
func (c *Client) UploadImage(req UploadImageReq) (*UploadImageResp, error) {
url, err := url.JoinPath(c.baseURL, "/pcm/v1/core/uploadImage")
url, err := url.JoinPath(c.baseURL, "/pcm/v1/storelink/uploadImage")
if err != nil {
return nil, err
}
@@ -43,7 +45,7 @@ func (c *Client) UploadImage(req UploadImageReq) (*UploadImageResp, error) {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == CORRECT_CODE {
if codeResp.Code == CodeOK {
return &codeResp.Data, nil
}

@@ -53,200 +55,285 @@ func (c *Client) UploadImage(req UploadImageReq) (*UploadImageResp, error) {
return nil, fmt.Errorf("unknow response content type: %s", contType)
}

type GetParticipantsResp struct {
Participants []Participant
}

func (c *Client) GetParticipants() (*GetParticipantsResp, error) {
type Resp struct {
Code int `json:"code"`
Message string `json:"message"`
Participants []Participant `json:"participants"`
}

url, err := url.JoinPath(c.baseURL, "/pcm/v1/storelink/uploadImage")
if err != nil {
return nil, err
}
rawResp, err := myhttp.GetJSON(url, myhttp.RequestParam{})
if err != nil {
return nil, err
}

resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
if err != nil {
return nil, err
}

if resp.Code != CodeOK {
return nil, &sdks.CodeMessageError{
Code: fmt.Sprintf("%d", resp.Code),
Message: resp.Message,
}
}

return &GetParticipantsResp{
Participants: resp.Participants,
}, nil
}

type GetImageListReq struct {
SlwNodeID int64 `json:"slwNodeID"`
PartID schsdk.SlwNodeID `json:"partId"`
}

type GetImageListResp struct {
ImageIDs []int64 `json:"imageIDs"`
Images []Image
}

func (c *Client) GetImageList(req GetImageListReq) (*GetImageListResp, error) {
url, err := url.JoinPath(c.baseURL, "/pcm/v1/core/getImageList")
type Resp struct {
Success bool `json:"success"`
Images []Image `json:"images"`
ErrorMsg string `json:"errorMsg"`
}

url, err := url.JoinPath(c.baseURL, "/pcm/v1/storelink/getImageList")
if err != nil {
return nil, err
}
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
rawResp, err := myhttp.GetJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {

var codeResp response[GetImageListResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == CORRECT_CODE {
return &codeResp.Data, nil
}
resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
if err != nil {
return nil, err
}

return nil, codeResp.ToError()
if !resp.Success {
return nil, fmt.Errorf(resp.ErrorMsg)
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
return &GetImageListResp{
Images: resp.Images,
}, nil
}

type DeleteImageReq struct {
SlwNodeID int64 `json:"slwNodeID"`
PCMJobID int64 `json:"pcmJobID"`
PartID schsdk.SlwNodeID `json:"partID"`
ImageID schsdk.SlwNodeImageID `json:"imageID"`
}

type DeleteImageResp struct {
Result string `json:"result"`
}
func (c *Client) DeleteImage(req DeleteImageReq) error {
type Resp struct {
Success bool `json:"success"`
ErrorMsg string `json:"errorMsg"`
}

func (c *Client) DeleteImage(req DeleteImageReq) (*DeleteImageResp, error) {
url, err := url.JoinPath(c.baseURL, "/pcm/v1/core/deleteImage")
url, err := url.JoinPath(c.baseURL, "/pcm/v1/storelink/deleteImage")
if err != nil {
return nil, err
return err
}
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
rawResp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
return err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {

var codeResp response[DeleteImageResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == CORRECT_CODE {
return &codeResp.Data, nil
}
resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
if err != nil {
return err
}

return nil, codeResp.ToError()
if !resp.Success {
return fmt.Errorf(resp.ErrorMsg)
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
return nil
}

type ScheduleTaskReq struct {
SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
Envs []schsdk.EnvVar `json:"envs"`
ImageID uopsdk.SlwNodeImageID `json:"imageID"`
CMDLine string `json:"cmdLine"`
type SubmitTaskReq struct {
PartID schsdk.SlwNodeID `json:"partId"`
ImageID schsdk.SlwNodeImageID `json:"imageId"`
ResourceID ResourceID `json:"resourceId"`
CMD string `json:"cmd"`
Params []schsdk.KVPair `json:"params"`
Envs []schsdk.KVPair `json:"envs"`
}

type ScheduleTaskResp struct {
Result string `json:"result"`
PCMJobID int64 `json:"pcmJobID"`
type SubmitTaskResp struct {
TaskID TaskID
}

func (c *Client) ScheduleTask(req ScheduleTaskReq) (*ScheduleTaskResp, error) {
url, err := url.JoinPath(c.baseURL, "/pcm/v1/core/scheduleTask")
func (c *Client) SubmitTask(req SubmitTaskReq) (*SubmitTaskResp, error) {
type Resp struct {
Success bool `json:"success"`
TaskID TaskID `json:"taskId"`
ErrorMsg string `json:"errorMsg"`
}

url, err := url.JoinPath(c.baseURL, "/pcm/v1/storelink/submitTask")
if err != nil {
return nil, err
}
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
rawResp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {

var codeResp response[ScheduleTaskResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == CORRECT_CODE {
return &codeResp.Data, nil
}
resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
if err != nil {
return nil, err
}

return nil, codeResp.ToError()
if !resp.Success {
return nil, fmt.Errorf(resp.ErrorMsg)
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
return &SubmitTaskResp{
TaskID: resp.TaskID,
}, nil
}

type GetTaskStatusReq struct {
SlwNodeID uopsdk.SlwNodeID `json:"slwNodeID"`
PCMJobID int64 `json:"pcmJobID"`
type GetTaskReq struct {
PartID schsdk.SlwNodeID `json:"partId"`
TaskID TaskID `json:"taskId"`
}

type GetTaskStatusResp struct {
Result string `json:"result"`
Status string `json:"status"`
type GetTaskResp struct {
TaskStatus string
TaskName string
StartedAt time.Time
CompletedAt time.Time
}

func (c *Client) GetTaskStatus(req GetTaskStatusReq) (*GetTaskStatusResp, error) {
url, err := url.JoinPath(c.baseURL, "/pcm/v1/core/getTaskStatus")
func (c *Client) GetTask(req GetTaskReq) (*GetTaskResp, error) {
type Resp struct {
Success bool `json:"success"`
Task struct {
TaskID TaskID `json:"taskId"`
TaskStatus string `json:"taskStatus"`
TaskName string `json:"taskName"`
StartedAt serder.TimestampSecond `json:"startedAt"`
CompletedAt serder.TimestampSecond `json:"completedAt"`
} `json:"task"`
ErrorMsg string `json:"errorMsg"`
}

url, err := url.JoinPath(c.baseURL, "/pcm/v1/storelink/getTask")
if err != nil {
return nil, err
}
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
rawResp, err := myhttp.GetJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {
resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
if err != nil {
return nil, err
}

var codeResp response[GetTaskStatusResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}
if !resp.Success {
return nil, fmt.Errorf(resp.ErrorMsg)
}

if codeResp.Code == CORRECT_CODE {
return &codeResp.Data, nil
}
return &GetTaskResp{
TaskStatus: resp.Task.TaskStatus,
TaskName: resp.Task.TaskName,
StartedAt: time.Time(resp.Task.StartedAt),
CompletedAt: time.Time(resp.Task.CompletedAt),
}, nil
}

return nil, codeResp.ToError()
type DeleteTaskReq struct {
PartID schsdk.SlwNodeID `json:"partId"`
TaskID TaskID `json:"taskId"`
}

func (c *Client) DeleteTask(req DeleteTaskReq) error {
type Resp struct {
Success bool `json:"success"`
ErrorMsg string `json:"errorMsg"`
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
url, err := url.JoinPath(c.baseURL, "/pcm/v1/storelink/deleteTask")
if err != nil {
return err
}
rawResp, err := myhttp.PostJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return err
}

resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
if err != nil {
return err
}

if !resp.Success {
return fmt.Errorf(resp.ErrorMsg)
}

return nil
}

type DeleteTaskReq struct {
SlwNodeID int64 `json:"slwNodeID"`
PCMJobID int64 `json:"pcmJobID"`
type GetResourceSpecs struct {
PartID schsdk.SlwNodeID `json:"partId"`
}

type DeleteTaskResp struct {
Result string `json:"result"`
type GetResourceSpecsResp struct {
Resources []Resource
}

func (c *Client) DeleteTask(req DeleteTaskReq) (*DeleteTaskResp, error) {
url, err := url.JoinPath(c.baseURL, "/pcm/v1/core/deleteTask")
func (c *Client) GetResourceSpecs(req GetImageListReq) (*GetResourceSpecsResp, error) {
type Resp struct {
Success bool `json:"success"`
ResourceSpecs []Resource `json:"resourceSpecs"`
ErrorMsg string `json:"errorMsg"`
}

url, err := url.JoinPath(c.baseURL, "/pcm/v1/storelink/getResourceSpecs")
if err != nil {
return nil, err
}
resp, err := myhttp.PostJSON(url, myhttp.RequestParam{
rawResp, err := myhttp.GetJSON(url, myhttp.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, myhttp.ContentTypeJSON) {

var codeResp response[DeleteTaskResp]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return nil, fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == CORRECT_CODE {
return &codeResp.Data, nil
}
resp, err := myhttp.ParseJSONResponse[Resp](rawResp)
if err != nil {
return nil, err
}

return nil, codeResp.ToError()
if !resp.Success {
return nil, fmt.Errorf(resp.ErrorMsg)
}

return nil, fmt.Errorf("unknow response content type: %s", contType)
return &GetResourceSpecsResp{
Resources: resp.ResourceSpecs,
}, nil
}

+ 58
- 0
sdks/pcm/pcm_test.go View File

@@ -0,0 +1,58 @@
package pcmsdk

import (
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
)

func Test_SubmitTask(t *testing.T) {
Convey("提交任务,查询任务", t, func() {
cli := NewClient(&Config{
URL: "http://localhost:8889",
})

submitResp, err := cli.SubmitTask(SubmitTaskReq{
PartID: 1711652475901054976,
ImageID: "1d1769857cd64c03928c8a1a4ee4a23f",
ResourceID: "6388d3c27f654fa5b11439a3d6098dbc",
CMD: "echo $asd",
Envs: []schsdk.KVPair{{
Key: "asd",
Value: "hello",
}},
Params: []schsdk.KVPair{},
})
So(err, ShouldBeNil)

t.Logf("taskID: %s", submitResp.TaskID)

taskResp, err := cli.GetTask(GetTaskReq{
PartID: 1711652475901054976,
TaskID: submitResp.TaskID,
})
So(err, ShouldBeNil)

<-time.After(time.Second * 3)

t.Logf("taskName: %s, taskStatus: %s, startedAt: %v", taskResp.TaskName, taskResp.TaskStatus, taskResp.StartedAt)
})

}

func Test_GetImageList(t *testing.T) {
Convey("查询镜像列表", t, func() {
cli := NewClient(&Config{
URL: "http://localhost:8889",
})

getReps, err := cli.GetImageList(GetImageListReq{
PartID: 1711652475901054976,
})
So(err, ShouldBeNil)

t.Logf("imageList: %v", getReps.Images)
})
}

+ 7
- 3
sdks/scheduler/models.go View File

@@ -23,6 +23,10 @@ type JobSetID string

type ImageID string

type SlwNodeID int64

type SlwNodeImageID string

type JobSetInfo struct {
Jobs []JobInfo `json:"jobs"`
}
@@ -111,11 +115,11 @@ type ImageJobFileInfo struct {

type JobRuntimeInfo struct {
Command string `json:"command"`
Envs []EnvVar `json:"envs"`
Envs []KVPair `json:"envs"`
}

type EnvVar struct {
Var string `json:"var"`
type KVPair struct {
Key string `json:"key"`
Value string `json:"value"`
}



+ 6
- 9
sdks/unifyops/models.go View File

@@ -3,6 +3,7 @@ package uopsdk
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
"gitlink.org.cn/cloudream/common/pkgs/types"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
"gitlink.org.cn/cloudream/common/utils/serder"
)

@@ -17,16 +18,12 @@ const (
ResourceTypeMemory ResourceType = "MEMORY"
)

type SlwNodeID int64

type SlwNodeImageID int64

type SlwNode struct {
ID SlwNodeID `json:"ID"`
Name string `json:"name"`
SlwRegionID int64 `json:"slwRegionID"`
StgNodeID int64 `json:"stgNodeID"`
StorageID int64 `json:"StorageID"`
ID schsdk.SlwNodeID `json:"ID"`
Name string `json:"name"`
SlwRegionID int64 `json:"slwRegionID"`
StgNodeID int64 `json:"stgNodeID"`
StorageID int64 `json:"StorageID"`
}

type ResourceData interface {


+ 3
- 7
sdks/unifyops/unifyops.go View File

@@ -5,6 +5,7 @@ import (
"net/url"
"strings"

schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
myhttp "gitlink.org.cn/cloudream/common/utils/http"
"gitlink.org.cn/cloudream/common/utils/serder"
)
@@ -43,7 +44,7 @@ func (c *Client) GetAllSlwNodeInfo() (*GetAllSlwNodeInfoResp, error) {
}

type GetOneResourceDataReq struct {
SlwNodeID SlwNodeID `json:"nodeId"`
SlwNodeID schsdk.SlwNodeID `json:"nodeId"`
}

func (c *Client) GetCPUData(node GetOneResourceDataReq) (*CPUResourceData, error) {
@@ -250,15 +251,10 @@ func (c *Client) GetIndicatorData(node GetOneResourceDataReq) (*[]ResourceData,
return nil, codeResp.ToError()
}

mapToObjOpt := serder.MapToObjectOption{
UnionTypes: []serder.TaggedUnionType{
ResourceDataTaggedTypeUnion,
},
}
var ret []ResourceData
for _, mp := range codeResp.Data {
var data ResourceData
err := serder.MapToObject(mp, &data, mapToObjOpt)
err := serder.MapToObject(mp, &data)
if err != nil {
return nil, err
}


+ 19
- 3
utils/http/http.go View File

@@ -1,6 +1,7 @@
package http

import (
"bytes"
"fmt"
"io"
"mime/multipart"
@@ -9,6 +10,7 @@ import (
"strings"

"gitlink.org.cn/cloudream/common/pkgs/iterator"
"gitlink.org.cn/cloudream/common/utils/math"
"gitlink.org.cn/cloudream/common/utils/serder"
)

@@ -120,7 +122,13 @@ func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) {
return ret, nil
}

return ret, fmt.Errorf("unknow response content type: %s, status: %d", contType, resp.StatusCode)
cont, err := io.ReadAll(resp.Body)
if err != nil {
return ret, fmt.Errorf("unknow response content type: %s, status: %d", contType, resp.StatusCode)
}
strCont := string(cont)

return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math.Min(len(strCont), 200)])
}

type MultiPartRequestParam struct {
@@ -273,7 +281,13 @@ func prepareJSONBody(req *http.Request, body any) error {
return nil
}

req.Body = serder.ObjectToJSONStream(body)
data, err := serder.ObjectToJSON(body)
if err != nil {
return err
}

req.ContentLength = int64(len(data))
req.Body = io.NopCloser(bytes.NewReader(data))
return nil
}

@@ -297,7 +311,9 @@ func prepareFormBody(req *http.Request, body any) error {
values.Add(k, fmt.Sprintf("%v", v))
}

req.Body = io.NopCloser(strings.NewReader(values.Encode()))
data := values.Encode()
req.Body = io.NopCloser(strings.NewReader(data))
req.ContentLength = int64(len(data))
return nil
}



+ 43
- 0
utils/serder/types.go View File

@@ -0,0 +1,43 @@
package serder

import (
"fmt"
"strconv"
"time"
)

type TimestampSecond time.Time

func (t *TimestampSecond) MarshalJSON() ([]byte, error) {
raw := time.Time(*t)
return []byte(fmt.Sprintf("%d", raw.Unix())), nil
}

func (t *TimestampSecond) UnmarshalJSON(data []byte) error {
var timestamp int64
var err error
if timestamp, err = strconv.ParseInt(string(data), 10, 64); err != nil {
return err
}

*t = TimestampSecond(time.Unix(timestamp, 0))
return nil
}

type TimestampMilliSecond time.Time

func (t *TimestampMilliSecond) MarshalJSON() ([]byte, error) {
raw := time.Time(*t)
return []byte(fmt.Sprintf("%d", raw.UnixMilli())), nil
}

func (t *TimestampMilliSecond) UnmarshalJSON(data []byte) error {
var timestamp int64
var err error
if timestamp, err = strconv.ParseInt(string(data), 10, 64); err != nil {
return err
}

*t = TimestampMilliSecond(time.UnixMilli(timestamp))
return nil
}

+ 35
- 0
utils/serder/types_test.go View File

@@ -0,0 +1,35 @@
package serder

import (
"encoding/json"
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"
)

func Test_Timestamp(t *testing.T) {
Convey("秒级时间戳", t, func() {
str := "1698894747"

var ts TimestampSecond

err := json.Unmarshal([]byte(str), &ts)
So(err, ShouldBeNil)

t := time.Time(ts)
So(t.Unix(), ShouldEqual, 1698894747)
})

Convey("毫秒级时间戳", t, func() {
str := "1698895130651"

var ts TimestampMilliSecond

err := json.Unmarshal([]byte(str), &ts)
So(err, ShouldBeNil)

t := time.Time(ts)
So(t.UnixMilli(), ShouldEqual, 1698895130651)
})
}

Loading…
Cancel
Save