You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

send.go 6.3 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. package ops
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "gitlink.org.cn/cloudream/common/pkgs/future"
  7. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  8. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  9. "gitlink.org.cn/cloudream/common/pkgs/logger"
  10. "gitlink.org.cn/cloudream/common/utils/io2"
  11. )
  12. func init() {
  13. exec.UseOp[*SendStream]()
  14. exec.UseOp[*GetStream]()
  15. exec.UseOp[*SendVar]()
  16. exec.UseOp[*GetVar]()
  17. }
  18. type SendStream struct {
  19. Input *exec.StreamVar `json:"input"`
  20. Send *exec.StreamVar `json:"send"`
  21. Worker exec.WorkerInfo `json:"worker"`
  22. }
  23. func (o *SendStream) Execute(ctx context.Context, e *exec.Executor) error {
  24. err := e.BindVars(ctx, o.Input)
  25. if err != nil {
  26. return err
  27. }
  28. defer o.Input.Stream.Close()
  29. cli, err := o.Worker.NewClient()
  30. if err != nil {
  31. return fmt.Errorf("new worker %v client: %w", o.Worker, err)
  32. }
  33. defer cli.Close()
  34. logger.Debugf("sending stream %v as %v to worker %v", o.Input.ID, o.Send.ID, o.Worker)
  35. // 发送后流的ID不同
  36. err = cli.SendStream(ctx, e.Plan().ID, o.Send, o.Input.Stream)
  37. if err != nil {
  38. return fmt.Errorf("sending stream: %w", err)
  39. }
  40. return nil
  41. }
  42. type GetStream struct {
  43. Signal *exec.SignalVar `json:"signal"`
  44. Target *exec.StreamVar `json:"target"`
  45. Output *exec.StreamVar `json:"output"`
  46. Worker exec.WorkerInfo `json:"worker"`
  47. }
  48. func (o *GetStream) Execute(ctx context.Context, e *exec.Executor) error {
  49. cli, err := o.Worker.NewClient()
  50. if err != nil {
  51. return fmt.Errorf("new worker %v client: %w", o.Worker, err)
  52. }
  53. defer cli.Close()
  54. logger.Debugf("getting stream %v as %v from worker %v", o.Target.ID, o.Output.ID, o.Worker)
  55. str, err := cli.GetStream(ctx, e.Plan().ID, o.Target, o.Signal)
  56. if err != nil {
  57. return fmt.Errorf("getting stream: %w", err)
  58. }
  59. fut := future.NewSetVoid()
  60. // 获取后送到本地的流ID是不同的
  61. o.Output.Stream = io2.AfterReadClosedOnce(str, func(closer io.ReadCloser) {
  62. fut.SetVoid()
  63. })
  64. e.PutVars(o.Output)
  65. return fut.Wait(ctx)
  66. }
  67. type SendVar struct {
  68. Input exec.Var `json:"input"`
  69. Send exec.Var `json:"send"`
  70. Worker exec.WorkerInfo `json:"worker"`
  71. }
  72. func (o *SendVar) Execute(ctx context.Context, e *exec.Executor) error {
  73. err := e.BindVars(ctx, o.Input)
  74. if err != nil {
  75. return err
  76. }
  77. cli, err := o.Worker.NewClient()
  78. if err != nil {
  79. return fmt.Errorf("new worker %v client: %w", o.Worker, err)
  80. }
  81. defer cli.Close()
  82. logger.Debugf("sending var %v as %v to worker %v", o.Input.GetID(), o.Send.GetID(), o.Worker)
  83. exec.AssignVar(o.Input, o.Send)
  84. err = cli.SendVar(ctx, e.Plan().ID, o.Send)
  85. if err != nil {
  86. return fmt.Errorf("sending var: %w", err)
  87. }
  88. return nil
  89. }
  90. type GetVar struct {
  91. Signal *exec.SignalVar `json:"signal"`
  92. Target exec.Var `json:"target"`
  93. Output exec.Var `json:"output"`
  94. Worker exec.WorkerInfo `json:"worker"`
  95. }
  96. func (o *GetVar) Execute(ctx context.Context, e *exec.Executor) error {
  97. cli, err := o.Worker.NewClient()
  98. if err != nil {
  99. return fmt.Errorf("new worker %v client: %w", o.Worker, err)
  100. }
  101. defer cli.Close()
  102. logger.Debugf("getting var %v as %v from worker %v", o.Target.GetID(), o.Output.GetID(), o.Worker)
  103. err = cli.GetVar(ctx, e.Plan().ID, o.Target, o.Signal)
  104. if err != nil {
  105. return fmt.Errorf("getting var: %w", err)
  106. }
  107. exec.AssignVar(o.Target, o.Output)
  108. e.PutVars(o.Output)
  109. return nil
  110. }
  111. type SendStreamType struct {
  112. ToWorker exec.WorkerInfo
  113. }
  114. func (t *SendStreamType) Send(n *dag.Node, v *dag.StreamVar) *dag.StreamVar {
  115. v.To(n, 0)
  116. return n.OutputStreams[0]
  117. }
  118. func (t *SendStreamType) InitNode(node *dag.Node) {
  119. dag.NodeDeclareInputStream(node, 1)
  120. dag.NodeNewOutputStream(node, nil)
  121. }
  122. func (t *SendStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
  123. return &SendStream{
  124. Input: op.InputStreams[0].Var,
  125. Send: op.OutputStreams[0].Var,
  126. Worker: t.ToWorker,
  127. }, nil
  128. }
  129. func (t *SendStreamType) String(node *dag.Node) string {
  130. return fmt.Sprintf("SendStream[]%v%v", formatStreamIO(node), formatValueIO(node))
  131. }
  132. type SendVarType struct {
  133. ToWorker exec.WorkerInfo
  134. }
  135. func (t *SendVarType) Send(n *dag.Node, v *dag.ValueVar) *dag.ValueVar {
  136. v.To(n, 0)
  137. n.OutputValues[0].Type = v.Type
  138. return n.OutputValues[0]
  139. }
  140. func (t *SendVarType) InitNode(node *dag.Node) {
  141. dag.NodeDeclareInputValue(node, 1)
  142. dag.NodeNewOutputValue(node, 0, nil)
  143. }
  144. func (t *SendVarType) GenerateOp(op *dag.Node) (exec.Op, error) {
  145. return &SendVar{
  146. Input: op.InputValues[0].Var,
  147. Send: op.OutputValues[0].Var,
  148. Worker: t.ToWorker,
  149. }, nil
  150. }
  151. func (t *SendVarType) String(node *dag.Node) string {
  152. return fmt.Sprintf("SendVar[]%v%v", formatStreamIO(node), formatValueIO(node))
  153. }
  154. type GetStreamType struct {
  155. FromWorker exec.WorkerInfo
  156. }
  157. func (t *GetStreamType) Get(n *dag.Node, v *dag.StreamVar) *dag.StreamVar {
  158. v.To(n, 0)
  159. return n.OutputStreams[0]
  160. }
  161. func (t *GetStreamType) SignalVar(n *dag.Node) *dag.ValueVar {
  162. return n.OutputValues[0]
  163. }
  164. func (t *GetStreamType) InitNode(node *dag.Node) {
  165. dag.NodeDeclareInputStream(node, 1)
  166. dag.NodeNewOutputValue(node, dag.SignalValueVar, nil)
  167. dag.NodeNewOutputStream(node, nil)
  168. }
  169. func (t *GetStreamType) GenerateOp(op *dag.Node) (exec.Op, error) {
  170. return &GetStream{
  171. Signal: op.OutputValues[0].Var.(*exec.SignalVar),
  172. Output: op.OutputStreams[0].Var,
  173. Target: op.InputStreams[0].Var,
  174. Worker: t.FromWorker,
  175. }, nil
  176. }
  177. func (t *GetStreamType) String(node *dag.Node) string {
  178. return fmt.Sprintf("GetStream[]%v%v", formatStreamIO(node), formatValueIO(node))
  179. }
  180. type GetVaType struct {
  181. FromWorker exec.WorkerInfo
  182. }
  183. func (t *GetVaType) Get(n *dag.Node, v *dag.ValueVar) *dag.ValueVar {
  184. v.To(n, 0)
  185. n.OutputValues[1].Type = v.Type
  186. return n.OutputValues[1]
  187. }
  188. func (t *GetVaType) SignalVar(n *dag.Node) *dag.ValueVar {
  189. return n.OutputValues[0]
  190. }
  191. func (t *GetVaType) InitNode(node *dag.Node) {
  192. dag.NodeDeclareInputValue(node, 1)
  193. dag.NodeNewOutputValue(node, dag.SignalValueVar, nil)
  194. dag.NodeNewOutputValue(node, 0, nil)
  195. }
  196. func (t *GetVaType) GenerateOp(op *dag.Node) (exec.Op, error) {
  197. return &GetVar{
  198. Signal: op.OutputValues[0].Var.(*exec.SignalVar),
  199. Output: op.OutputValues[1].Var,
  200. Target: op.InputValues[0].Var,
  201. Worker: t.FromWorker,
  202. }, nil
  203. }
  204. func (t *GetVaType) String(node *dag.Node) string {
  205. return fmt.Sprintf("GetVar[]%v%v", formatStreamIO(node), formatValueIO(node))
  206. }