You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

io.go 6.7 kB

2 years ago
2 years ago
2 years ago
1 year ago
2 years ago
2 years ago
10 months ago
2 years ago
2 years ago
1 year ago
2 years ago
1 year ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
2 years ago
1 year ago
2 years ago
1 year ago
1 year ago
1 year ago
2 years ago
2 years ago
2 years ago
2 years ago
1 year ago
2 years ago
2 years ago
2 years ago
1 year ago
2 years ago
1 year ago
2 years ago
2 years ago
2 years ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. package grpc
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "time"
  7. "github.com/inhies/go-bytesize"
  8. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/utils/io2"
  11. "gitlink.org.cn/cloudream/common/utils/serder"
  12. agtrpc "gitlink.org.cn/cloudream/storage/common/pkgs/grpc/agent"
  13. )
  14. func (s *Service) ExecuteIOPlan(ctx context.Context, req *agtrpc.ExecuteIOPlanReq) (*agtrpc.ExecuteIOPlanResp, error) {
  15. plan, err := serder.JSONToObjectEx[exec.Plan]([]byte(req.Plan))
  16. if err != nil {
  17. return nil, fmt.Errorf("deserializing plan: %w", err)
  18. }
  19. log := logger.WithField("PlanID", plan.ID)
  20. log.Infof("begin execute io plan")
  21. sw := exec.NewExecutor(plan)
  22. s.swWorker.Add(sw)
  23. defer s.swWorker.Remove(sw)
  24. execCtx := exec.NewWithContext(ctx)
  25. exec.SetValueByType(execCtx, s.stgAgts)
  26. _, err = sw.Run(execCtx)
  27. if err != nil {
  28. log.Warnf("running io plan: %v", err)
  29. return nil, fmt.Errorf("running io plan: %w", err)
  30. }
  31. log.Infof("plan finished")
  32. return &agtrpc.ExecuteIOPlanResp{}, nil
  33. }
  34. func (s *Service) SendStream(server agtrpc.Agent_SendStreamServer) error {
  35. msg, err := server.Recv()
  36. if err != nil {
  37. return fmt.Errorf("recving stream id packet: %w", err)
  38. }
  39. if msg.Type != agtrpc.StreamDataPacketType_SendArgs {
  40. return fmt.Errorf("first packet must be a SendArgs packet")
  41. }
  42. logger.
  43. WithField("PlanID", msg.PlanID).
  44. WithField("VarID", msg.VarID).
  45. Debugf("stream input")
  46. // 同一批Plan中每个节点的Plan的启动时间有先后,但最多不应该超过30秒
  47. ctx, cancel := context.WithTimeout(server.Context(), time.Second*30)
  48. defer cancel()
  49. sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(msg.PlanID))
  50. if sw == nil {
  51. return fmt.Errorf("plan not found")
  52. }
  53. pr, pw := io.Pipe()
  54. varID := exec.VarID(msg.VarID)
  55. sw.PutVar(varID, &exec.StreamValue{Stream: pr})
  56. // 然后读取文件数据
  57. var recvSize int64
  58. for {
  59. msg, err := server.Recv()
  60. // 读取客户端数据失败
  61. // 即使err是io.EOF,只要没有收到客户端包含EOF数据包就被断开了连接,就认为接收失败
  62. if err != nil {
  63. // 关闭文件写入
  64. pw.CloseWithError(io.ErrClosedPipe)
  65. logger.WithField("ReceiveSize", recvSize).
  66. WithField("VarID", varID).
  67. Warnf("recv message failed, err: %s", err.Error())
  68. return fmt.Errorf("recv message failed, err: %w", err)
  69. }
  70. err = io2.WriteAll(pw, msg.Data)
  71. if err != nil {
  72. // 关闭文件写入
  73. pw.CloseWithError(io.ErrClosedPipe)
  74. logger.Warnf("write data to file failed, err: %s", err.Error())
  75. return fmt.Errorf("write data to file failed, err: %w", err)
  76. }
  77. recvSize += int64(len(msg.Data))
  78. if msg.Type == agtrpc.StreamDataPacketType_EOF {
  79. // 客户端明确说明文件传输已经结束,那么结束写入,获得文件Hash
  80. err := pw.Close()
  81. if err != nil {
  82. logger.Warnf("finish writing failed, err: %s", err.Error())
  83. return fmt.Errorf("finish writing failed, err: %w", err)
  84. }
  85. // 并将结果返回到客户端
  86. err = server.SendAndClose(&agtrpc.SendStreamResp{})
  87. if err != nil {
  88. logger.Warnf("send response failed, err: %s", err.Error())
  89. return fmt.Errorf("send response failed, err: %w", err)
  90. }
  91. return nil
  92. }
  93. }
  94. }
  95. func (s *Service) GetStream(req *agtrpc.GetStreamReq, server agtrpc.Agent_GetStreamServer) error {
  96. logger.
  97. WithField("PlanID", req.PlanID).
  98. WithField("VarID", req.VarID).
  99. Debugf("stream output")
  100. // 同上
  101. ctx, cancel := context.WithTimeout(server.Context(), time.Second*30)
  102. defer cancel()
  103. sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID))
  104. if sw == nil {
  105. return fmt.Errorf("plan not found")
  106. }
  107. signal, err := serder.JSONToObjectEx[exec.VarValue]([]byte(req.Signal))
  108. if err != nil {
  109. return fmt.Errorf("deserializing var: %w", err)
  110. }
  111. sw.PutVar(exec.VarID(req.SignalID), signal)
  112. strVar, err := exec.BindVar[*exec.StreamValue](sw, server.Context(), exec.VarID(req.VarID))
  113. if err != nil {
  114. return fmt.Errorf("binding vars: %w", err)
  115. }
  116. reader := strVar.Stream
  117. defer reader.Close()
  118. buf := make([]byte, 1024*64)
  119. readAllCnt := 0
  120. startTime := time.Now()
  121. for {
  122. readCnt, err := reader.Read(buf)
  123. if readCnt > 0 {
  124. readAllCnt += readCnt
  125. err = server.Send(&agtrpc.StreamDataPacket{
  126. Type: agtrpc.StreamDataPacketType_Data,
  127. Data: buf[:readCnt],
  128. })
  129. if err != nil {
  130. logger.
  131. WithField("PlanID", req.PlanID).
  132. WithField("VarID", req.VarID).
  133. Warnf("send stream data failed, err: %s", err.Error())
  134. return fmt.Errorf("send stream data failed, err: %w", err)
  135. }
  136. }
  137. // 文件读取完毕
  138. if err == io.EOF {
  139. dt := time.Since(startTime)
  140. logger.
  141. WithField("PlanID", req.PlanID).
  142. WithField("VarID", req.VarID).
  143. Debugf("send data size %d in %v, speed %v/s", readAllCnt, dt, bytesize.New(float64(readAllCnt)/dt.Seconds()))
  144. // 发送EOF消息
  145. server.Send(&agtrpc.StreamDataPacket{
  146. Type: agtrpc.StreamDataPacketType_EOF,
  147. })
  148. return nil
  149. }
  150. // io.ErrUnexpectedEOF没有读满整个buf就遇到了EOF,此时正常发送剩余数据即可。除了这两个错误之外,其他错误都中断操作
  151. if err != nil && err != io.ErrUnexpectedEOF {
  152. logger.
  153. WithField("PlanID", req.PlanID).
  154. WithField("VarID", req.VarID).
  155. Warnf("reading stream data: %s", err.Error())
  156. return fmt.Errorf("reading stream data: %w", err)
  157. }
  158. }
  159. }
  160. func (s *Service) SendVar(ctx context.Context, req *agtrpc.SendVarReq) (*agtrpc.SendVarResp, error) {
  161. ctx, cancel := context.WithTimeout(ctx, time.Second*30)
  162. defer cancel()
  163. sw := s.swWorker.FindByIDContexted(ctx, exec.PlanID(req.PlanID))
  164. if sw == nil {
  165. return nil, fmt.Errorf("plan not found")
  166. }
  167. v, err := serder.JSONToObjectEx[exec.VarValue]([]byte(req.VarValue))
  168. if err != nil {
  169. return nil, fmt.Errorf("deserializing var: %w", err)
  170. }
  171. sw.PutVar(exec.VarID(req.VarID), v)
  172. return &agtrpc.SendVarResp{}, nil
  173. }
  174. func (s *Service) GetVar(ctx context.Context, req *agtrpc.GetVarReq) (*agtrpc.GetVarResp, error) {
  175. ctx2, cancel := context.WithTimeout(ctx, time.Second*30)
  176. defer cancel()
  177. sw := s.swWorker.FindByIDContexted(ctx2, exec.PlanID(req.PlanID))
  178. if sw == nil {
  179. return nil, fmt.Errorf("plan not found")
  180. }
  181. signal, err := serder.JSONToObjectEx[exec.VarValue]([]byte(req.Signal))
  182. if err != nil {
  183. return nil, fmt.Errorf("deserializing var: %w", err)
  184. }
  185. sw.PutVar(exec.VarID(req.SignalID), signal)
  186. v, err := sw.BindVar(ctx, exec.VarID(req.VarID))
  187. if err != nil {
  188. return nil, fmt.Errorf("binding vars: %w", err)
  189. }
  190. vd, err := serder.ObjectToJSONEx(v)
  191. if err != nil {
  192. return nil, fmt.Errorf("serializing var: %w", err)
  193. }
  194. return &agtrpc.GetVarResp{
  195. Var: string(vd),
  196. }, nil
  197. }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。