Browse Source

优化读取Node表代码

gitlink
JeshuaRen 1 year ago
parent
commit
e55092523a
2 changed files with 267 additions and 9 deletions
  1. +230
    -0
      sdks/storage/hub_io.go
  2. +37
    -9
      sdks/storage/models.go

+ 230
- 0
sdks/storage/hub_io.go View File

@@ -0,0 +1,230 @@
package cdssdk

import (
"bytes"
"fmt"
"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
"io"
"mime/multipart"
"net/http"
"net/url"
"strings"
)

const GetStreamPath = "/hubIO/getStream"

type GetStreamReq struct {
PlanID exec.PlanID `json:"planID"`
VarID exec.VarID `json:"varID"`
Signal *exec.SignalVar `json:"signal"`
}

func (c *Client) GetStream(planID exec.PlanID, id exec.VarID, signal *exec.SignalVar) (io.ReadCloser, error) {
targetUrl, err := url.JoinPath(c.baseURL, GetStreamPath)
if err != nil {
return nil, err
}

req := &GetStreamReq{
PlanID: planID,
VarID: id,
Signal: signal,
}

resp, err := http2.GetJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
// 读取错误信息
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return nil, fmt.Errorf("error response from server: %s", string(body))
}

return resp.Body, nil
}

const SendStreamPath = "/hubIO/sendStream"

type SendStreamReq struct {
PlanID exec.PlanID `json:"planID"`
VarID exec.VarID `json:"varID"`
Stream io.ReadCloser `json:"stream"`
}

func (c *Client) SendStream(planID exec.PlanID, varID exec.VarID, str io.Reader) error {
targetUrl, err := url.JoinPath(c.baseURL, SendStreamPath)
if err != nil {
return err
}

body := &bytes.Buffer{}
writer := multipart.NewWriter(body)

_ = writer.WriteField("plan_id", string(planID))
_ = writer.WriteField("var_id", string(rune(varID)))
fileWriter, err := writer.CreateFormFile("file", "data")
if err != nil {
return fmt.Errorf("creating form file: %w", err)
}

// 将读取的数据写入 multipart 的文件部分
_, err = io.Copy(fileWriter, str)
if err != nil {
return fmt.Errorf("copying stream data: %w", err)
}

// 关闭 writer 以结束 multipart
err = writer.Close()
if err != nil {
return fmt.Errorf("closing writer: %w", err)
}

// 创建 HTTP 请求
req, err := http.NewRequest("POST", targetUrl, body)
if err != nil {
return fmt.Errorf("creating HTTP request: %w", err)
}
req.Header.Set("Content-Type", writer.FormDataContentType())

// 发送请求
cli := http.Client{}
resp, err := cli.Do(req)
if err != nil {
return fmt.Errorf("sending HTTP request: %w", err)
}
defer resp.Body.Close()

// 检查响应状态码
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("server returned non-200 status: %d", resp.StatusCode)
}

return nil
}

const ExecuteIOPlanPath = "/hubIO/executeIOPlan"

type ExecuteIOPlanReq struct {
Plan exec.Plan `json:"plan"`
}

func (c *Client) ExecuteIOPlan(plan exec.Plan) error {
targetUrl, err := url.JoinPath(c.baseURL, ExecuteIOPlanPath)
if err != nil {
return err
}

req := &ExecuteIOPlanReq{
Plan: plan,
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return nil
}

return codeResp.ToError()
}

return fmt.Errorf("unknow response content type: %s", contType)
}

const SendVarPath = "/hubIO/sendVar"

type SendVarReq struct {
PlanID exec.PlanID `json:"planID"`
Var exec.Var `json:"var"`
}

func (c *Client) SendVar(id exec.PlanID, v exec.Var) error {
targetUrl, err := url.JoinPath(c.baseURL, SendVarPath)
if err != nil {
return err
}

req := &SendVarReq{
PlanID: id,
Var: v,
}

resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return err
}

contType := resp.Header.Get("Content-Type")
if strings.Contains(contType, http2.ContentTypeJSON) {
var codeResp response[any]
if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
return fmt.Errorf("parsing response: %w", err)
}

if codeResp.Code == errorcode.OK {
return nil
}

return codeResp.ToError()
}

return fmt.Errorf("unknow response content type: %s", contType)
}

const GetVarPath = "/hubIO/getVar"

type GetVarReq struct {
PlanID exec.PlanID `json:"planID"`
Var exec.Var `json:"var"`
Signal *exec.SignalVar `json:"signal"`
}

func (c *Client) GetVar(id exec.PlanID, v exec.Var, signal *exec.SignalVar) error {
targetUrl, err := url.JoinPath(c.baseURL, GetVarPath)
if err != nil {
return err
}

req := &GetVarReq{
PlanID: id,
Var: v,
Signal: signal,
}

resp, err := http2.GetJSON(targetUrl, http2.RequestParam{
Body: req,
})
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK {
// 读取错误信息
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
return fmt.Errorf("error response from server: %s", string(body))
}

return nil
}

+ 37
- 9
sdks/storage/models.go View File

@@ -183,15 +183,43 @@ type Object struct {
}

type Node struct {
NodeID NodeID `db:"NodeID" json:"nodeID"`
Name string `db:"Name" json:"name"`
LocalIP string `db:"LocalIP" json:"localIP"`
ExternalIP string `db:"ExternalIP" json:"externalIP"`
LocalGRPCPort int `db:"LocalGRPCPort" json:"localGRPCPort"`
ExternalGRPCPort int `db:"ExternalGRPCPort" json:"externalGRPCPort"`
LocationID LocationID `db:"LocationID" json:"locationID"`
State string `db:"State" json:"state"`
LastReportTime *time.Time `db:"LastReportTime" json:"lastReportTime"`
NodeID NodeID `gorm:"column:NodeID;type:varchar(255)" json:"nodeID"`
Name string `gorm:"column:Name;type:varchar(255)" json:"name"`
LocalIP string `gorm:"column:LocalIP;type:varchar(255)" json:"localIP"`
ExternalIP string `gorm:"column:ExternalIP;type:varchar(255)" json:"externalIP"`
LocalGRPCPort int `gorm:"column:LocalGRPCPort;type:int" json:"localGRPCPort"`
ExternalGRPCPort int `gorm:"column:ExternalGRPCPort;type:int" json:"externalGRPCPort"`
LocationID LocationID `gorm:"column:LocationID;type:varchar(255)" json:"locationID"`
State string `gorm:"column:State;type:varchar(255)" json:"state"`
LastReportTime *time.Time `gorm:"column:LastReportTime;type:timestamp" json:"lastReportTime"`
Address NodeAddressInfo `gorm:"column:Address;type:json;serializer:union" json:"address"`
}

type NodeAddressInfo interface {
}

var NodeAddressUnion = types.NewTypeUnion[NodeAddressInfo](
(*GRPCAddressInfo)(nil),
(*HttpAddressInfo)(nil),
)

var _ = serder.UseTypeUnionInternallyTagged(&NodeAddressUnion, "type")

type GRPCAddressInfo struct {
serder.Metadata `union:"GRPC"`
Type string `json:"type"`
LocalIP string `json:"localIP"`
ExternalIP string `json:"externalIP"`
LocalGRPCPort int `json:"localGRPCPort"`
ExternalGRPCPort int `json:"externalGRPCPort"`
}

type HttpAddressInfo struct {
serder.Metadata `union:"HTTP"`
Type string `json:"type"`
LocalIP string `json:"localIP"`
ExternalIP string `json:"externalIP"`
Port int `json:"port"`
}

func (n Node) String() string {


Loading…
Cancel
Save