From 77d5367a0e8e489211b61f36cfd648537cebfd21 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Wed, 5 Jun 2024 17:13:30 +0800 Subject: [PATCH 01/17] =?UTF-8?q?=E8=BF=90=E6=8E=A7=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/mq/client.go | 10 +- pkgs/mq/server.go | 10 +- sdks/unifyops/mock_data.go | 256 +++++++++++++++++++++++++++++++++++++ sdks/unifyops/unifyops.go | 170 +++++++----------------- utils/config/config.go | 13 ++ 5 files changed, 335 insertions(+), 124 deletions(-) create mode 100644 sdks/unifyops/mock_data.go diff --git a/pkgs/mq/client.go b/pkgs/mq/client.go index 8d6f090..e34e928 100644 --- a/pkgs/mq/client.go +++ b/pkgs/mq/client.go @@ -2,6 +2,7 @@ package mq import ( "fmt" + "net" "sync" "time" @@ -70,7 +71,14 @@ type RabbitMQTransport struct { } func NewRabbitMQTransport(url string, key string, exchange string) (*RabbitMQTransport, error) { - connection, err := amqp.Dial(url) + config := amqp.Config{ + Dial: func(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, 60*time.Second) // 设置连接超时时间为 60 秒 + }, + } + connection, err := amqp.DialConfig(url, config) + + //connection, err := amqp.Dial(url) if err != nil { return nil, fmt.Errorf("connecting to %s: %w", url, err) } diff --git a/pkgs/mq/server.go b/pkgs/mq/server.go index c3d2809..6106753 100644 --- a/pkgs/mq/server.go +++ b/pkgs/mq/server.go @@ -2,6 +2,7 @@ package mq import ( "fmt" + "net" "time" "github.com/streadway/amqp" @@ -77,7 +78,14 @@ type RabbitMQServer struct { } func NewRabbitMQServer(url string, queueName string, onMessage MessageHandlerFn) (*RabbitMQServer, error) { - connection, err := amqp.Dial(url) + config := amqp.Config{ + Dial: func(network, addr string) (net.Conn, error) { + return net.DialTimeout(network, addr, 60*time.Second) // 设置连接超时时间为 60 秒 + }, + } + connection, err := amqp.DialConfig(url, config) + + //connection, err := amqp.Dial(url) if err != nil { return nil, fmt.Errorf("connecting to %s: %w", url, err) } diff --git a/sdks/unifyops/mock_data.go b/sdks/unifyops/mock_data.go new file mode 100644 index 0000000..c078d35 --- /dev/null +++ b/sdks/unifyops/mock_data.go @@ -0,0 +1,256 @@ +package uopsdk + +// CPU +func shuguang() (*[]ResourceData, error) { + var ret []ResourceData + + cpuResourceData := CPUResourceData{ + Name: ResourceTypeCPU, + Total: UnitValue[int64]{ + Value: 600, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 500, + Unit: "", + }, + } + ret = append(ret, &cpuResourceData) + + npuResourceData := NPUResourceData{ + Name: ResourceTypeNPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &npuResourceData) + + gpuResourceData := GPUResourceData{ + Name: ResourceTypeGPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &gpuResourceData) + + mluResourceData := MLUResourceData{ + Name: ResourceTypeMLU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &mluResourceData) + + storageResourceData := StorageResourceData{ + Name: ResourceTypeStorage, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &storageResourceData) + + memoryResourceData := MemoryResourceData{ + Name: ResourceTypeMemory, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &memoryResourceData) + + return &ret, nil +} + +// GPU +func modelarts() (*[]ResourceData, error) { + var ret []ResourceData + + cpuResourceData := CPUResourceData{ + Name: ResourceTypeCPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &cpuResourceData) + + npuResourceData := NPUResourceData{ + Name: ResourceTypeNPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &npuResourceData) + + gpuResourceData := GPUResourceData{ + Name: ResourceTypeGPU, + Total: UnitValue[int64]{ + Value: 600, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 500, + Unit: "", + }, + } + ret = append(ret, &gpuResourceData) + + mluResourceData := MLUResourceData{ + Name: ResourceTypeMLU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &mluResourceData) + + storageResourceData := StorageResourceData{ + Name: ResourceTypeStorage, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &storageResourceData) + + memoryResourceData := MemoryResourceData{ + Name: ResourceTypeMemory, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &memoryResourceData) + + return &ret, nil +} + +// NPU +func hanwuji() (*[]ResourceData, error) { + var ret []ResourceData + + cpuResourceData := CPUResourceData{ + Name: ResourceTypeCPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &cpuResourceData) + + npuResourceData := NPUResourceData{ + Name: ResourceTypeNPU, + Total: UnitValue[int64]{ + Value: 600, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 500, + Unit: "", + }, + } + ret = append(ret, &npuResourceData) + + gpuResourceData := GPUResourceData{ + Name: ResourceTypeGPU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &gpuResourceData) + + mluResourceData := MLUResourceData{ + Name: ResourceTypeMLU, + Total: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + Available: UnitValue[int64]{ + Value: 100, + Unit: "", + }, + } + ret = append(ret, &mluResourceData) + + storageResourceData := StorageResourceData{ + Name: ResourceTypeStorage, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &storageResourceData) + + memoryResourceData := MemoryResourceData{ + Name: ResourceTypeMemory, + Total: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + Available: UnitValue[float64]{ + Value: 100, + Unit: "GB", + }, + } + ret = append(ret, &memoryResourceData) + + return &ret, nil +} diff --git a/sdks/unifyops/unifyops.go b/sdks/unifyops/unifyops.go index 64bb7ad..7abd3d3 100644 --- a/sdks/unifyops/unifyops.go +++ b/sdks/unifyops/unifyops.go @@ -223,127 +223,53 @@ func (c *Client) GetMemoryData(node GetOneResourceDataReq) (*MemoryResourceData, } func (c *Client) GetIndicatorData(node GetOneResourceDataReq) (*[]ResourceData, error) { - //url, err := url.JoinPath(c.baseURL, "/cmdb/resApi/getIndicatorData") - //if err != nil { - // return nil, err - //} - //resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ - // Body: node, - //}) - //if err != nil { - // return nil, err - //} - // - //contType := resp.Header.Get("Content-Type") - //if strings.Contains(contType, myhttp.ContentTypeJSON) { - // - // var codeResp response[[]map[string]any] - // if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { - // return nil, fmt.Errorf("parsing response: %w", err) - // } - // - // if codeResp.Code != CORRECT_CODE { - // return nil, codeResp.ToError() - // } - // - // var ret []ResourceData - // for _, mp := range codeResp.Data { - // var data ResourceData - // err := serder.MapToObject(mp, &data) - // if err != nil { - // return nil, err - // } - // ret = append(ret, data) - // } - // - // return &ret, nil - //} - // - //return nil, fmt.Errorf("unknow response content type: %s", contType) - - return mockData() -} - -func mockData() (*[]ResourceData, error) { - var ret []ResourceData - - cpuResourceData := CPUResourceData{ - Name: ResourceTypeCPU, - Total: UnitValue[int64]{ - Value: 100, - Unit: "", - }, - Available: UnitValue[int64]{ - Value: 100, - Unit: "", - }, - } - ret = append(ret, &cpuResourceData) - - npuResourceData := NPUResourceData{ - Name: ResourceTypeNPU, - Total: UnitValue[int64]{ - Value: 100, - Unit: "", - }, - Available: UnitValue[int64]{ - Value: 100, - Unit: "", - }, - } - ret = append(ret, &npuResourceData) - - gpuResourceData := GPUResourceData{ - Name: ResourceTypeGPU, - Total: UnitValue[int64]{ - Value: 100, - Unit: "", - }, - Available: UnitValue[int64]{ - Value: 100, - Unit: "", - }, - } - ret = append(ret, &gpuResourceData) - - mluResourceData := MLUResourceData{ - Name: ResourceTypeMLU, - Total: UnitValue[int64]{ - Value: 100, - Unit: "", - }, - Available: UnitValue[int64]{ - Value: 100, - Unit: "", - }, + switch node.SlwNodeID { + case 1: + return shuguang() + case 2: + return modelarts() + case 3: + return hanwuji() } - ret = append(ret, &mluResourceData) - - storageResourceData := StorageResourceData{ - Name: ResourceTypeStorage, - Total: UnitValue[float64]{ - Value: 100, - Unit: "GB", - }, - Available: UnitValue[float64]{ - Value: 100, - Unit: "GB", - }, - } - ret = append(ret, &storageResourceData) - - memoryResourceData := MemoryResourceData{ - Name: ResourceTypeMemory, - Total: UnitValue[float64]{ - Value: 100, - Unit: "GB", - }, - Available: UnitValue[float64]{ - Value: 100, - Unit: "GB", - }, - } - ret = append(ret, &memoryResourceData) - - return &ret, nil + return nil, nil } + +//func (c *Client) GetIndicatorData(node GetOneResourceDataReq) (*[]ResourceData, error) { +//url, err := url.JoinPath(c.baseURL, "/cmdb/resApi/getIndicatorData") +//if err != nil { +// return nil, err +//} +//resp, err := myhttp.PostJSON(url, myhttp.RequestParam{ +// Body: node, +//}) +//if err != nil { +// return nil, err +//} +// +//contType := resp.Header.Get("Content-Type") +//if strings.Contains(contType, myhttp.ContentTypeJSON) { +// +// var codeResp response[[]map[string]any] +// if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { +// return nil, fmt.Errorf("parsing response: %w", err) +// } +// +// if codeResp.Code != CORRECT_CODE { +// return nil, codeResp.ToError() +// } +// +// var ret []ResourceData +// for _, mp := range codeResp.Data { +// var data ResourceData +// err := serder.MapToObject(mp, &data) +// if err != nil { +// return nil, err +// } +// ret = append(ret, data) +// } +// +// return &ret, nil +//} +// +//return nil, fmt.Errorf("unknow response content type: %s", contType) +//} diff --git a/utils/config/config.go b/utils/config/config.go index 0f4cb04..a755889 100644 --- a/utils/config/config.go +++ b/utils/config/config.go @@ -6,6 +6,7 @@ import ( "github.com/imdario/mergo" "os" "path/filepath" + "strings" ) // Load 加载配置文件 @@ -26,6 +27,18 @@ func DefaultLoad(modeulName string, defCfg interface{}) error { return err } + if strings.Contains(execPath, "scheduler") { + execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\scheduler\\common\\assets\\confs\\" + } + + if strings.Contains(execPath, "storage") { + execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\storage\\common\\assets\\confs\\" + } + + if strings.Contains(execPath, "gateway") { + execPath = "D:\\Work\\Codes\\new\\workspace\\workspace\\gateway\\assets\\confs\\" + } + // TODO 可以考虑根据环境变量读取不同的配置 configFilePath := filepath.Join(filepath.Dir(execPath), "..", "confs", fmt.Sprintf("%s.config.json", modeulName)) return Load(configFilePath, defCfg) From 9688e918dd71cd8c3e91f9e44a8475addeb8de17 Mon Sep 17 00:00:00 2001 From: JeshuaRen <1366929814@qq.com> Date: Thu, 20 Jun 2024 09:51:17 +0800 Subject: [PATCH 02/17] =?UTF-8?q?=E4=BF=AE=E6=94=B9EC=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E7=94=A8=E4=BA=8E=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.go | 95 +++++++++++++++++++++++++++++++++++++ sdks/storage/models.go | 2 +- utils/time2/test.go | 104 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 200 insertions(+), 1 deletion(-) create mode 100644 main.go create mode 100644 utils/time2/test.go diff --git a/main.go b/main.go new file mode 100644 index 0000000..b27a0c6 --- /dev/null +++ b/main.go @@ -0,0 +1,95 @@ +package main + +import ( + "fmt" + "io" + "os" + "strconv" + "time" + + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) + +func main() { + test1("http://121.36.5.116:32010") + // test2("http://127.0.0.1:7890") +} + +func test1(url string) { + cli := cdssdk.NewClient(&cdssdk.Config{ + URL: url, + }) + + openLen, err := strconv.ParseInt(os.Args[1], 10, 64) + if err != nil { + fmt.Println(err) + return + } + + readLen, err := strconv.ParseInt(os.Args[2], 10, 64) + if err != nil { + fmt.Println(err) + return + } + + partLen, err := strconv.ParseInt(os.Args[3], 10, 64) + if err != nil { + fmt.Println(err) + return + } + + startTime := time.Now() + obj, err := cli.Object().Download(cdssdk.ObjectDownload{ + UserID: 1, + ObjectID: 470790, + Offset: 0, + Length: &openLen, + PartSize: partLen, + }) + if err != nil { + fmt.Println(err) + return + } + fmt.Printf("Open time: %v\n", time.Since(startTime)) + + startTime = time.Now() + buf := make([]byte, readLen) + _, err = io.ReadFull(obj.File, buf) + fmt.Printf("Read time: %v\n", time.Since(startTime)) + if err != nil { + fmt.Println(err) + return + } + + startTime = time.Now() + obj.File.Close() + fmt.Printf("Close time: %v\n", time.Since(startTime)) +} + +func test2(url string) { + cli := cdssdk.NewClient(&cdssdk.Config{ + URL: url, + }) + + obj, err := cli.Object().Download(cdssdk.ObjectDownload{ + UserID: 1, + ObjectID: 27151, + Offset: 0, + PartSize: 100000000, + // Length: &openLen, + }) + + if err != nil { + fmt.Println(err) + return + } + + f, err := os.Create("test.txt") + if err != nil { + fmt.Println(err) + return + } + + io.Copy(f, obj.File) +} + diff --git a/sdks/storage/models.go b/sdks/storage/models.go index b1dad0c..20e47f6 100644 --- a/sdks/storage/models.go +++ b/sdks/storage/models.go @@ -71,7 +71,7 @@ func (b *RepRedundancy) Value() (driver.Value, error) { return serder.ObjectToJSONEx[Redundancy](b) } -var DefaultECRedundancy = *NewECRedundancy(2, 3, 1024*1024*5) +var DefaultECRedundancy = *NewECRedundancy(3, 6, 1024*1024*5) type ECRedundancy struct { serder.Metadata `union:"ec"` diff --git a/utils/time2/test.go b/utils/time2/test.go new file mode 100644 index 0000000..07be5b2 --- /dev/null +++ b/utils/time2/test.go @@ -0,0 +1,104 @@ +package time2 + +import ( + "fmt" + "path" + "runtime" + "strings" + "time" +) + +type Measurement struct { + startTime time.Time + lastPointTime time.Time + printer func(string) + on bool + title string +} + +func NewMeasurement(printer func(string)) Measurement { + return Measurement{ + printer: printer, + } +} + +func (m *Measurement) Begin(on bool, title ...string) { + if m == nil { + return + } + + m.on = on + m.title = strings.Join(title, ".") + + if on { + m.startTime = time.Now() + m.lastPointTime = m.startTime + + _, file, line, ok := runtime.Caller(1) + + titlePart := "" + if m.title != "" { + titlePart = fmt.Sprintf(":%s", m.title) + } + + if ok { + m.printer(fmt.Sprintf("[begin%v]%v:%v", titlePart, path.Base(file), line)) + } else { + m.printer(fmt.Sprintf("[begin%v]unknown point", titlePart)) + } + } +} + +func (m *Measurement) Point(head ...string) { + if m == nil { + return + } + + if m.on { + m.printer(m.makePointString(strings.Join(head, "."))) + } +} + +func (m *Measurement) makePointString(head string) string { + last := m.lastPointTime + now := time.Now() + m.lastPointTime = now + + _, file, line, ok := runtime.Caller(2) + + prefixCont := "" + + if m.title != "" { + prefixCont = m.title + } + + if head != "" { + if prefixCont == "" { + prefixCont = head + } else { + prefixCont = fmt.Sprintf("%s.%s", prefixCont, head) + } + } + + prefixPart := "" + if prefixCont != "" { + prefixPart = fmt.Sprintf("[%s]", prefixCont) + } + + if ok { + return fmt.Sprintf("%v%v:%v@%v(%v)", prefixPart, path.Base(file), line, now.Sub(last), now.Sub(m.startTime)) + } + + return fmt.Sprintf("%vunknown point@%v(%v)", prefixPart, now.Sub(last), now.Sub(m.startTime)) +} + +func (m *Measurement) End(head ...string) { + if m == nil { + return + } + + if m.on { + m.printer(fmt.Sprintf("[end]%v\n", m.makePointString(strings.Join(head, ".")))) + } +} + From 0270dfd5dd9aabac5386d9978357016c0c8a074c Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 28 Jun 2024 15:03:45 +0800 Subject: [PATCH 03/17] =?UTF-8?q?=E8=AF=A6=E7=BB=86=E6=89=93=E5=8D=B0?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=8B=E8=BD=BD=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/storage/object.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdks/storage/object.go b/sdks/storage/object.go index 86e7fdb..abce3cf 100644 --- a/sdks/storage/object.go +++ b/sdks/storage/object.go @@ -138,7 +138,10 @@ func (c *ObjectService) Download(req ObjectDownload) (*DownloadingObject, error) return nil, err } + startTime := time.Now() file, err := files.MoveNext() + endTime := time.Now() + fmt.Printf("files.MoveNext(), spend time: %.0f s", endTime.Sub(startTime).Seconds()) if err == iterator.ErrNoMoreItem { return nil, fmt.Errorf("no file found in response") } From 8dc08e048626461cdc0e6656928d9412c155e098 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Thu, 11 Jul 2024 16:12:12 +0800 Subject: [PATCH 04/17] =?UTF-8?q?=E9=80=82=E9=85=8D=E6=8E=A8=E7=90=86?= =?UTF-8?q?=E6=89=A9=E5=AE=B9=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/models.go | 29 ++++++++++++++++++++--------- sdks/scheduler/response.go | 26 ++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 9 deletions(-) create mode 100644 sdks/scheduler/response.go diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 3bb25aa..9e7ff75 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -26,6 +26,10 @@ type ImageID int64 // 计算中心ID type CCID int64 +type ModelID string +type NodeID int64 +type Address string + type JobSetInfo struct { Jobs []JobInfo `json:"jobs"` } @@ -71,20 +75,27 @@ type DataReturnJobInfo struct { type MultiInstanceJobInfo struct { serder.Metadata `union:"MultiInstance"` JobInfoBase - Type string `json:"type"` - Files JobFilesInfo `json:"files"` - Runtime JobRuntimeInfo `json:"runtime"` - Resources JobResourcesInfo `json:"resources"` + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` + ModelJobInfo ModelJobInfo `json:"modelJobInfo"` +} + +type ModelJobInfo struct { + Type string `json:"type"` + ModelID string `json:"modelID"` } type InstanceJobInfo struct { serder.Metadata `union:"Instance"` JobInfoBase - Type string `json:"type"` - LocalJobID string `json:"multiInstJobID"` - Files JobFilesInfo `json:"files"` - Runtime JobRuntimeInfo `json:"runtime"` - Resources JobResourcesInfo `json:"resources"` + Type string `json:"type"` + LocalJobID string `json:"multiInstJobID"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` + ModelJobInfo ModelJobInfo `json:"modelJobInfo"` } type JobFilesInfo struct { diff --git a/sdks/scheduler/response.go b/sdks/scheduler/response.go new file mode 100644 index 0000000..4c8db4c --- /dev/null +++ b/sdks/scheduler/response.go @@ -0,0 +1,26 @@ +package schsdk + +// 这个结构体无任何字段,但实现了Noop,每种MessageBody都要内嵌这个结构体 +type MessageBodyBase struct{} + +// 此处的receiver是指针 +func (b *MessageBodyBase) Noop() {} + +type AvailableNodesResp struct { + MessageBodyBase + AvailableNodes map[ModelID]AvailableNodes `json:"allNode"` +} + +type AvailableNodes struct { + MessageBodyBase + //ModelID ModelID `json:"modelID"` + JobID JobID `json:"jobID"` + Nodes []NodeInfo `json:"nodes"` +} + +type NodeInfo struct { + MessageBodyBase + InstanceID JobID `json:"instanceID"` + NodeID NodeID `json:"nodeID"` + Address Address `json:"address"` +} From 7f712b8b985e2304356f11804b4ca2c6d1fef466 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Thu, 18 Jul 2024 17:19:42 +0800 Subject: [PATCH 05/17] =?UTF-8?q?=E5=AE=8C=E6=88=90=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8=E6=8E=A5=E5=8F=A3=E4=BC=98=E5=8C=96=E5=8F=8A=E9=98=BF?= =?UTF-8?q?=E9=87=8C=E4=BA=91=E9=80=82=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/http/http.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/utils/http/http.go b/utils/http/http.go index 55f5a3b..d4deb41 100644 --- a/utils/http/http.go +++ b/utils/http/http.go @@ -97,6 +97,45 @@ func PostJSON(url string, param RequestParam) (*http.Response, error) { return defaultClient.Do(req) } +func PostJSONRow(url string, param RequestParam) (*http.Response, error) { + req, err := http.NewRequest(http.MethodPost, url, nil) + if err != nil { + return nil, err + } + + if err = prepareQuery(req, param.Query); err != nil { + return nil, err + } + + if err = prepareHeader(req, param.Header); err != nil { + return nil, err + } + + //if err = prepareJSONBody(req, param.Body); err != nil { + // return nil, err + //} + + setHeader(req.Header, "Content-Type", ContentTypeJSON) + + if param.Body == nil { + return nil, nil + } + + switch body := param.Body.(type) { + case nil: + case string: + req.ContentLength = int64(len(body)) + req.Body = io.NopCloser(bytes.NewReader([]byte(body))) + case []byte: + req.ContentLength = int64(len(body)) + req.Body = io.NopCloser(bytes.NewReader(body)) + default: + return nil, fmt.Errorf("body error") + } + + return defaultClient.Do(req) +} + func PostForm(url string, param RequestParam) (*http.Response, error) { req, err := http.NewRequest(http.MethodPost, url, nil) if err != nil { From 1e237b80fbf474c0b2c3065a27f059d7e9a768e0 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 26 Jul 2024 16:47:54 +0800 Subject: [PATCH 06/17] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=99=A8=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E6=89=A9=E5=AE=B9=E8=84=9A=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/models.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 9e7ff75..1b19ba6 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -233,3 +233,8 @@ const ( JobDataInEnv = "SCH_DATA_IN" JobDataOutEnv = "SCH_DATA_OUT" ) + +type Rclone struct { + CDSRcloneID string `json:"cds_rcloneID"` + CDSRcloneConfigID string `json:"cds_rcloneConfigID"` +} From 1855a1f2f8fe3ac71d8c4049364d28e1571919c3 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Tue, 30 Jul 2024 16:39:05 +0800 Subject: [PATCH 07/17] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=A8=A1=E5=9E=8B?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BB=BB=E5=8A=A1=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/models.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 1b19ba6..024dc9c 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -43,6 +43,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*DataReturnJobInfo)(nil), (*MultiInstanceJobInfo)(nil), (*InstanceJobInfo)(nil), + (*UpdateMultiInstanceJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -82,6 +83,17 @@ type MultiInstanceJobInfo struct { ModelJobInfo ModelJobInfo `json:"modelJobInfo"` } +type UpdateMultiInstanceJobInfo struct { + serder.Metadata `union:"UpdateModel"` + JobInfoBase + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + MultiInstanceJobSetID JobSetID `json:"multiInstanceJobSetID"` + InstanceIDs []JobID `json:"instanceIDs"` + UpdateStrategy string `json:"updateStrategy"` +} + type ModelJobInfo struct { Type string `json:"type"` ModelID string `json:"modelID"` From c44dc0225ac038aa04950bdffa4cc8f45b9e90bb Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 9 Aug 2024 17:42:11 +0800 Subject: [PATCH 08/17] =?UTF-8?q?1=E3=80=81=E6=96=B0=E5=A2=9E=E5=A2=9E?= =?UTF-8?q?=E9=87=8F=E6=A8=A1=E5=9E=8B=E6=9B=B4=E6=96=B0=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=202=E3=80=81=E5=AE=8C=E6=88=90=E6=89=A7=E8=A1=8C=E5=99=A8?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=9C=BA=E5=88=B6=E4=B8=8E=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=8E=A8=E9=80=81=E7=AD=89=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/actor/actor.go | 4 +- pkgs/async/unbound_channel.go | 89 +++++++++++++++++ pkgs/distlock/internal/acquire_actor.go | 2 +- pkgs/future/future.go | 23 ++++- pkgs/future/ready.go | 87 +++++++++++++++++ pkgs/future/set_value_future.go | 125 ++++++++++++------------ pkgs/future/set_void_future.go | 27 ++--- sdks/scheduler/models.go | 4 +- 8 files changed, 277 insertions(+), 84 deletions(-) create mode 100644 pkgs/async/unbound_channel.go create mode 100644 pkgs/future/ready.go diff --git a/pkgs/actor/actor.go b/pkgs/actor/actor.go index 2df0b71..5f4994e 100644 --- a/pkgs/actor/actor.go +++ b/pkgs/actor/actor.go @@ -126,7 +126,7 @@ func WaitValue[T any](ctx context.Context, c *CommandChannel, cmd func() (T, err fut.SetComplete(val, err) }) - return fut.WaitValue(ctx) + return fut.Wait(ctx) } func WaitValue2[T1 any, T2 any](ctx context.Context, c *CommandChannel, cmd func() (T1, T2, error)) (T1, T2, error) { @@ -137,5 +137,5 @@ func WaitValue2[T1 any, T2 any](ctx context.Context, c *CommandChannel, cmd func fut.SetComplete(val1, val2, err) }) - return fut.WaitValue(ctx) + return fut.Wait(ctx) } diff --git a/pkgs/async/unbound_channel.go b/pkgs/async/unbound_channel.go new file mode 100644 index 0000000..5e2800d --- /dev/null +++ b/pkgs/async/unbound_channel.go @@ -0,0 +1,89 @@ +package async + +import ( + "container/list" + "errors" + "gitlink.org.cn/cloudream/common/pkgs/future" + "sync" +) + +var ErrChannelClosed = errors.New("channel is closed") + +type UnboundChannel[T any] struct { + values *list.List + waiters []*future.SetValueFuture[T] + lock sync.Mutex + err error +} + +func NewUnboundChannel[T any]() *UnboundChannel[T] { + return &UnboundChannel[T]{ + values: list.New(), + } +} + +func (c *UnboundChannel[T]) Error() error { + return c.err +} + +func (c *UnboundChannel[T]) Send(val T) error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.err != nil { + return c.err + } + + c.values.PushBack(val) + + for len(c.waiters) > 0 && c.values.Len() > 0 { + waiter := c.waiters[0] + waiter.SetValue(c.values.Front().Value.(T)) + c.values.Remove(c.values.Front()) + c.waiters = c.waiters[1:] + return nil + } + + return nil +} + +func (c *UnboundChannel[T]) Receive() future.Future1[T] { + c.lock.Lock() + defer c.lock.Unlock() + + if c.err != nil { + return future.NewReadyError1[T](c.err) + } + + if c.values.Len() > 0 { + ret := c.values.Front().Value.(T) + c.values.Remove(c.values.Front()) + return future.NewReadyValue1[T](ret) + } + + fut := future.NewSetValue[T]() + c.waiters = append(c.waiters, fut) + + return fut +} + +func (c *UnboundChannel[T]) Close() { + c.CloseWithError(ErrChannelClosed) +} + +func (c *UnboundChannel[T]) CloseWithError(err error) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.err != nil { + return + } + c.err = err + + for i := 0; i < len(c.waiters); i++ { + c.waiters[i].SetError(c.err) + } + + c.waiters = nil + c.values = nil +} diff --git a/pkgs/distlock/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go index d53f199..324da5f 100644 --- a/pkgs/distlock/internal/acquire_actor.go +++ b/pkgs/distlock/internal/acquire_actor.go @@ -93,7 +93,7 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er }() // 此处不能直接用ctx去等Callback,原因是Wait超时不代表锁没有获取到,这会导致锁泄露。 - return info.Callback.WaitValue(context.Background()) + return info.Callback.Wait(context.Background()) } // TryAcquireNow 重试一下内部还没有成功的锁请求。不会阻塞调用者 diff --git a/pkgs/future/future.go b/pkgs/future/future.go index 705864f..b964e2f 100644 --- a/pkgs/future/future.go +++ b/pkgs/future/future.go @@ -6,18 +6,31 @@ import ( ) var ErrContextCancelled = fmt.Errorf("context cancelled") +var ErrCompleted = fmt.Errorf("context cancelled") type Future interface { - Error() error IsComplete() bool + Chan() <-chan error + Wait(ctx context.Context) error } -type ValueFuture[T any] interface { - Future +type ChanValue1[T any] struct { + Value T + Err error +} + +type ChanValue2[T1 any, T2 any] struct { + Value1 T1 + Value2 T2 + Err error +} + +type Future1[T any] interface { + IsComplete() bool - Value() T + Chan() <-chan ChanValue1[T] - WaitValue(ctx context.Context) (T, error) + Wait(ctx context.Context) (T, error) } diff --git a/pkgs/future/ready.go b/pkgs/future/ready.go new file mode 100644 index 0000000..9c1e766 --- /dev/null +++ b/pkgs/future/ready.go @@ -0,0 +1,87 @@ +package future + +import "context" + +type Ready struct { + ch chan error +} + +func NewReady(err error) *Ready { + ch := make(chan error, 1) + ch <- err + close(ch) + + return &Ready{ + ch: ch, + } +} + +func (f *Ready) IsComplete() bool { + return true +} + +func (f *Ready) Wait(ctx context.Context) error { + select { + case v, ok := <-f.ch: + if !ok { + return ErrCompleted + } + return v + + case <-ctx.Done(): + return ErrContextCancelled + } +} + +func (f *Ready) Chan() <-chan error { + return f.ch +} + +type Ready1[T any] struct { + ch chan ChanValue1[T] +} + +func NewReady1[T any](val T, err error) *Ready1[T] { + ch := make(chan ChanValue1[T], 1) + ch <- ChanValue1[T]{ + Err: err, + Value: val, + } + close(ch) + + return &Ready1[T]{ + ch: ch, + } +} + +func NewReadyValue1[T any](val T) *Ready1[T] { + return NewReady1[T](val, nil) +} + +func NewReadyError1[T any](err error) *Ready1[T] { + var ret T + return NewReady1[T](ret, err) +} + +func (f *Ready1[T]) IsComplete() bool { + return true +} + +func (f *Ready1[T]) Wait(ctx context.Context) (T, error) { + select { + case cv, ok := <-f.ch: + if !ok { + var ret T + return ret, cv.Err + } + return cv.Value, cv.Err + + case <-ctx.Done(): + var ret T + return ret, ErrContextCancelled + } +} + +func (f *Ready1[T]) Chan() <-chan ChanValue1[T] { + return f.ch +} diff --git a/pkgs/future/set_value_future.go b/pkgs/future/set_value_future.go index bf7f94a..129e544 100644 --- a/pkgs/future/set_value_future.go +++ b/pkgs/future/set_value_future.go @@ -6,72 +6,76 @@ import ( ) type SetValueFuture[T any] struct { - value T - err error isCompleted bool - completeChan chan any + ch chan ChanValue1[T] completeOnce sync.Once } func NewSetValue[T any]() *SetValueFuture[T] { return &SetValueFuture[T]{ - completeChan: make(chan any), + ch: make(chan ChanValue1[T], 1), } } func (f *SetValueFuture[T]) SetComplete(val T, err error) { f.completeOnce.Do(func() { - f.value = val - f.err = err + f.ch <- ChanValue1[T]{ + Err: err, + Value: val, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetValueFuture[T]) SetValue(val T) { f.completeOnce.Do(func() { - f.value = val + f.ch <- ChanValue1[T]{ + Value: val, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetValueFuture[T]) SetError(err error) { f.completeOnce.Do(func() { - f.err = err + f.ch <- ChanValue1[T]{ + Err: err, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } -func (f *SetValueFuture[T]) Error() error { - return f.err -} - -func (f *SetValueFuture[T]) Value() T { - return f.value -} - func (f *SetValueFuture[T]) IsComplete() bool { return f.isCompleted } -// 等待直到Complete或者ctx被取消。 -// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete -func (f *SetValueFuture[T]) Wait(ctx context.Context) error { - select { - case <-f.completeChan: - return f.err - - case <-ctx.Done(): - return ErrContextCancelled - } +func (f *SetValueFuture[T]) Chan() <-chan ChanValue1[T] { + return f.ch } -func (f *SetValueFuture[T]) WaitValue(ctx context.Context) (T, error) { +// 等待直到Complete或者ctx被取消。 +// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete +//func (f *SetValueFuture[T]) Wait(ctx context.Context) error { +// select { +// case <-f.ch: +// return f.err +// +// case <-ctx.Done(): +// return ErrContextCancelled +// } +//} + +func (f *SetValueFuture[T]) Wait(ctx context.Context) (T, error) { select { - case <-f.completeChan: - return f.value, f.err + case cv, ok := <-f.ch: + if !ok { + var ret T + return ret, cv.Err + } + return cv.Value, cv.Err case <-ctx.Done(): var ret T @@ -80,68 +84,61 @@ func (f *SetValueFuture[T]) WaitValue(ctx context.Context) (T, error) { } type SetValueFuture2[T1 any, T2 any] struct { - value1 T1 - value2 T2 - err error isCompleted bool - completeChan chan any + ch chan ChanValue2[T1, T2] completeOnce sync.Once } func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] { return &SetValueFuture2[T1, T2]{ - completeChan: make(chan any), + ch: make(chan ChanValue2[T1, T2], 1), } } func (f *SetValueFuture2[T1, T2]) SetComplete(val1 T1, val2 T2, err error) { f.completeOnce.Do(func() { - f.value1 = val1 - f.value2 = val2 - f.err = err + f.ch <- ChanValue2[T1, T2]{ + Value1: val1, + Value2: val2, + Err: err, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetValueFuture2[T1, T2]) SetValue(val1 T1, val2 T2) { f.completeOnce.Do(func() { - f.value1 = val1 - f.value2 = val2 + f.ch <- ChanValue2[T1, T2]{ + Value1: val1, + Value2: val2, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetValueFuture2[T1, T2]) SetError(err error) { f.completeOnce.Do(func() { - f.err = err + f.ch <- ChanValue2[T1, T2]{ + Err: err, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } -func (f *SetValueFuture2[T1, T2]) Error() error { - return f.err -} - -func (f *SetValueFuture2[T1, T2]) Value() (T1, T2) { - return f.value1, f.value2 -} - func (f *SetValueFuture2[T1, T2]) IsComplete() bool { return f.isCompleted } -func (f *SetValueFuture2[T1, T2]) Wait() error { - <-f.completeChan - return f.err -} - -func (f *SetValueFuture2[T1, T2]) WaitValue(ctx context.Context) (T1, T2, error) { +func (f *SetValueFuture2[T1, T2]) Wait(ctx context.Context) (T1, T2, error) { select { - case <-f.completeChan: - return f.value1, f.value2, f.err + case cv, ok := <-f.ch: + if !ok { + return cv.Value1, cv.Value2, cv.Err + } + return cv.Value1, cv.Value2, cv.Err case <-ctx.Done(): var ret1 T1 @@ -149,3 +146,7 @@ func (f *SetValueFuture2[T1, T2]) WaitValue(ctx context.Context) (T1, T2, error) return ret1, ret2, ErrContextCancelled } } + +func (f *SetValueFuture2[T1, T2]) Chan() <-chan ChanValue2[T1, T2] { + return f.ch +} diff --git a/pkgs/future/set_void_future.go b/pkgs/future/set_void_future.go index ed96a65..e9f80bd 100644 --- a/pkgs/future/set_void_future.go +++ b/pkgs/future/set_void_future.go @@ -6,47 +6,50 @@ import ( ) type SetVoidFuture struct { - err error isCompleted bool - completeChan chan any + ch chan error completeOnce sync.Once } func NewSetVoid() *SetVoidFuture { return &SetVoidFuture{ - completeChan: make(chan any), + ch: make(chan error, 1), } } func (f *SetVoidFuture) SetVoid() { f.completeOnce.Do(func() { + f.ch <- nil + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetVoidFuture) SetError(err error) { f.completeOnce.Do(func() { - f.err = err + f.ch <- err + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } -func (f *SetVoidFuture) Error() error { - return f.err -} - func (f *SetVoidFuture) IsComplete() bool { return f.isCompleted } func (f *SetVoidFuture) Wait(ctx context.Context) error { select { - case <-f.completeChan: - return f.err + case v, ok := <-f.ch: + if !ok { + return ErrCompleted + } + return v case <-ctx.Done(): return ErrContextCancelled } } + +func (f *SetVoidFuture) Chan() <-chan error { + return f.ch +} diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 024dc9c..f990dc3 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -90,8 +90,8 @@ type UpdateMultiInstanceJobInfo struct { Files JobFilesInfo `json:"files"` Runtime JobRuntimeInfo `json:"runtime"` MultiInstanceJobSetID JobSetID `json:"multiInstanceJobSetID"` - InstanceIDs []JobID `json:"instanceIDs"` - UpdateStrategy string `json:"updateStrategy"` + //InstanceIDs []JobID `json:"instanceIDs"` + UpdateStrategy string `json:"updateStrategy"` } type ModelJobInfo struct { From 657dec1d0bbe25c85e5672f86179302c0cce9c76 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 30 Aug 2024 17:07:10 +0800 Subject: [PATCH 09/17] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=8E=B7=E5=8F=96?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E5=88=97=E8=A1=A8=E7=AD=89=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/modeljob.go | 145 +++++++++++++++++++++++++++++++++++++ sdks/scheduler/models.go | 12 ++- sdks/scheduler/response.go | 26 ------- 3 files changed, 153 insertions(+), 30 deletions(-) create mode 100644 sdks/scheduler/modeljob.go delete mode 100644 sdks/scheduler/response.go diff --git a/sdks/scheduler/modeljob.go b/sdks/scheduler/modeljob.go new file mode 100644 index 0000000..12545e9 --- /dev/null +++ b/sdks/scheduler/modeljob.go @@ -0,0 +1,145 @@ +package schsdk + +import ( + "fmt" + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/mq" + myhttp "gitlink.org.cn/cloudream/common/utils/http" + "gitlink.org.cn/cloudream/common/utils/serder" + "net/url" + "strings" +) + +// 这个结构体无任何字段,但实现了Noop,每种MessageBody都要内嵌这个结构体 +type MessageBodyBase struct{} + +// 此处的receiver是指针 +func (b *MessageBodyBase) Noop() {} + +type RunningModelResp struct { + MessageBodyBase + RunningModels map[string]RunningModelInfo `json:"allNode"` +} + +type NodeInfo struct { + MessageBodyBase + InstanceID JobID `json:"instanceID"` + //NodeID NodeID `json:"nodeID"` + Address Address `json:"address"` + Status string `json:"status"` +} + +type RunningModelInfo struct { + MessageBodyBase + JobSetID JobSetID `json:"jobSetID"` + ModelID ModelID `json:"modelID"` + ModelName ModelName `json:"modelName"` + CustomModelName ModelName `json:"customModelName"` + Nodes []NodeInfo `json:"nodes"` +} + +type ECSNodeRunningInfoReq struct { + mq.MessageBodyBase + CustomModelName ModelName `form:"customModelName" json:"customModelName" binding:"required"` + ModelID ModelID `form:"modelID" json:"modelID" binding:"required"` +} + +type ECSNodeRunningInfoResp struct { + MessageBodyBase + NodeUsageRateInfos []NodeUsageRateInfo `json:"nodeUsageRateInfos"` +} + +func NewECSNodeRunningInfoResp(nodeUsageRateInfos []NodeUsageRateInfo) *ECSNodeRunningInfoResp { + return &ECSNodeRunningInfoResp{ + NodeUsageRateInfos: nodeUsageRateInfos, + } +} + +type NodeUsageRateInfo struct { + MessageBodyBase + InstanceID JobID `json:"instanceID"` + Address Address `json:"address"` + GPURate []UsageRate `json:"GPURate"` + AccCardRate []UsageRate `json:"AccCardRate"` +} + +type UsageRate struct { + Timestamp string `json:"timestamp"` + Number string `json:"number"` +} + +const ( + RunStatus = "run" + StopStatus = "stop" + FineTuning = "finetuning" + + CreateECS = "create" + RunECS = "run" + PauseECS = "pause" + DestroyECS = "destroy" + OperateServer = "operate" +) + +type QueryAllModelsReq struct { + UserID int64 `form:"userID" json:"userID"` +} + +func (c *Client) QueryAllModels(req QueryAllModelsReq) (*RunningModelResp, error) { + url, err := url.JoinPath(c.baseURL, "/job/queryRunningModels") + if err != nil { + return nil, err + } + + resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[RunningModelResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + +func (c *Client) ECSNodeRunningInfo(req ECSNodeRunningInfoReq) (*ECSNodeRunningInfoResp, error) { + url, err := url.JoinPath(c.baseURL, "/job/getECSNodeRunningInfo") + if err != nil { + return nil, err + } + + resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[ECSNodeRunningInfoResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index f990dc3..2313866 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -27,6 +27,7 @@ type ImageID int64 type CCID int64 type ModelID string +type ModelName string type NodeID int64 type Address string @@ -90,13 +91,16 @@ type UpdateMultiInstanceJobInfo struct { Files JobFilesInfo `json:"files"` Runtime JobRuntimeInfo `json:"runtime"` MultiInstanceJobSetID JobSetID `json:"multiInstanceJobSetID"` - //InstanceIDs []JobID `json:"instanceIDs"` - UpdateStrategy string `json:"updateStrategy"` + UpdateType string `json:"updateType"` + SubJobs []JobID `json:"subJobs"` + Operate string `json:"operate"` } type ModelJobInfo struct { - Type string `json:"type"` - ModelID string `json:"modelID"` + Type string `json:"type"` + ModelID ModelID `json:"modelID"` + CustomModelName ModelName `json:"customModelName"` + Command string `json:"command"` } type InstanceJobInfo struct { diff --git a/sdks/scheduler/response.go b/sdks/scheduler/response.go deleted file mode 100644 index 4c8db4c..0000000 --- a/sdks/scheduler/response.go +++ /dev/null @@ -1,26 +0,0 @@ -package schsdk - -// 这个结构体无任何字段,但实现了Noop,每种MessageBody都要内嵌这个结构体 -type MessageBodyBase struct{} - -// 此处的receiver是指针 -func (b *MessageBodyBase) Noop() {} - -type AvailableNodesResp struct { - MessageBodyBase - AvailableNodes map[ModelID]AvailableNodes `json:"allNode"` -} - -type AvailableNodes struct { - MessageBodyBase - //ModelID ModelID `json:"modelID"` - JobID JobID `json:"jobID"` - Nodes []NodeInfo `json:"nodes"` -} - -type NodeInfo struct { - MessageBodyBase - InstanceID JobID `json:"instanceID"` - NodeID NodeID `json:"nodeID"` - Address Address `json:"address"` -} From 6a36918bf5f896b54df410df4203f89aa1a4d4b5 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Tue, 3 Sep 2024 11:14:14 +0800 Subject: [PATCH 10/17] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=89=80=E6=9C=89=E6=A8=A1=E5=9E=8B=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/modeljob.go | 40 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/sdks/scheduler/modeljob.go b/sdks/scheduler/modeljob.go index 12545e9..79ebc7a 100644 --- a/sdks/scheduler/modeljob.go +++ b/sdks/scheduler/modeljob.go @@ -6,6 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" myhttp "gitlink.org.cn/cloudream/common/utils/http" "gitlink.org.cn/cloudream/common/utils/serder" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" "net/url" "strings" ) @@ -21,6 +22,11 @@ type RunningModelResp struct { RunningModels map[string]RunningModelInfo `json:"allNode"` } +type AllModelResp struct { + MessageBodyBase + AllModels []schmod.Models `json:"allModels"` +} + type NodeInfo struct { MessageBodyBase InstanceID JobID `json:"instanceID"` @@ -80,11 +86,11 @@ const ( OperateServer = "operate" ) -type QueryAllModelsReq struct { +type QueryRunningModelsReq struct { UserID int64 `form:"userID" json:"userID"` } -func (c *Client) QueryAllModels(req QueryAllModelsReq) (*RunningModelResp, error) { +func (c *Client) QueryRunningModels(req QueryRunningModelsReq) (*RunningModelResp, error) { url, err := url.JoinPath(c.baseURL, "/job/queryRunningModels") if err != nil { return nil, err @@ -114,6 +120,36 @@ func (c *Client) QueryAllModels(req QueryAllModelsReq) (*RunningModelResp, error return nil, fmt.Errorf("unknow response content type: %s", contType) } +func (c *Client) QueryAllModels(req QueryRunningModelsReq) (*AllModelResp, error) { + url, err := url.JoinPath(c.baseURL, "/job/getAllModels") + if err != nil { + return nil, err + } + + resp, err := myhttp.GetJSON(url, myhttp.RequestParam{ + Body: req, + }) + if err != nil { + return nil, err + } + + contType := resp.Header.Get("Content-Type") + if strings.Contains(contType, myhttp.ContentTypeJSON) { + var codeResp response[AllModelResp] + if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { + return nil, fmt.Errorf("parsing response: %w", err) + } + + if codeResp.Code == errorcode.OK { + return &codeResp.Data, nil + } + + return nil, codeResp.ToError() + } + + return nil, fmt.Errorf("unknow response content type: %s", contType) +} + func (c *Client) ECSNodeRunningInfo(req ECSNodeRunningInfoReq) (*ECSNodeRunningInfoResp, error) { url, err := url.JoinPath(c.baseURL, "/job/getECSNodeRunningInfo") if err != nil { From 67b683f6fa24d65ccb7c5ae532bb7c3e8a02c366 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Tue, 3 Sep 2024 16:51:25 +0800 Subject: [PATCH 11/17] =?UTF-8?q?=E6=96=B0=E5=A2=9Efinetuning=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/models.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 2313866..ccb2ec6 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -45,6 +45,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*MultiInstanceJobInfo)(nil), (*InstanceJobInfo)(nil), (*UpdateMultiInstanceJobInfo)(nil), + (*FinetuningJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -66,6 +67,16 @@ type NormalJobInfo struct { Services JobServicesInfo `json:"services"` } +type FinetuningJobInfo struct { + serder.Metadata `union:"Finetuning"` + JobInfoBase + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` + Services JobServicesInfo `json:"services"` +} + type DataReturnJobInfo struct { serder.Metadata `union:"DataReturn"` JobInfoBase From 83d1e963828fa8e380cdacbeb12b4ffda68aeb91 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Thu, 5 Sep 2024 16:11:57 +0800 Subject: [PATCH 12/17] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E6=89=80=E6=9C=89=E6=A8=A1=E5=9E=8B=E7=AD=89=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/modeljob.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/sdks/scheduler/modeljob.go b/sdks/scheduler/modeljob.go index 79ebc7a..b82915e 100644 --- a/sdks/scheduler/modeljob.go +++ b/sdks/scheduler/modeljob.go @@ -6,7 +6,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/mq" myhttp "gitlink.org.cn/cloudream/common/utils/http" "gitlink.org.cn/cloudream/common/utils/serder" - schmod "gitlink.org.cn/cloudream/scheduler/common/models" "net/url" "strings" ) @@ -24,7 +23,12 @@ type RunningModelResp struct { type AllModelResp struct { MessageBodyBase - AllModels []schmod.Models `json:"allModels"` + AllModels []Models `json:"allModels"` +} + +type Models struct { + ModelID ModelID `json:"modelID"` + ModelName ModelName `json:"modelName"` } type NodeInfo struct { @@ -75,8 +79,6 @@ type UsageRate struct { } const ( - RunStatus = "run" - StopStatus = "stop" FineTuning = "finetuning" CreateECS = "create" From a8b5daeac9d366144e138eccebaffd50d13f0022 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Wed, 11 Sep 2024 13:56:55 +0800 Subject: [PATCH 13/17] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=9B=99=E5=85=89?= =?UTF-8?q?=E8=8A=82=E7=82=B9=E5=88=9B=E5=BB=BA=E7=AD=89=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/modeljob.go | 3 +++ sdks/scheduler/models.go | 6 ++++-- utils/http/http.go | 21 +++++++++++++++++++++ 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/sdks/scheduler/modeljob.go b/sdks/scheduler/modeljob.go index b82915e..f9afeaf 100644 --- a/sdks/scheduler/modeljob.go +++ b/sdks/scheduler/modeljob.go @@ -86,6 +86,9 @@ const ( PauseECS = "pause" DestroyECS = "destroy" OperateServer = "operate" + + RcloneMount = "rclone" + Mounted = "mounted" ) type QueryRunningModelsReq struct { diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index ccb2ec6..35f37b6 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -257,8 +257,10 @@ func (b *NoEnvBootstrap) GetBootstrapType() string { } const ( - JobDataInEnv = "SCH_DATA_IN" - JobDataOutEnv = "SCH_DATA_OUT" + JobDataInEnv = "SCH_DATA_IN" + JobDataOutEnv = "SCH_DATA_OUT" + FinetuningOutEnv = "FINETUNING_OUT" + AccessPath = "ACCESS_PATH" ) type Rclone struct { diff --git a/utils/http/http.go b/utils/http/http.go index e8ffbbc..a3871f1 100644 --- a/utils/http/http.go +++ b/utils/http/http.go @@ -56,6 +56,27 @@ func GetJSON(url string, param RequestParam) (*http.Response, error) { return defaultClient.Do(req) } +func DeleteJSON(url string, param RequestParam) (*http.Response, error) { + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + return nil, err + } + + if err = prepareQuery(req, param.Query); err != nil { + return nil, err + } + + if err = prepareHeader(req, param.Header); err != nil { + return nil, err + } + + if err = prepareJSONBody(req, param.Body); err != nil { + return nil, err + } + + return defaultClient.Do(req) +} + func GetForm(url string, param RequestParam) (*http.Response, error) { req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { From bf44c37848e5bc43e0aec13ca0d165313eaa4f98 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 13 Sep 2024 16:11:04 +0800 Subject: [PATCH 14/17] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=E7=8A=B6=E6=80=81=E6=9E=9A=E4=B8=BE=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/modeljob.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/sdks/scheduler/modeljob.go b/sdks/scheduler/modeljob.go index f9afeaf..db46cb0 100644 --- a/sdks/scheduler/modeljob.go +++ b/sdks/scheduler/modeljob.go @@ -67,10 +67,11 @@ func NewECSNodeRunningInfoResp(nodeUsageRateInfos []NodeUsageRateInfo) *ECSNodeR type NodeUsageRateInfo struct { MessageBodyBase - InstanceID JobID `json:"instanceID"` - Address Address `json:"address"` - GPURate []UsageRate `json:"GPURate"` - AccCardRate []UsageRate `json:"AccCardRate"` + InstanceID JobID `json:"instanceID"` + Address Address `json:"address"` + MemoryUtilization []UsageRate `json:"memoryUtilization"` + GPUUtilization []UsageRate `json:"GPUUtilization"` + CPUUtilization []UsageRate `json:"CPUUtilization"` } type UsageRate struct { @@ -87,8 +88,14 @@ const ( DestroyECS = "destroy" OperateServer = "operate" + GPUMonitor = "GPUMonitor" + RcloneMount = "rclone" Mounted = "mounted" + + Deploying = "Deploying" + Waiting = "Waiting" + Failed = "Failed" ) type QueryRunningModelsReq struct { From 326d375b816060d80bbf0115e5912c391e35e245 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 20 Sep 2024 10:35:07 +0800 Subject: [PATCH 15/17] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=8E=A2=E6=B4=BB=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/modeljob.go | 4 +++- sdks/scheduler/models.go | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sdks/scheduler/modeljob.go b/sdks/scheduler/modeljob.go index db46cb0..673c431 100644 --- a/sdks/scheduler/modeljob.go +++ b/sdks/scheduler/modeljob.go @@ -80,7 +80,8 @@ type UsageRate struct { } const ( - FineTuning = "finetuning" + FineTuning = "finetuning" + DataPreprocess = "DataPreprocess" CreateECS = "create" RunECS = "run" @@ -96,6 +97,7 @@ const ( Deploying = "Deploying" Waiting = "Waiting" Failed = "Failed" + Invalid = "Invalid" ) type QueryRunningModelsReq struct { diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 35f37b6..e169717 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -267,3 +267,16 @@ type Rclone struct { CDSRcloneID string `json:"cds_rcloneID"` CDSRcloneConfigID string `json:"cds_rcloneConfigID"` } + +type InferencePlatform struct { + PlatformName string `json:"platformName"` + ApiBaseUrl string `json:"apiBaseUrl"` + ApiKey string `json:"apiKey"` + ApiProxy string `json:"apiProxy"` + LlmModel string `json:"llmModel"` + EmbedModel string `json:"embedModel"` + ChunkMaxLength int64 `json:"chunkMaxLength"` + StartChunkThreshold int64 `json:"startChunkThreshold"` + SimilarityThreshold float64 `json:"similarityThreshold"` + EntriesPerFile int64 `json:"entriesPerFile"` +} From 8e76b76b582915939c792e41795e351c67ed0430 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Mon, 30 Sep 2024 15:50:58 +0800 Subject: [PATCH 16/17] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=BE=AE=E8=B0=83?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E9=A2=84=E5=A4=84=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/modeljob.go | 2 ++ sdks/scheduler/models.go | 57 ++++++++++++++++++++++++++------------ 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/sdks/scheduler/modeljob.go b/sdks/scheduler/modeljob.go index 673c431..d896760 100644 --- a/sdks/scheduler/modeljob.go +++ b/sdks/scheduler/modeljob.go @@ -88,11 +88,13 @@ const ( PauseECS = "pause" DestroyECS = "destroy" OperateServer = "operate" + RestartServer = "restartServer" GPUMonitor = "GPUMonitor" RcloneMount = "rclone" Mounted = "mounted" + MountDir = "/mnt/oss" Deploying = "Deploying" Waiting = "Waiting" diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index e169717..d3e606a 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -7,9 +7,11 @@ import ( ) const ( - JobTypeNormal = "Normal" - JobTypeResource = "Resource" - JobTypeInstance = "Instance" + JobTypeNormal = "Normal" + JobTypeResource = "Resource" + JobTypeInstance = "Instance" + JobTypeFinetuning = "Finetuning" + JobTypeDataPreprocess = "DataPreprocess" FileInfoTypePackage = "Package" FileInfoTypeLocalFile = "LocalFile" @@ -28,6 +30,7 @@ type CCID int64 type ModelID string type ModelName string +type ECSInstanceID string type NodeID int64 type Address string @@ -46,6 +49,7 @@ var JobInfoTypeUnion = types.NewTypeUnion[JobInfo]( (*InstanceJobInfo)(nil), (*UpdateMultiInstanceJobInfo)(nil), (*FinetuningJobInfo)(nil), + (*DataPreprocessJobInfo)(nil), ) var _ = serder.UseTypeUnionInternallyTagged(&JobInfoTypeUnion, "type") @@ -60,16 +64,30 @@ func (i *JobInfoBase) GetLocalJobID() string { type NormalJobInfo struct { serder.Metadata `union:"Normal"` JobInfoBase - Type string `json:"type"` - Files JobFilesInfo `json:"files"` - Runtime JobRuntimeInfo `json:"runtime"` - Resources JobResourcesInfo `json:"resources"` - Services JobServicesInfo `json:"services"` + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` + Services JobServicesInfo `json:"services"` + ModelJobInfo ModelJobInfo `json:"modelJobInfo"` } +// FinetuningJobInfo 模型微调 type FinetuningJobInfo struct { serder.Metadata `union:"Finetuning"` JobInfoBase + Type string `json:"type"` + Files JobFilesInfo `json:"files"` + Runtime JobRuntimeInfo `json:"runtime"` + Resources JobResourcesInfo `json:"resources"` + Services JobServicesInfo `json:"services"` + ModelJobInfo ModelJobInfo `json:"modelJobInfo"` +} + +// DataPreprocessJobInfo 数据预处理 +type DataPreprocessJobInfo struct { + serder.Metadata `union:"DataPreprocess"` + JobInfoBase Type string `json:"type"` Files JobFilesInfo `json:"files"` Runtime JobRuntimeInfo `json:"runtime"` @@ -85,6 +103,7 @@ type DataReturnJobInfo struct { TargetLocalJobID string `json:"targetLocalJobID"` } +// MultiInstanceJobInfo 多实例(推理任务) type MultiInstanceJobInfo struct { serder.Metadata `union:"MultiInstance"` JobInfoBase @@ -95,6 +114,7 @@ type MultiInstanceJobInfo struct { ModelJobInfo ModelJobInfo `json:"modelJobInfo"` } +// UpdateMultiInstanceJobInfo 更新模型 type UpdateMultiInstanceJobInfo struct { serder.Metadata `union:"UpdateModel"` JobInfoBase @@ -114,6 +134,7 @@ type ModelJobInfo struct { Command string `json:"command"` } +// InstanceJobInfo 单实例(推理任务) type InstanceJobInfo struct { serder.Metadata `union:"Instance"` JobInfoBase @@ -269,14 +290,14 @@ type Rclone struct { } type InferencePlatform struct { - PlatformName string `json:"platformName"` - ApiBaseUrl string `json:"apiBaseUrl"` - ApiKey string `json:"apiKey"` - ApiProxy string `json:"apiProxy"` - LlmModel string `json:"llmModel"` - EmbedModel string `json:"embedModel"` - ChunkMaxLength int64 `json:"chunkMaxLength"` - StartChunkThreshold int64 `json:"startChunkThreshold"` - SimilarityThreshold float64 `json:"similarityThreshold"` - EntriesPerFile int64 `json:"entriesPerFile"` + PlatformName string `json:"platformName"` + ApiBaseUrl string `json:"apiBaseUrl"` + ApiKey string `json:"apiKey"` + ApiProxy string `json:"apiProxy"` + LlmModel string `json:"llmModel"` + EmbedModel string `json:"embedModel"` + ChunkMaxLength string `json:"chunkMaxLength"` + StartChunkThreshold string `json:"startChunkThreshold"` + SimilarityThreshold string `json:"similarityThreshold"` + EntriesPerFile string `json:"entriesPerFile"` } From e12a091ddd7d1791572aec78ad56f55ab9e03b92 Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Mon, 30 Sep 2024 16:41:31 +0800 Subject: [PATCH 17/17] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sdks/scheduler/models.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index d3e606a..9c2b238 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -17,6 +17,10 @@ const ( FileInfoTypeLocalFile = "LocalFile" FileInfoTypeResource = "Resource" FileInfoTypeImage = "Image" + + MemoryUtilization = "MemoryUtilization" + GPUUtilization = "GPUUtilization" + CPUUtilization = "CPUUtilization" ) type JobID string