You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

io.go 6.7 kB

2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
1 year ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
1 year ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package grpc
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "time"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/common/utils/io2"
  10. "gitlink.org.cn/cloudream/common/utils/serder"
  11. agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
  12. )
  13. func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanReq) (*agtrpc.ExecuteIOPlanResp, error) {
  14. plan, err := serder.JSONToObjectEx[exec.Plan]([]byte(req.Plan))
  15. if err != nil {
  16. return nil, fmt.Errorf("deserializing plan: %w", err)
  17. }
  18. logger.WithField("PlanID", plan.ID).Infof("begin execute io plan")
  19. defer logger.WithField("PlanID", plan.ID).Infof("plan finished")
  20. sw := exec.NewExecutor(plan)
  21. s.swWorker.Add(sw)
  22. defer s.swWorker.Remove(sw)
  23. _, err = sw.Run(ctx)
  24. if err != nil {
  25. return nil, fmt.Errorf("running io plan: %w", err)
  26. }
  27. return &agtrpc.ExecuteIOPlanResp{}, nil
  28. }
  29. func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error {
  30. msg, err := server.Recv()
  31. if err != nil {
  32. return fmt.Errorf("recving stream id packet: %w", err)
  33. }
  34. if msg.Type != agtrpc.StreamDataPacketType_SendArgs {
  35. return fmt.Errorf("first packet must be a SendArgs packet")
  36. }
  37. logger.
  38. WithField("PlanID", msg.PlanID).
  39. WithField("VarID", msg.VarID).
  40. Debugf("stream input")
  41. // 同一批Plan中每个节点的Plan的启动时间有先后,但最多不应该超过30秒
  42. ctx, cancel := context.WithTimeout(server.Context(), time.Second*30)
  43. defer cancel()
  44. sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(msg.PlanID))
  45. if sw == nil {
  46. return fmt.Errorf("plan not found")
  47. }
  48. pr, pw := io.Pipe()
  49. rb := io2.RingBuffer(pr, 16*1024)
  50. rb.UpstreamName = fmt.Sprintf("GRPC(send) input")
  51. rb.DownstreamName = fmt.Sprintf("GRPC(send) output %v", msg.VarID)
  52. varID := exec.VarID(msg.VarID)
  53. sw.PutVars(&exec.StreamVar{
  54. ID: varID,
  55. Stream: rb,
  56. })
  57. // 然后读取文件数据
  58. var recvSize int64
  59. for {
  60. msg, err := server.Recv()
  61. // 读取客户端数据失败
  62. // 即使err是io.EOF,只要没有收到客户端包含EOF数据包就被断开了连接,就认为接收失败
  63. if err != nil {
  64. // 关闭文件写入
  65. pw.CloseWithError(io.ErrClosedPipe)
  66. logger.WithField("ReceiveSize", recvSize).
  67. WithField("VarID", varID).
  68. Warnf("recv message failed, err: %s", err.Error())
  69. return fmt.Errorf("recv message failed, err: %w", err)
  70. }
  71. err = io2.WriteAll(pw, msg.Data)
  72. if err != nil {
  73. // 关闭文件写入
  74. pw.CloseWithError(io.ErrClosedPipe)
  75. logger.Warnf("write data to file failed, err: %s", err.Error())
  76. return fmt.Errorf("write data to file failed, err: %w", err)
  77. }
  78. recvSize += int64(len(msg.Data))
  79. if msg.Type == agtrpc.StreamDataPacketType_EOF {
  80. // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash
  81. err := pw.Close()
  82. if err != nil {
  83. logger.Warnf("finish writing failed, err: %s", err.Error())
  84. return fmt.Errorf("finish writing failed, err: %w", err)
  85. }
  86. // 并将结果返回到客户端
  87. err = server.SendAndClose(&agtrpc.SendStreamResp{})
  88. if err != nil {
  89. logger.Warnf("send response failed, err: %s", err.Error())
  90. return fmt.Errorf("send response failed, err: %w", err)
  91. }
  92. return nil
  93. }
  94. }
  95. }
  96. func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Agent_GetStreamServer) error {
  97. logger.
  98. WithField("PlanID", req.PlanID).
  99. WithField("VarID", req.VarID).
  100. Debugf("stream output")
  101. // 同上
  102. ctx, cancel := context.WithTimeout(server.Context(), time.Second*30)
  103. defer cancel()
  104. sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID))
  105. if sw == nil {
  106. return fmt.Errorf("plan not found")
  107. }
  108. signal, err := serder.JSONToObjectEx[*exec.SignalVar]([]byte(req.Signal))
  109. if err != nil {
  110. return fmt.Errorf("deserializing var: %w", err)
  111. }
  112. sw.PutVars(signal)
  113. strVar := &exec.StreamVar{
  114. ID: exec.VarID(req.VarID),
  115. }
  116. err = sw.BindVars(server.Context(), strVar)
  117. if err != nil {
  118. return fmt.Errorf("binding vars: %w", err)
  119. }
  120. reader := strVar.Stream
  121. defer reader.Close()
  122. buf := make([]byte, 1024*64)
  123. readAllCnt := 0
  124. for {
  125. readCnt, err := reader.Read(buf)
  126. if readCnt > 0 {
  127. readAllCnt += readCnt
  128. err = server.Send(&agtrpc.StreamDataPacket{
  129. Type: agtrpc.StreamDataPacketType_Data,
  130. Data: buf[:readCnt],
  131. })
  132. if err != nil {
  133. logger.
  134. WithField("PlanID", req.PlanID).
  135. WithField("VarID", req.VarID).
  136. Warnf("send stream data failed, err: %s", err.Error())
  137. return fmt.Errorf("send stream data failed, err: %w", err)
  138. }
  139. }
  140. // 文件读取完毕
  141. if err == io.EOF {
  142. logger.
  143. WithField("PlanID", req.PlanID).
  144. WithField("VarID", req.VarID).
  145. Debugf("send data size %d", readAllCnt)
  146. // 发送EOF消息
  147. server.Send(&agtrpc.StreamDataPacket{
  148. Type: agtrpc.StreamDataPacketType_EOF,
  149. })
  150. return nil
  151. }
  152. // io.ErrUnexpectedEOF没有读满整个buf就遇到了EOF,此时正常发送剩余数据即可。除了这两个错误之外,其他错误都中断操作
  153. if err != nil && err != io.ErrUnexpectedEOF {
  154. logger.
  155. WithField("PlanID", req.PlanID).
  156. WithField("VarID", req.VarID).
  157. Warnf("reading stream data: %s", err.Error())
  158. return fmt.Errorf("reading stream data: %w", err)
  159. }
  160. }
  161. }
  162. func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc.SendVarResp, error) {
  163. ctx, cancel := context.WithTimeout(ctx, time.Second*30)
  164. defer cancel()
  165. sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID))
  166. if sw == nil {
  167. return nil, fmt.Errorf("plan not found")
  168. }
  169. v, err := serder.JSONToObjectEx[exec.Var]([]byte(req.Var))
  170. if err != nil {
  171. return nil, fmt.Errorf("deserializing var: %w", err)
  172. }
  173. sw.PutVars(v)
  174. return &agtrpc.SendVarResp{}, nil
  175. }
  176. func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.GetVarResp, error) {
  177. ctx2, cancel := context.WithTimeout(ctx, time.Second*30)
  178. defer cancel()
  179. sw := s.swWorker.FindByIDContexted(ctx2, exec.PlanID(req.PlanID))
  180. if sw == nil {
  181. return nil, fmt.Errorf("plan not found")
  182. }
  183. v, err := serder.JSONToObjectEx[exec.Var]([]byte(req.Var))
  184. if err != nil {
  185. return nil, fmt.Errorf("deserializing var: %w", err)
  186. }
  187. signal, err := serder.JSONToObjectEx[*exec.SignalVar]([]byte(req.Signal))
  188. if err != nil {
  189. return nil, fmt.Errorf("deserializing var: %w", err)
  190. }
  191. sw.PutVars(signal)
  192. err = sw.BindVars(ctx, v)
  193. if err != nil {
  194. return nil, fmt.Errorf("binding vars: %w", err)
  195. }
  196. vd, err := serder.ObjectToJSONEx(v)
  197. if err != nil {
  198. return nil, fmt.Errorf("serializing var: %w", err)
  199. }
  200. return &agtrpc.GetVarResp{
  201. Var: string(vd),
  202. }, nil
  203. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。