| @@ -13,7 +13,7 @@ import ( | |||||
| type Driver struct { | type Driver struct { | ||||
| planID PlanID | planID PlanID | ||||
| planBlder *PlanBuilder | planBlder *PlanBuilder | ||||
| callback *future.SetValueFuture[map[string]any] | |||||
| callback *future.SetValueFuture[map[string]VarValue] | |||||
| ctx *ExecContext | ctx *ExecContext | ||||
| cancel context.CancelFunc | cancel context.CancelFunc | ||||
| driverExec *Executor | driverExec *Executor | ||||
| @@ -42,7 +42,7 @@ func (e *Driver) Signal(signal *DriverSignalVar) { | |||||
| e.driverExec.PutVar(signal.ID, &SignalValue{}) | e.driverExec.PutVar(signal.ID, &SignalValue{}) | ||||
| } | } | ||||
| func (e *Driver) Wait(ctx context.Context) (map[string]any, error) { | |||||
| func (e *Driver) Wait(ctx context.Context) (map[string]VarValue, error) { | |||||
| stored, err := e.callback.Wait(ctx) | stored, err := e.callback.Wait(ctx) | ||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| @@ -15,19 +15,24 @@ type binding struct { | |||||
| Callback *future.SetValueFuture[VarValue] | Callback *future.SetValueFuture[VarValue] | ||||
| } | } | ||||
| type freeVar struct { | |||||
| ID VarID | |||||
| Value VarValue | |||||
| } | |||||
| type Executor struct { | type Executor struct { | ||||
| plan Plan | plan Plan | ||||
| vars map[VarID]Var | |||||
| vars map[VarID]freeVar | |||||
| bindings []*binding | bindings []*binding | ||||
| lock sync.Mutex | lock sync.Mutex | ||||
| store map[string]any | |||||
| store map[string]VarValue | |||||
| } | } | ||||
| func NewExecutor(plan Plan) *Executor { | func NewExecutor(plan Plan) *Executor { | ||||
| planning := Executor{ | planning := Executor{ | ||||
| plan: plan, | plan: plan, | ||||
| vars: make(map[VarID]Var), | |||||
| store: make(map[string]any), | |||||
| vars: make(map[VarID]freeVar), | |||||
| store: make(map[string]VarValue), | |||||
| } | } | ||||
| return &planning | return &planning | ||||
| @@ -37,7 +42,7 @@ func (s *Executor) Plan() *Plan { | |||||
| return &s.plan | return &s.plan | ||||
| } | } | ||||
| func (s *Executor) Run(ctx *ExecContext) (map[string]any, error) { | |||||
| func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) { | |||||
| c, cancel := context.WithCancel(ctx.Context) | c, cancel := context.WithCancel(ctx.Context) | ||||
| ctx.Context = c | ctx.Context = c | ||||
| @@ -99,11 +104,11 @@ func (s *Executor) PutVar(id VarID, value VarValue) *Executor { | |||||
| } | } | ||||
| // 如果没有绑定,则直接放入变量表中 | // 如果没有绑定,则直接放入变量表中 | ||||
| s.vars[id] = Var{ID: id, Value: value} | |||||
| s.vars[id] = freeVar{ID: id, Value: value} | |||||
| return s | return s | ||||
| } | } | ||||
| func (s *Executor) Store(key string, val any) { | |||||
| func (s *Executor) Store(key string, val VarValue) { | |||||
| s.lock.Lock() | s.lock.Lock() | ||||
| defer s.lock.Unlock() | defer s.lock.Unlock() | ||||
| @@ -63,7 +63,7 @@ func (b *PlanBuilder) Execute(ctx *ExecContext) *Driver { | |||||
| exec := Driver{ | exec := Driver{ | ||||
| planID: planID, | planID: planID, | ||||
| planBlder: b, | planBlder: b, | ||||
| callback: future.NewSetValue[map[string]any](), | |||||
| callback: future.NewSetValue[map[string]VarValue](), | |||||
| ctx: ctx, | ctx: ctx, | ||||
| cancel: cancel, | cancel: cancel, | ||||
| driverExec: NewExecutor(execPlan), | driverExec: NewExecutor(execPlan), | ||||
| @@ -10,23 +10,11 @@ import ( | |||||
| type VarID int | type VarID int | ||||
| type Var struct { | |||||
| ID VarID `json:"id"` | |||||
| Value VarValue `json:"value"` | |||||
| } | |||||
| type VarPack[T VarValue] struct { | |||||
| type Var[T VarValue] struct { | |||||
| ID VarID `json:"id"` | ID VarID `json:"id"` | ||||
| Value T `json:"value"` | Value T `json:"value"` | ||||
| } | } | ||||
| func (v *VarPack[T]) ToAny() AnyVar { | |||||
| return AnyVar{ | |||||
| ID: v.ID, | |||||
| Value: v.Value, | |||||
| } | |||||
| } | |||||
| // 变量的值 | // 变量的值 | ||||
| type VarValue interface { | type VarValue interface { | ||||
| Clone() VarValue | Clone() VarValue | ||||
| @@ -42,15 +30,6 @@ func UseVarValue[T VarValue]() { | |||||
| valueUnion.Add(reflect2.TypeOf[T]()) | valueUnion.Add(reflect2.TypeOf[T]()) | ||||
| } | } | ||||
| type AnyVar = VarPack[VarValue] | |||||
| func V(id VarID, value VarValue) AnyVar { | |||||
| return AnyVar{ | |||||
| ID: id, | |||||
| Value: value, | |||||
| } | |||||
| } | |||||
| type StreamValue struct { | type StreamValue struct { | ||||
| Stream io.ReadCloser `json:"-"` | Stream io.ReadCloser `json:"-"` | ||||
| } | } | ||||
| @@ -60,15 +39,24 @@ func (v *StreamValue) Clone() VarValue { | |||||
| panic("StreamValue should not be cloned") | panic("StreamValue should not be cloned") | ||||
| } | } | ||||
| type StreamVar = Var[*StreamValue] | |||||
| func NewStreamVar(id VarID, stream io.ReadCloser) StreamVar { | |||||
| return StreamVar{ | |||||
| ID: id, | |||||
| Value: &StreamValue{Stream: stream}, | |||||
| } | |||||
| } | |||||
| type SignalValue struct{} | type SignalValue struct{} | ||||
| func (o *SignalValue) Clone() VarValue { | func (o *SignalValue) Clone() VarValue { | ||||
| return &SignalValue{} | return &SignalValue{} | ||||
| } | } | ||||
| type SignalVar = VarPack[*SignalValue] | |||||
| type SignalVar = Var[*SignalValue] | |||||
| func NewSignal(id VarID) SignalVar { | |||||
| func NewSignalVar(id VarID) SignalVar { | |||||
| return SignalVar{ | return SignalVar{ | ||||
| ID: id, | ID: id, | ||||
| Value: &SignalValue{}, | Value: &SignalValue{}, | ||||
| @@ -82,3 +70,12 @@ type StringValue struct { | |||||
| func (o *StringValue) Clone() VarValue { | func (o *StringValue) Clone() VarValue { | ||||
| return &StringValue{Value: o.Value} | return &StringValue{Value: o.Value} | ||||
| } | } | ||||
| type StringVar = Var[*StringValue] | |||||
| func NewStringVar(id VarID, value string) StringVar { | |||||
| return StringVar{ | |||||
| ID: id, | |||||
| Value: &StringValue{Value: value}, | |||||
| } | |||||
| } | |||||
| @@ -50,10 +50,10 @@ func (o *SendStream) String() string { | |||||
| } | } | ||||
| type GetStream struct { | type GetStream struct { | ||||
| Signal exec.VarPack[*exec.SignalValue] `json:"signal"` | |||||
| Target exec.VarID `json:"target"` | |||||
| Output exec.VarID `json:"output"` | |||||
| Worker exec.WorkerInfo `json:"worker"` | |||||
| Signal exec.SignalVar `json:"signal"` | |||||
| Target exec.VarID `json:"target"` | |||||
| Output exec.VarID `json:"output"` | |||||
| Worker exec.WorkerInfo `json:"worker"` | |||||
| } | } | ||||
| func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { | func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error { | ||||
| @@ -113,10 +113,10 @@ func (o *SendVar) String() string { | |||||
| } | } | ||||
| type GetVar struct { | type GetVar struct { | ||||
| Signal exec.VarPack[*exec.SignalValue] `json:"signal"` | |||||
| Target exec.VarID `json:"target"` | |||||
| Output exec.VarID `json:"output"` | |||||
| Worker exec.WorkerInfo `json:"worker"` | |||||
| Signal exec.SignalVar `json:"signal"` | |||||
| Target exec.VarID `json:"target"` | |||||
| Output exec.VarID `json:"output"` | |||||
| Worker exec.WorkerInfo `json:"worker"` | |||||
| } | } | ||||
| func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { | func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error { | ||||
| @@ -234,7 +234,7 @@ func (t *GetStreamNode) SignalVar() *dag.Var { | |||||
| func (t *GetStreamNode) GenerateOp() (exec.Op, error) { | func (t *GetStreamNode) GenerateOp() (exec.Op, error) { | ||||
| return &GetStream{ | return &GetStream{ | ||||
| Signal: exec.NewSignal(t.OutputValues().Get(0).VarID), | |||||
| Signal: exec.NewSignalVar(t.OutputValues().Get(0).VarID), | |||||
| Output: t.OutputStreams().Get(0).VarID, | Output: t.OutputStreams().Get(0).VarID, | ||||
| Target: t.InputStreams().Get(0).VarID, | Target: t.InputStreams().Get(0).VarID, | ||||
| Worker: t.FromWorker, | Worker: t.FromWorker, | ||||
| @@ -273,7 +273,7 @@ func (t *GetValueNode) SignalVar() *dag.Var { | |||||
| func (t *GetValueNode) GenerateOp() (exec.Op, error) { | func (t *GetValueNode) GenerateOp() (exec.Op, error) { | ||||
| return &GetVar{ | return &GetVar{ | ||||
| Signal: exec.NewSignal(t.OutputValues().Get(0).VarID), | |||||
| Signal: exec.NewSignalVar(t.OutputValues().Get(0).VarID), | |||||
| Output: t.OutputValues().Get(1).VarID, | Output: t.OutputValues().Get(1).VarID, | ||||
| Target: t.InputValues().Get(0).VarID, | Target: t.InputValues().Get(0).VarID, | ||||
| Worker: t.FromWorker, | Worker: t.FromWorker, | ||||
| @@ -1,11 +1,8 @@ | |||||
| package cdsapi | package cdsapi | ||||
| import ( | import ( | ||||
| "bytes" | |||||
| "fmt" | "fmt" | ||||
| "io" | "io" | ||||
| "mime/multipart" | |||||
| "net/http" | |||||
| "net/url" | "net/url" | ||||
| "strings" | "strings" | ||||
| @@ -26,31 +23,32 @@ type GetStreamReq struct { | |||||
| Signal exec.VarValue `json:"signal"` | Signal exec.VarValue `json:"signal"` | ||||
| } | } | ||||
| func (c *Client) GetStream(planID exec.PlanID, id exec.VarID, signalID exec.VarID, signal exec.VarValue) (io.ReadCloser, error) { | |||||
| func (c *Client) GetStream(req GetStreamReq) (io.ReadCloser, error) { | |||||
| targetUrl, err := url.JoinPath(c.baseURL, GetStreamPath) | targetUrl, err := url.JoinPath(c.baseURL, GetStreamPath) | ||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| req := &GetStreamReq{ | |||||
| PlanID: planID, | |||||
| VarID: id, | |||||
| SignalID: signalID, | |||||
| Signal: signal, | |||||
| body, err := serder.ObjectToJSONEx(req) | |||||
| if err != nil { | |||||
| return nil, fmt.Errorf("request to json: %w", err) | |||||
| } | } | ||||
| resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ | resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ | ||||
| Body: req, | |||||
| Body: body, | |||||
| }) | }) | ||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| if resp.StatusCode != http.StatusOK { | |||||
| // 读取错误信息 | |||||
| body, _ := io.ReadAll(resp.Body) | |||||
| resp.Body.Close() | |||||
| return nil, fmt.Errorf("error response from server: %s", string(body)) | |||||
| contType := resp.Header.Get("Content-Type") | |||||
| if strings.Contains(contType, http2.ContentTypeJSON) { | |||||
| var codeResp response[any] | |||||
| if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { | |||||
| return nil, fmt.Errorf("parsing response: %w", err) | |||||
| } | |||||
| return nil, codeResp.ToError() | |||||
| } | } | ||||
| return resp.Body, nil | return resp.Body, nil | ||||
| @@ -59,60 +57,54 @@ func (c *Client) GetStream(planID exec.PlanID, id exec.VarID, signalID exec.VarI | |||||
| const SendStreamPath = "/hubIO/sendStream" | const SendStreamPath = "/hubIO/sendStream" | ||||
| type SendStreamReq struct { | type SendStreamReq struct { | ||||
| PlanID exec.PlanID `json:"planID"` | |||||
| VarID exec.VarID `json:"varID"` | |||||
| Stream io.ReadCloser `json:"stream"` | |||||
| SendStreamInfo | |||||
| Stream io.ReadCloser `json:"-"` | |||||
| } | |||||
| type SendStreamInfo struct { | |||||
| PlanID exec.PlanID `json:"planID"` | |||||
| VarID exec.VarID `json:"varID"` | |||||
| } | } | ||||
| func (c *Client) SendStream(planID exec.PlanID, varID exec.VarID, str io.Reader) error { | |||||
| targetUrl, err := url.JoinPath(c.baseURL, SendStreamPath) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| body := &bytes.Buffer{} | |||||
| writer := multipart.NewWriter(body) | |||||
| _ = writer.WriteField("plan_id", string(planID)) | |||||
| _ = writer.WriteField("var_id", string(rune(varID))) | |||||
| fileWriter, err := writer.CreateFormFile("file", "data") | |||||
| if err != nil { | |||||
| return fmt.Errorf("creating form file: %w", err) | |||||
| } | |||||
| // 将读取的数据写入 multipart 的文件部分 | |||||
| _, err = io.Copy(fileWriter, str) | |||||
| if err != nil { | |||||
| return fmt.Errorf("copying stream data: %w", err) | |||||
| } | |||||
| // 关闭 writer 以结束 multipart | |||||
| err = writer.Close() | |||||
| if err != nil { | |||||
| return fmt.Errorf("closing writer: %w", err) | |||||
| } | |||||
| // 创建 HTTP 请求 | |||||
| req, err := http.NewRequest("POST", targetUrl, body) | |||||
| if err != nil { | |||||
| return fmt.Errorf("creating HTTP request: %w", err) | |||||
| } | |||||
| req.Header.Set("Content-Type", writer.FormDataContentType()) | |||||
| // 发送请求 | |||||
| cli := http.Client{} | |||||
| resp, err := cli.Do(req) | |||||
| if err != nil { | |||||
| return fmt.Errorf("sending HTTP request: %w", err) | |||||
| } | |||||
| defer resp.Body.Close() | |||||
| // 检查响应状态码 | |||||
| if resp.StatusCode != http.StatusOK { | |||||
| return fmt.Errorf("server returned non-200 status: %d", resp.StatusCode) | |||||
| } | |||||
| return nil | |||||
| func (c *Client) SendStream(req SendStreamReq) error { | |||||
| // targetUrl, err := url.JoinPath(c.baseURL, SendStreamPath) | |||||
| // if err != nil { | |||||
| // return err | |||||
| // } | |||||
| // infoJSON, err := serder.ObjectToJSON(req) | |||||
| // if err != nil { | |||||
| // return fmt.Errorf("info to json: %w", err) | |||||
| // } | |||||
| // resp, err := http2.PostMultiPart(targetUrl, http2.MultiPartRequestParam{ | |||||
| // Form: map[string]string{"info": string(infoJSON)}, | |||||
| // Files: iterator.Array(&http2.IterMultiPartFile{ | |||||
| // FieldName: "stream", | |||||
| // FileName: "stream", | |||||
| // File: req.Stream, | |||||
| // }), | |||||
| // }) | |||||
| // if err != nil { | |||||
| // return err | |||||
| // } | |||||
| // contType := resp.Header.Get("Content-Type") | |||||
| // if strings.Contains(contType, http2.ContentTypeJSON) { | |||||
| // var err error | |||||
| // var codeResp response[ObjectUploadResp] | |||||
| // if codeResp, err = serder.JSONToObjectStreamEx[response[ObjectUploadResp]](resp.Body); err != nil { | |||||
| // return fmt.Errorf("parsing response: %w", err) | |||||
| // } | |||||
| // if codeResp.Code == errorcode.OK { | |||||
| // return nil | |||||
| // } | |||||
| // return codeResp.ToError() | |||||
| // } | |||||
| // return fmt.Errorf("unknow response content type: %s", contType) | |||||
| return fmt.Errorf("not implemented") | |||||
| } | } | ||||
| const ExecuteIOPlanPath = "/hubIO/executeIOPlan" | const ExecuteIOPlanPath = "/hubIO/executeIOPlan" | ||||
| @@ -121,38 +113,34 @@ type ExecuteIOPlanReq struct { | |||||
| Plan exec.Plan `json:"plan"` | Plan exec.Plan `json:"plan"` | ||||
| } | } | ||||
| func (c *Client) ExecuteIOPlan(plan exec.Plan) error { | |||||
| func (c *Client) ExecuteIOPlan(req ExecuteIOPlanReq) error { | |||||
| targetUrl, err := url.JoinPath(c.baseURL, ExecuteIOPlanPath) | targetUrl, err := url.JoinPath(c.baseURL, ExecuteIOPlanPath) | ||||
| if err != nil { | if err != nil { | ||||
| return err | return err | ||||
| } | } | ||||
| req := &ExecuteIOPlanReq{ | |||||
| Plan: plan, | |||||
| body, err := serder.ObjectToJSONEx(req) | |||||
| if err != nil { | |||||
| return fmt.Errorf("request to json: %w", err) | |||||
| } | } | ||||
| resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ | resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ | ||||
| Body: req, | |||||
| Body: body, | |||||
| }) | }) | ||||
| if err != nil { | if err != nil { | ||||
| return err | return err | ||||
| } | } | ||||
| contType := resp.Header.Get("Content-Type") | |||||
| if strings.Contains(contType, http2.ContentTypeJSON) { | |||||
| var codeResp response[any] | |||||
| if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { | |||||
| return fmt.Errorf("parsing response: %w", err) | |||||
| } | |||||
| if codeResp.Code == errorcode.OK { | |||||
| return nil | |||||
| } | |||||
| codeResp, err := ParseJSONResponse[response[any]](resp) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| return codeResp.ToError() | |||||
| if codeResp.Code == errorcode.OK { | |||||
| return nil | |||||
| } | } | ||||
| return fmt.Errorf("unknow response content type: %s", contType) | |||||
| return codeResp.ToError() | |||||
| } | } | ||||
| const SendVarPath = "/hubIO/sendVar" | const SendVarPath = "/hubIO/sendVar" | ||||
| @@ -163,40 +151,34 @@ type SendVarReq struct { | |||||
| VarValue exec.VarValue `json:"varValue"` | VarValue exec.VarValue `json:"varValue"` | ||||
| } | } | ||||
| func (c *Client) SendVar(planID exec.PlanID, id exec.VarID, value exec.VarValue) error { | |||||
| func (c *Client) SendVar(req SendVarReq) error { | |||||
| targetUrl, err := url.JoinPath(c.baseURL, SendVarPath) | targetUrl, err := url.JoinPath(c.baseURL, SendVarPath) | ||||
| if err != nil { | if err != nil { | ||||
| return err | return err | ||||
| } | } | ||||
| req := &SendVarReq{ | |||||
| PlanID: planID, | |||||
| VarID: id, | |||||
| VarValue: value, | |||||
| body, err := serder.ObjectToJSONEx(req) | |||||
| if err != nil { | |||||
| return fmt.Errorf("request to json: %w", err) | |||||
| } | } | ||||
| resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ | resp, err := http2.PostJSON(targetUrl, http2.RequestParam{ | ||||
| Body: req, | |||||
| Body: body, | |||||
| }) | }) | ||||
| if err != nil { | if err != nil { | ||||
| return err | return err | ||||
| } | } | ||||
| contType := resp.Header.Get("Content-Type") | |||||
| if strings.Contains(contType, http2.ContentTypeJSON) { | |||||
| var codeResp response[any] | |||||
| if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil { | |||||
| return fmt.Errorf("parsing response: %w", err) | |||||
| } | |||||
| if codeResp.Code == errorcode.OK { | |||||
| return nil | |||||
| } | |||||
| jsonResp, err := ParseJSONResponse[response[any]](resp) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| return codeResp.ToError() | |||||
| if jsonResp.Code == errorcode.OK { | |||||
| return nil | |||||
| } | } | ||||
| return fmt.Errorf("unknow response content type: %s", contType) | |||||
| return jsonResp.ToError() | |||||
| } | } | ||||
| const GetVarPath = "/hubIO/getVar" | const GetVarPath = "/hubIO/getVar" | ||||
| @@ -208,33 +190,36 @@ type GetVarReq struct { | |||||
| Signal exec.VarValue `json:"signal"` | Signal exec.VarValue `json:"signal"` | ||||
| } | } | ||||
| func (c *Client) GetVar(id exec.PlanID, varID exec.VarID, singalID exec.VarID, signal exec.VarValue) (exec.VarValue, error) { | |||||
| type GetVarResp struct { | |||||
| Value exec.VarValue `json:"value"` | |||||
| } | |||||
| func (c *Client) GetVar(req GetVarReq) (*GetVarResp, error) { | |||||
| targetUrl, err := url.JoinPath(c.baseURL, GetVarPath) | targetUrl, err := url.JoinPath(c.baseURL, GetVarPath) | ||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| req := &GetVarReq{ | |||||
| PlanID: id, | |||||
| VarID: varID, | |||||
| SignalID: singalID, | |||||
| Signal: signal, | |||||
| body, err := serder.ObjectToJSONEx(req) | |||||
| if err != nil { | |||||
| return nil, fmt.Errorf("request to json: %w", err) | |||||
| } | } | ||||
| resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ | resp, err := http2.GetJSON(targetUrl, http2.RequestParam{ | ||||
| Body: req, | |||||
| Body: body, | |||||
| }) | }) | ||||
| if err != nil { | if err != nil { | ||||
| return nil, err | return nil, err | ||||
| } | } | ||||
| body, _ := io.ReadAll(resp.Body) | |||||
| jsonResp, err := ParseJSONResponse[response[GetVarResp]](resp) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| if resp.StatusCode != http.StatusOK { | |||||
| // 读取错误信息 | |||||
| resp.Body.Close() | |||||
| return nil, fmt.Errorf("error response from server: %s", string(body)) | |||||
| if jsonResp.Code == errorcode.OK { | |||||
| return &jsonResp.Data, nil | |||||
| } | } | ||||
| return nil | |||||
| return nil, jsonResp.ToError() | |||||
| } | } | ||||
| @@ -1,6 +1,7 @@ | |||||
| package cdsapi | package cdsapi | ||||
| import ( | import ( | ||||
| "encoding/binary" | |||||
| "fmt" | "fmt" | ||||
| "io" | "io" | ||||
| "net/http" | "net/http" | ||||
| @@ -8,6 +9,7 @@ import ( | |||||
| "strings" | "strings" | ||||
| "gitlink.org.cn/cloudream/common/utils/http2" | "gitlink.org.cn/cloudream/common/utils/http2" | ||||
| "gitlink.org.cn/cloudream/common/utils/io2" | |||||
| "gitlink.org.cn/cloudream/common/utils/math2" | "gitlink.org.cn/cloudream/common/utils/math2" | ||||
| "gitlink.org.cn/cloudream/common/utils/serder" | "gitlink.org.cn/cloudream/common/utils/serder" | ||||
| ) | ) | ||||
| @@ -36,3 +38,88 @@ func ParseJSONResponse[TBody any](resp *http.Response) (TBody, error) { | |||||
| return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math2.Min(len(strCont), 200)]) | return ret, fmt.Errorf("unknow response content type: %s, status: %d, body(prefix): %s", contType, resp.StatusCode, strCont[:math2.Min(len(strCont), 200)]) | ||||
| } | } | ||||
| func WriteStream(dst io.Writer, src io.Reader) (int64, error) { | |||||
| sent := int64(0) | |||||
| buf := make([]byte, 1024*4) | |||||
| header := make([]byte, 4) | |||||
| for { | |||||
| n, err := src.Read(buf) | |||||
| if n > 0 { | |||||
| binary.LittleEndian.PutUint32(header, uint32(n)) | |||||
| err := io2.WriteAll(dst, header) | |||||
| if err != nil { | |||||
| return sent, err | |||||
| } | |||||
| sent += int64(n) | |||||
| } | |||||
| if err == io.EOF { | |||||
| binary.LittleEndian.PutUint32(header, uint32(0)) | |||||
| err := io2.WriteAll(dst, header) | |||||
| if err != nil { | |||||
| return sent, err | |||||
| } | |||||
| return sent, nil | |||||
| } | |||||
| if err != nil { | |||||
| errData := []byte(err.Error()) | |||||
| header := make([]byte, 4) | |||||
| binary.LittleEndian.PutUint32(header, uint32(-len(errData))) | |||||
| // 不管有没有成功 | |||||
| io2.WriteAll(dst, header) | |||||
| io2.WriteAll(dst, errData) | |||||
| return sent, err | |||||
| } | |||||
| } | |||||
| } | |||||
| func ReadStream(src io.Reader) io.ReadCloser { | |||||
| pr, pw := io.Pipe() | |||||
| buf := make([]byte, 1024*4) | |||||
| go func() { | |||||
| for { | |||||
| _, err := io.ReadFull(src, buf[:4]) | |||||
| if err != nil { | |||||
| pw.CloseWithError(err) | |||||
| break | |||||
| } | |||||
| h := int32(binary.LittleEndian.Uint32(buf[:4])) | |||||
| if h == 0 { | |||||
| pw.Close() | |||||
| break | |||||
| } | |||||
| if h < 0 { | |||||
| _, err := io.ReadFull(src, buf[:-h]) | |||||
| if err != nil { | |||||
| pw.CloseWithError(err) | |||||
| break | |||||
| } | |||||
| pw.CloseWithError(fmt.Errorf(string(buf[:-h]))) | |||||
| break | |||||
| } | |||||
| _, err = io.ReadFull(src, buf[:h]) | |||||
| if err != nil { | |||||
| pw.CloseWithError(err) | |||||
| break | |||||
| } | |||||
| _, err = pw.Write(buf[:h]) | |||||
| if err != nil { | |||||
| pw.Close() | |||||
| break | |||||
| } | |||||
| } | |||||
| }() | |||||
| return pr | |||||
| } | |||||
| @@ -27,6 +27,9 @@ type StorageID int64 | |||||
| type LocationID int64 | type LocationID int64 | ||||
| // 文件的SHA256哈希值,全大写的16进制字符串格式 | |||||
| type FileHash string | |||||
| /// TODO 将分散在各处的公共结构体定义集中到这里来 | /// TODO 将分散在各处的公共结构体定义集中到这里来 | ||||
| type Redundancy interface { | type Redundancy interface { | ||||
| @@ -176,7 +179,7 @@ type Object struct { | |||||
| PackageID PackageID `db:"PackageID" json:"packageID"` | PackageID PackageID `db:"PackageID" json:"packageID"` | ||||
| Path string `db:"Path" json:"path"` | Path string `db:"Path" json:"path"` | ||||
| Size int64 `db:"Size" json:"size,string"` | Size int64 `db:"Size" json:"size,string"` | ||||
| FileHash string `db:"FileHash" json:"fileHash"` | |||||
| FileHash FileHash `db:"FileHash" json:"fileHash"` | |||||
| Redundancy Redundancy `db:"Redundancy" json:"redundancy"` | Redundancy Redundancy `db:"Redundancy" json:"redundancy"` | ||||
| CreateTime time.Time `db:"CreateTime" json:"createTime"` | CreateTime time.Time `db:"CreateTime" json:"createTime"` | ||||
| UpdateTime time.Time `db:"UpdateTime" json:"updateTime"` | UpdateTime time.Time `db:"UpdateTime" json:"updateTime"` | ||||
| @@ -245,21 +248,21 @@ type NodeConnectivity struct { | |||||
| TestTime time.Time `db:"TestTime" json:"testTime"` | TestTime time.Time `db:"TestTime" json:"testTime"` | ||||
| } | } | ||||
| type NodePackageCachingInfo struct { | |||||
| NodeID NodeID `json:"nodeID"` | |||||
| FileSize int64 `json:"fileSize"` | |||||
| ObjectCount int64 `json:"objectCount"` | |||||
| type StoragePackageCachingInfo struct { | |||||
| StorageID StorageID `json:"storageID"` | |||||
| FileSize int64 `json:"fileSize"` | |||||
| ObjectCount int64 `json:"objectCount"` | |||||
| } | } | ||||
| type PackageCachingInfo struct { | type PackageCachingInfo struct { | ||||
| NodeInfos []NodePackageCachingInfo `json:"nodeInfos"` | |||||
| PackageSize int64 `json:"packageSize"` | |||||
| StorageInfos []StoragePackageCachingInfo `json:"stgInfos"` | |||||
| PackageSize int64 `json:"packageSize"` | |||||
| } | } | ||||
| func NewPackageCachingInfo(nodeInfos []NodePackageCachingInfo, packageSize int64) PackageCachingInfo { | |||||
| func NewPackageCachingInfo(stgInfos []StoragePackageCachingInfo, packageSize int64) PackageCachingInfo { | |||||
| return PackageCachingInfo{ | return PackageCachingInfo{ | ||||
| NodeInfos: nodeInfos, | |||||
| PackageSize: packageSize, | |||||
| StorageInfos: stgInfos, | |||||
| PackageSize: packageSize, | |||||
| } | } | ||||
| } | } | ||||
| @@ -20,8 +20,6 @@ var _ = serder.UseTypeUnionInternallyTagged(types.Ref(types.NewTypeUnion[ShardSt | |||||
| type ShardStorage struct { | type ShardStorage struct { | ||||
| StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey"` | StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey"` | ||||
| // 完全管理此存储服务的Hub的ID | |||||
| MasterHub NodeID `json:"masterHub" gorm:"column:MasterHub; not null"` | |||||
| // Shard存储空间在存储服务的目录 | // Shard存储空间在存储服务的目录 | ||||
| Root string `json:"root" gorm:"column:Root; not null"` | Root string `json:"root" gorm:"column:Root; not null"` | ||||
| // ShardStore配置数据 | // ShardStore配置数据 | ||||
| @@ -1,6 +1,8 @@ | |||||
| package cdssdk | package cdssdk | ||||
| import ( | import ( | ||||
| "fmt" | |||||
| "gitlink.org.cn/cloudream/common/pkgs/types" | "gitlink.org.cn/cloudream/common/pkgs/types" | ||||
| "gitlink.org.cn/cloudream/common/utils/serder" | "gitlink.org.cn/cloudream/common/utils/serder" | ||||
| ) | ) | ||||
| @@ -31,12 +33,18 @@ func (a *LocalStorageAddress) String() string { | |||||
| type Storage struct { | type Storage struct { | ||||
| StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; autoIncrement;"` | StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey; autoIncrement;"` | ||||
| Name string `json:"name" gorm:"column:Name; not null"` | Name string `json:"name" gorm:"column:Name; not null"` | ||||
| // 完全管理此存储服务的Hub的ID | |||||
| MasterHub NodeID `json:"masterHub" gorm:"column:MasterHub; not null"` | |||||
| // 存储服务的地址,包含鉴权所需数据 | // 存储服务的地址,包含鉴权所需数据 | ||||
| Address StorageAddress `json:"address" gorm:"column:Address; type:json; not null; serializer:union"` | Address StorageAddress `json:"address" gorm:"column:Address; type:json; not null; serializer:union"` | ||||
| // 存储服务拥有的特别功能 | // 存储服务拥有的特别功能 | ||||
| Features []StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"` | Features []StorageFeature `json:"features" gorm:"column:Features; type:json; serializer:union"` | ||||
| } | } | ||||
| func (s *Storage) String() string { | |||||
| return fmt.Sprintf("%v(%v)", s.Name, s.StorageID) | |||||
| } | |||||
| // 共享存储服务的配置数据 | // 共享存储服务的配置数据 | ||||
| type SharedStorage struct { | type SharedStorage struct { | ||||
| StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey"` | StorageID StorageID `json:"storageID" gorm:"column:StorageID; primaryKey"` | ||||
| @@ -32,7 +32,7 @@ var defaultClient = http.DefaultClient | |||||
| type RequestParam struct { | type RequestParam struct { | ||||
| Header any | Header any | ||||
| Query any | Query any | ||||
| Body any | |||||
| Body any // 如果是[]byte,则直接作为请求体,否则会被序列化等处理 | |||||
| } | } | ||||
| func GetJSON(url string, param RequestParam) (*http.Response, error) { | func GetJSON(url string, param RequestParam) (*http.Response, error) { | ||||
| @@ -119,45 +119,6 @@ func PostJSON(url string, param RequestParam) (*http.Response, error) { | |||||
| return defaultClient.Do(req) | return defaultClient.Do(req) | ||||
| } | } | ||||
| func PostJSONRow(url string, param RequestParam) (*http.Response, error) { | |||||
| req, err := http.NewRequest(http.MethodPost, url, nil) | |||||
| if err != nil { | |||||
| return nil, err | |||||
| } | |||||
| if err = prepareQuery(req, param.Query); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| if err = prepareHeader(req, param.Header); err != nil { | |||||
| return nil, err | |||||
| } | |||||
| //if err = prepareJSONBody(req, param.Body); err != nil { | |||||
| // return nil, err | |||||
| //} | |||||
| setHeader(req.Header, "Content-Type", ContentTypeJSON) | |||||
| if param.Body == nil { | |||||
| return nil, nil | |||||
| } | |||||
| switch body := param.Body.(type) { | |||||
| case nil: | |||||
| case string: | |||||
| req.ContentLength = int64(len(body)) | |||||
| req.Body = io.NopCloser(bytes.NewReader([]byte(body))) | |||||
| case []byte: | |||||
| req.ContentLength = int64(len(body)) | |||||
| req.Body = io.NopCloser(bytes.NewReader(body)) | |||||
| default: | |||||
| return nil, fmt.Errorf("body error") | |||||
| } | |||||
| return defaultClient.Do(req) | |||||
| } | |||||
| func PostForm(url string, param RequestParam) (*http.Response, error) { | func PostForm(url string, param RequestParam) (*http.Response, error) { | ||||
| req, err := http.NewRequest(http.MethodPost, url, nil) | req, err := http.NewRequest(http.MethodPost, url, nil) | ||||
| if err != nil { | if err != nil { | ||||
| @@ -528,17 +489,22 @@ func prepareHeader(req *http.Request, header any) error { | |||||
| func prepareJSONBody(req *http.Request, body any) error { | func prepareJSONBody(req *http.Request, body any) error { | ||||
| setHeader(req.Header, "Content-Type", ContentTypeJSON) | setHeader(req.Header, "Content-Type", ContentTypeJSON) | ||||
| if body == nil { | |||||
| switch body := body.(type) { | |||||
| case nil: | |||||
| return nil | return nil | ||||
| } | |||||
| case []byte: | |||||
| req.ContentLength = int64(len(body)) | |||||
| req.Body = io.NopCloser(bytes.NewReader(body)) | |||||
| default: | |||||
| data, err := serder.ObjectToJSON(body) | |||||
| if err != nil { | |||||
| return err | |||||
| } | |||||
| data, err := serder.ObjectToJSON(body) | |||||
| if err != nil { | |||||
| return err | |||||
| req.ContentLength = int64(len(data)) | |||||
| req.Body = io.NopCloser(bytes.NewReader(data)) | |||||
| } | } | ||||
| req.ContentLength = int64(len(data)) | |||||
| req.Body = io.NopCloser(bytes.NewReader(data)) | |||||
| return nil | return nil | ||||
| } | } | ||||
| @@ -1,4 +1,4 @@ | |||||
| package os | |||||
| package os2 | |||||
| import ( | import ( | ||||
| "os" | "os" | ||||
| @@ -0,0 +1,20 @@ | |||||
| package os2 | |||||
| import ( | |||||
| "math/rand" | |||||
| "strings" | |||||
| ) | |||||
| func GenerateRandomFileName(len int) string { | |||||
| sb := strings.Builder{} | |||||
| for i := 0; i < len; i++ { | |||||
| rd := rand.Intn(26 + 10) | |||||
| if rd < 26 { | |||||
| sb.WriteRune('a' + rune(rd)) | |||||
| } else { | |||||
| sb.WriteRune('0' + rune(rd-10)) | |||||
| } | |||||
| } | |||||
| return sb.String() | |||||
| } | |||||