diff --git a/api/scheduler/client.go b/api/scheduler/client.go new file mode 100644 index 0000000..14381ec --- /dev/null +++ b/api/scheduler/client.go @@ -0,0 +1,56 @@ +package scheduler + +import "gitlink.org.cn/cloudream/common/api" + +type response[T any] struct { + Code string `json:"code"` + Message string `json:"message"` + Data T `json:"data"` +} + +func (r *response[T]) ToError() *api.CodeMessageError { + return &api.CodeMessageError{ + Code: r.Code, + Message: r.Message, + } +} + +type Client struct { + baseURL string +} + +func NewClient(cfg *Config) *Client { + return &Client{ + baseURL: cfg.URL, + } +} + +type PoolClient struct { + *Client + owner *Pool +} + +func (c *PoolClient) Close() { + c.owner.Release(c) +} + +type Pool struct { + cfg *Config +} + +func NewPool(cfg *Config) *Pool { + return &Pool{ + cfg: cfg, + } +} +func (p *Pool) Acquire() (*PoolClient, error) { + cli := NewClient(p.cfg) + return &PoolClient{ + Client: cli, + owner: p, + }, nil +} + +func (p *Pool) Release(cli *PoolClient) { + +} diff --git a/api/scheduler/config.go b/api/scheduler/config.go new file mode 100644 index 0000000..15765cf --- /dev/null +++ b/api/scheduler/config.go @@ -0,0 +1,5 @@ +package scheduler + +type Config struct { + URL string `json:"url"` +} diff --git a/api/scheduler/jobset.go b/api/scheduler/jobset.go new file mode 100644 index 0000000..50d5a0a --- /dev/null +++ b/api/scheduler/jobset.go @@ -0,0 +1,86 @@ +package scheduler + +import ( + "fmt" + "net/url" + "strings" + + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/models" + myhttp "gitlink.org.cn/cloudream/common/utils/http" + "gitlink.org.cn/cloudream/common/utils/serder" +) + +type JobSetSumbitReq struct { + models.JobSetInfo +} + +type JobSetSumbitResp struct { + JobSetID string `json:"jobSetID"` +} + +func (c *Client) JobSetSumbit(req JobSetSumbitReq) (*JobSetSumbitResp, error) { + url, err := url.JoinPath(c.baseURL, "/jobSet/submit") + if err != nil { + return nil, err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[JobSetSumbitResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +type JobSetSetLocalFileReq struct { + JobSetID string `json:"jobSetID"` + LocalPath string `json:"localPath"` + PackageID int64 `json:"packageID"` +} + +func (c *Client) JobSetSetLocalFile(req JobSetSetLocalFileReq) error { + url, err := url.JoinPath(c.baseURL, "/jobSet/setLocalFile") + if err != nil { + return err + } + + resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[any] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return nil + } + + return codeResp.ToError() + } + + return fmt.Errorf("unknow response content type: %s", contType) +} diff --git a/api/scheduler/scheduler_test.go b/api/scheduler/scheduler_test.go new file mode 100644 index 0000000..1bcf2ff --- /dev/null +++ b/api/scheduler/scheduler_test.go @@ -0,0 +1,50 @@ +package scheduler + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + "gitlink.org.cn/cloudream/common/models" +) + +func Test_JobSet(t *testing.T) { + Convey("提交任务集和设置LocalFile", t, func() { + cli := NewClient(&Config{ + URL: "http://localhost:7891", + }) + + id, err := cli.JobSetSumbit(JobSetSumbitReq{ + JobSetInfo: models.JobSetInfo{ + Jobs: []models.JobInfo{ + models.ResourceJobInfo{ + Type: models.JobTypeResource, + }, + models.NormalJobInfo{ + Type: models.JobTypeNormal, + Files: models.JobFilesInfo{ + Dataset: models.PackageFileInfo{ + Type: models.FileInfoTypePackage, + }, + Code: models.LocalFileInfo{ + Type: models.FileInfoTypeLocalFile, + LocalPath: "code", + }, + Image: models.ImageFileInfo{ + Type: models.FileInfoTypeImage, + }, + }, + }, + }, + }, + }) + So(err, ShouldBeNil) + So(id.JobSetID, ShouldNotBeEmpty) + + err = cli.JobSetSetLocalFile(JobSetSetLocalFileReq{ + JobSetID: id.JobSetID, + LocalPath: "code", + PackageID: 1, + }) + So(err, ShouldBeNil) + }) +} diff --git a/models/job.go b/models/job.go index 55e42ff..eccca17 100644 --- a/models/job.go +++ b/models/job.go @@ -42,14 +42,14 @@ type ResourceJobInfo struct { } type JobFilesInfo struct { - Dateset FileInfo `json:"dataset"` + Dataset FileInfo `json:"dataset"` Code FileInfo `json:"code"` Image FileInfo `json:"image"` } type FileInfo interface{} -var FileInfoTypeUnion = serder.NewTypeUnion[JobInfo]("type", +var FileInfoTypeUnion = serder.NewTypeUnion[FileInfo]("type", serder.NewStringTypeResolver(). Add(FileInfoTypePackage, myreflect.TypeOf[PackageFileInfo]()). Add(FileInfoTypeLocalFile, myreflect.TypeOf[LocalFileInfo]()). diff --git a/utils/serder/serder.go b/utils/serder/serder.go index c089b09..e63fb5a 100644 --- a/utils/serder/serder.go +++ b/utils/serder/serder.go @@ -76,7 +76,8 @@ func MapToObject(m map[string]any, obj any, opt ...MapToObjectOption) error { unionTypeMapping := make(map[reflect.Type]*UnionTypeInfo) for _, u := range op.UnionTypes { - unionTypeMapping[u.UnionType] = &u + uu := u + unionTypeMapping[u.UnionType] = &uu } convs := []Converter{