You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

hub_io.go 5.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package cdsapi
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "mime/multipart"
  7. "net/http"
  8. "net/url"
  9. "strings"
  10. "gitlink.org.cn/cloudream/common/consts/errorcode"
  11. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  12. "gitlink.org.cn/cloudream/common/utils/http2"
  13. "gitlink.org.cn/cloudream/common/utils/serder"
  14. )
  15. const GetStreamPath = "/hubIO/getStream"
  16. type GetStreamReq struct {
  17. PlanID exec.PlanID `json:"planID"`
  18. VarID exec.VarID `json:"varID"`
  19. Signal *exec.SignalVar `json:"signal"`
  20. }
  21. func (c *Client) GetStream(planID exec.PlanID, id exec.VarID, signal *exec.SignalVar) (io.ReadCloser, error) {
  22. targetUrl, err := url.JoinPath(c.baseURL, GetStreamPath)
  23. if err != nil {
  24. return nil, err
  25. }
  26. req := &GetStreamReq{
  27. PlanID: planID,
  28. VarID: id,
  29. Signal: signal,
  30. }
  31. resp, err := http2.GetJSON(targetUrl, http2.RequestParam{
  32. Body: req,
  33. })
  34. if err != nil {
  35. return nil, err
  36. }
  37. if resp.StatusCode != http.StatusOK {
  38. // 读取错误信息
  39. body, _ := io.ReadAll(resp.Body)
  40. resp.Body.Close()
  41. return nil, fmt.Errorf("error response from server: %s", string(body))
  42. }
  43. return resp.Body, nil
  44. }
  45. const SendStreamPath = "/hubIO/sendStream"
  46. type SendStreamReq struct {
  47. PlanID exec.PlanID `json:"planID"`
  48. VarID exec.VarID `json:"varID"`
  49. Stream io.ReadCloser `json:"stream"`
  50. }
  51. func (c *Client) SendStream(planID exec.PlanID, varID exec.VarID, str io.Reader) error {
  52. targetUrl, err := url.JoinPath(c.baseURL, SendStreamPath)
  53. if err != nil {
  54. return err
  55. }
  56. body := &bytes.Buffer{}
  57. writer := multipart.NewWriter(body)
  58. _ = writer.WriteField("plan_id", string(planID))
  59. _ = writer.WriteField("var_id", string(rune(varID)))
  60. fileWriter, err := writer.CreateFormFile("file", "data")
  61. if err != nil {
  62. return fmt.Errorf("creating form file: %w", err)
  63. }
  64. // 将读取的数据写入 multipart 的文件部分
  65. _, err = io.Copy(fileWriter, str)
  66. if err != nil {
  67. return fmt.Errorf("copying stream data: %w", err)
  68. }
  69. // 关闭 writer 以结束 multipart
  70. err = writer.Close()
  71. if err != nil {
  72. return fmt.Errorf("closing writer: %w", err)
  73. }
  74. // 创建 HTTP 请求
  75. req, err := http.NewRequest("POST", targetUrl, body)
  76. if err != nil {
  77. return fmt.Errorf("creating HTTP request: %w", err)
  78. }
  79. req.Header.Set("Content-Type", writer.FormDataContentType())
  80. // 发送请求
  81. cli := http.Client{}
  82. resp, err := cli.Do(req)
  83. if err != nil {
  84. return fmt.Errorf("sending HTTP request: %w", err)
  85. }
  86. defer resp.Body.Close()
  87. // 检查响应状态码
  88. if resp.StatusCode != http.StatusOK {
  89. return fmt.Errorf("server returned non-200 status: %d", resp.StatusCode)
  90. }
  91. return nil
  92. }
  93. const ExecuteIOPlanPath = "/hubIO/executeIOPlan"
  94. type ExecuteIOPlanReq struct {
  95. Plan exec.Plan `json:"plan"`
  96. }
  97. func (c *Client) ExecuteIOPlan(plan exec.Plan) error {
  98. targetUrl, err := url.JoinPath(c.baseURL, ExecuteIOPlanPath)
  99. if err != nil {
  100. return err
  101. }
  102. req := &ExecuteIOPlanReq{
  103. Plan: plan,
  104. }
  105. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  106. Body: req,
  107. })
  108. if err != nil {
  109. return err
  110. }
  111. contType := resp.Header.Get("Content-Type")
  112. if strings.Contains(contType, http2.ContentTypeJSON) {
  113. var codeResp response[any]
  114. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  115. return fmt.Errorf("parsing response: %w", err)
  116. }
  117. if codeResp.Code == errorcode.OK {
  118. return nil
  119. }
  120. return codeResp.ToError()
  121. }
  122. return fmt.Errorf("unknow response content type: %s", contType)
  123. }
  124. const SendVarPath = "/hubIO/sendVar"
  125. type SendVarReq struct {
  126. PlanID exec.PlanID `json:"planID"`
  127. Var exec.Var `json:"var"`
  128. }
  129. func (c *Client) SendVar(id exec.PlanID, v exec.Var) error {
  130. targetUrl, err := url.JoinPath(c.baseURL, SendVarPath)
  131. if err != nil {
  132. return err
  133. }
  134. req := &SendVarReq{
  135. PlanID: id,
  136. Var: v,
  137. }
  138. resp, err := http2.PostJSON(targetUrl, http2.RequestParam{
  139. Body: req,
  140. })
  141. if err != nil {
  142. return err
  143. }
  144. contType := resp.Header.Get("Content-Type")
  145. if strings.Contains(contType, http2.ContentTypeJSON) {
  146. var codeResp response[any]
  147. if err := serder.JSONToObjectStream(resp.Body, &codeResp); err != nil {
  148. return fmt.Errorf("parsing response: %w", err)
  149. }
  150. if codeResp.Code == errorcode.OK {
  151. return nil
  152. }
  153. return codeResp.ToError()
  154. }
  155. return fmt.Errorf("unknow response content type: %s", contType)
  156. }
  157. const GetVarPath = "/hubIO/getVar"
  158. type GetVarReq struct {
  159. PlanID exec.PlanID `json:"planID"`
  160. Var exec.Var `json:"var"`
  161. Signal *exec.SignalVar `json:"signal"`
  162. }
  163. func (c *Client) GetVar(id exec.PlanID, v exec.Var, signal *exec.SignalVar) error {
  164. targetUrl, err := url.JoinPath(c.baseURL, GetVarPath)
  165. if err != nil {
  166. return err
  167. }
  168. req := &GetVarReq{
  169. PlanID: id,
  170. Var: v,
  171. Signal: signal,
  172. }
  173. resp, err := http2.GetJSON(targetUrl, http2.RequestParam{
  174. Body: req,
  175. })
  176. if err != nil {
  177. return err
  178. }
  179. if resp.StatusCode != http.StatusOK {
  180. // 读取错误信息
  181. body, _ := io.ReadAll(resp.Body)
  182. resp.Body.Close()
  183. return fmt.Errorf("error response from server: %s", string(body))
  184. }
  185. return nil
  186. }