You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

send.go 7.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. package ops
  2. import (
  3. "fmt"
  4. "io"
  5. "time"
  6. "gitlink.org.cn/cloudream/common/pkgs/future"
  7. "gitlink.org.cn/cloudream/common/utils/io2"
  8. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/dag"
  9. "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/ioswitch/exec"
  10. )
  // SendStreamStatsStoreKey is the key under which SendStream and GetStream
  // store their SendStreamStatsValue records on the executor.
  const SendStreamStatsStoreKey = "Stats.SendStream"
  14. func init() {
  15. exec.UseOp[*SendStream]()
  16. exec.UseOp[*GetStream]()
  17. exec.UseOp[*SendVar]()
  18. exec.UseOp[*GetVar]()
  19. }
  20. type SendStreamStatsValue struct {
  21. StatsCtx string
  22. IsSend bool
  23. Length int64
  24. Time time.Duration
  25. Src exec.Location
  26. Dst exec.Location
  27. }
  28. func (v *SendStreamStatsValue) Clone() exec.VarValue {
  29. v2 := *v
  30. return &v2
  31. }
  32. type SendStream struct {
  33. Input exec.VarID
  34. Send exec.VarID
  35. Worker exec.WorkerInfo
  36. StatsCtx string
  37. }
  38. func (o *SendStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  39. inputStr, err := exec.BindVar[*exec.StreamValue](e, ctx.Context, o.Input)
  40. if err != nil {
  41. return err
  42. }
  43. defer inputStr.Stream.Close()
  44. cli, err := o.Worker.NewClient()
  45. if err != nil {
  46. return fmt.Errorf("new worker %v client: %w", o.Worker, err)
  47. }
  48. defer cli.Close()
  49. counter := io2.CounterCloser(inputStr.Stream, nil)
  50. startTime := time.Now()
  51. // 发送后流的ID不同
  52. err = cli.SendStream(ctx.Context, e.Plan().ID, o.Send, counter)
  53. if err != nil {
  54. return fmt.Errorf("sending stream: %w", err)
  55. }
  56. e.Store(SendStreamStatsStoreKey, &SendStreamStatsValue{
  57. StatsCtx: o.StatsCtx,
  58. IsSend: true,
  59. Length: counter.Count(),
  60. Time: time.Since(startTime),
  61. Src: e.Location(),
  62. Dst: exec.Location{
  63. IsDriver: false,
  64. WorkerName: o.Worker.Name(),
  65. },
  66. })
  67. return nil
  68. }
  69. func (o *SendStream) String() string {
  70. return fmt.Sprintf("SendStream %v->%v@%v", o.Input, o.Send, o.Worker)
  71. }
  72. type GetStream struct {
  73. Signal exec.SignalVar
  74. Target exec.VarID
  75. Output exec.VarID
  76. Worker exec.WorkerInfo
  77. StatsCtx string
  78. }
  79. func (o *GetStream) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  80. cli, err := o.Worker.NewClient()
  81. if err != nil {
  82. return fmt.Errorf("new worker %v client: %w", o.Worker, err)
  83. }
  84. defer cli.Close()
  85. str, err := cli.GetStream(ctx.Context, e.Plan().ID, o.Target, o.Signal.ID, o.Signal.Value)
  86. if err != nil {
  87. return fmt.Errorf("getting stream: %w", err)
  88. }
  89. startTime := time.Now()
  90. counter := io2.CounterCloser(str, nil)
  91. fut := future.NewSetVoid()
  92. // 获取后送到本地的流ID是不同的
  93. str = io2.AfterReadClosedOnce(counter, func(closer io.ReadCloser) {
  94. fut.SetVoid()
  95. })
  96. e.PutVar(o.Output, &exec.StreamValue{Stream: str})
  97. err = fut.Wait(ctx.Context)
  98. e.Store(SendStreamStatsStoreKey, &SendStreamStatsValue{
  99. StatsCtx: o.StatsCtx,
  100. IsSend: false,
  101. Length: counter.Count(),
  102. Time: time.Since(startTime),
  103. Src: exec.Location{
  104. IsDriver: false,
  105. WorkerName: o.Worker.Name(),
  106. },
  107. Dst: e.Location(),
  108. })
  109. return err
  110. }
  111. func (o *GetStream) String() string {
  112. return fmt.Sprintf("GetStream %v(S:%v)<-%v@%v", o.Output, o.Signal.ID, o.Target, o.Worker)
  113. }
  114. type SendVar struct {
  115. Input exec.VarID `json:"input"`
  116. Send exec.VarID `json:"send"`
  117. Worker exec.WorkerInfo `json:"worker"`
  118. }
  119. func (o *SendVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  120. input, err := e.BindVar(ctx.Context, o.Input)
  121. if err != nil {
  122. return err
  123. }
  124. cli, err := o.Worker.NewClient()
  125. if err != nil {
  126. return fmt.Errorf("new worker %v client: %w", o.Worker, err)
  127. }
  128. defer cli.Close()
  129. err = cli.SendVar(ctx.Context, e.Plan().ID, o.Send, input)
  130. if err != nil {
  131. return fmt.Errorf("sending var: %w", err)
  132. }
  133. return nil
  134. }
  135. func (o *SendVar) String() string {
  136. return fmt.Sprintf("SendVar %v->%v@%v", o.Input, o.Send, o.Worker)
  137. }
  138. type GetVar struct {
  139. Signal exec.SignalVar `json:"signal"`
  140. Target exec.VarID `json:"target"`
  141. Output exec.VarID `json:"output"`
  142. Worker exec.WorkerInfo `json:"worker"`
  143. }
  144. func (o *GetVar) Execute(ctx *exec.ExecContext, e *exec.Executor) error {
  145. cli, err := o.Worker.NewClient()
  146. if err != nil {
  147. return fmt.Errorf("new worker %v client: %w", o.Worker, err)
  148. }
  149. defer cli.Close()
  150. get, err := cli.GetVar(ctx.Context, e.Plan().ID, o.Target, o.Signal.ID, o.Signal.Value)
  151. if err != nil {
  152. return fmt.Errorf("getting var: %w", err)
  153. }
  154. e.PutVar(o.Output, get)
  155. return nil
  156. }
  157. func (o *GetVar) String() string {
  158. return fmt.Sprintf("GetVar %v(S:%v)<-%v@%v", o.Output, o.Signal.ID, o.Target, o.Worker)
  159. }
  160. type SendStreamNode struct {
  161. dag.NodeBase
  162. ToWorker exec.WorkerInfo
  163. StatsCtx string
  164. }
  165. func (b *GraphNodeBuilder) NewSendStream(to exec.WorkerInfo, statsCtx string) *SendStreamNode {
  166. node := &SendStreamNode{
  167. ToWorker: to,
  168. StatsCtx: statsCtx,
  169. }
  170. b.AddNode(node)
  171. node.InputStreams().Init(1)
  172. node.OutputStreams().Init(node, 1)
  173. return node
  174. }
  175. func (t *SendStreamNode) Send(v *dag.StreamVar) *dag.StreamVar {
  176. v.To(t, 0)
  177. return t.OutputStreams().Get(0)
  178. }
  179. func (t *SendStreamNode) GenerateOp() (exec.Op, error) {
  180. return &SendStream{
  181. Input: t.InputStreams().Get(0).VarID,
  182. Send: t.OutputStreams().Get(0).VarID,
  183. Worker: t.ToWorker,
  184. StatsCtx: t.StatsCtx,
  185. }, nil
  186. }
  187. // func (t *SendStreamType) String() string {
  188. // return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node))
  189. // }
  190. type SendValueNode struct {
  191. dag.NodeBase
  192. ToWorker exec.WorkerInfo
  193. }
  194. func (b *GraphNodeBuilder) NewSendValue(to exec.WorkerInfo) *SendValueNode {
  195. node := &SendValueNode{
  196. ToWorker: to,
  197. }
  198. b.AddNode(node)
  199. node.InputValues().Init(1)
  200. node.OutputValues().Init(node, 1)
  201. return node
  202. }
  203. func (t *SendValueNode) Send(v *dag.ValueVar) *dag.ValueVar {
  204. v.To(t, 0)
  205. return t.OutputValues().Get(0)
  206. }
  207. func (t *SendValueNode) GenerateOp() (exec.Op, error) {
  208. return &SendVar{
  209. Input: t.InputValues().Get(0).VarID,
  210. Send: t.OutputValues().Get(0).VarID,
  211. Worker: t.ToWorker,
  212. }, nil
  213. }
  214. // func (t *SendVarType) String() string {
  215. // return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node))
  216. // }
  217. type GetStreamNode struct {
  218. dag.NodeBase
  219. FromWorker exec.WorkerInfo
  220. StatsCtx string
  221. }
  222. func (b *GraphNodeBuilder) NewGetStream(from exec.WorkerInfo, statsCtx string) *GetStreamNode {
  223. node := &GetStreamNode{
  224. FromWorker: from,
  225. StatsCtx: statsCtx,
  226. }
  227. b.AddNode(node)
  228. node.InputStreams().Init(1)
  229. node.OutputValues().Init(node, 1)
  230. node.OutputStreams().Init(node, 1)
  231. return node
  232. }
  233. func (t *GetStreamNode) Get(v *dag.StreamVar) *dag.StreamVar {
  234. v.To(t, 0)
  235. return t.OutputStreams().Get(0)
  236. }
  237. func (t *GetStreamNode) SignalVar() *dag.ValueVar {
  238. return t.OutputValues().Get(0)
  239. }
  240. func (t *GetStreamNode) GenerateOp() (exec.Op, error) {
  241. return &GetStream{
  242. Signal: exec.NewSignalVar(t.OutputValues().Get(0).VarID),
  243. Output: t.OutputStreams().Get(0).VarID,
  244. Target: t.InputStreams().Get(0).VarID,
  245. Worker: t.FromWorker,
  246. StatsCtx: t.StatsCtx,
  247. }, nil
  248. }
  249. // func (t *GetStreamType) String() string {
  250. // return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node))
  251. // }
  252. type GetValueNode struct {
  253. dag.NodeBase
  254. FromWorker exec.WorkerInfo
  255. }
  256. func (b *GraphNodeBuilder) NewGetValue(from exec.WorkerInfo) *GetValueNode {
  257. node := &GetValueNode{
  258. FromWorker: from,
  259. }
  260. b.AddNode(node)
  261. node.InputValues().Init(1)
  262. node.OutputValues().Init(node, 2)
  263. return node
  264. }
  265. func (t *GetValueNode) Get(v *dag.ValueVar) *dag.ValueVar {
  266. v.To(t, 0)
  267. return t.OutputValues().Get(1)
  268. }
  269. func (t *GetValueNode) SignalVar() *dag.ValueVar {
  270. return t.OutputValues().Get(0)
  271. }
  272. func (t *GetValueNode) GenerateOp() (exec.Op, error) {
  273. return &GetVar{
  274. Signal: exec.NewSignalVar(t.OutputValues().Get(0).VarID),
  275. Output: t.OutputValues().Get(1).VarID,
  276. Target: t.InputValues().Get(0).VarID,
  277. Worker: t.FromWorker,
  278. }, nil
  279. }
  280. // func (t *GetVaType) String() string {
  281. // return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node))
  282. // }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。