You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

generate.go 3.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package plan
  2. import (
  3. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/dag"
  4. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  5. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/plan/ops"
  6. )
  7. func Generate(graph *dag.Graph, planBld *exec.PlanBuilder) error {
  8. generateSend(graph)
  9. return buildPlan(graph, planBld)
  10. }
  11. // 生成Send指令
  12. func generateSend(graph *dag.Graph) {
  13. graph.Walk(func(node *dag.Node) bool {
  14. for _, out := range node.OutputStreams {
  15. to := out.Toes[0]
  16. if to.Node.Env.Equals(node.Env) {
  17. continue
  18. }
  19. switch to.Node.Env.Type {
  20. case dag.EnvDriver:
  21. // // 如果是要送到Driver,则只能由Driver主动去拉取
  22. getNode := graph.NewNode(&ops.GetStreamType{}, nil)
  23. getNode.Env.ToEnvDriver()
  24. // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
  25. holdNode := graph.NewNode(&ops.HoldUntilType{}, nil)
  26. holdNode.Env = node.Env
  27. // 将Get指令的信号送到Hold指令
  28. getNode.OutputValues[0].To(holdNode, 0)
  29. // 将Get指令的输出送到目的地
  30. getNode.OutputStreams[0].To(to.Node, to.SlotIndex)
  31. out.Toes = nil
  32. // 将源节点的输出送到Hold指令
  33. out.To(holdNode, 0)
  34. // 将Hold指令的输出送到Get指令
  35. holdNode.OutputStreams[0].To(getNode, 0)
  36. case dag.EnvWorker:
  37. // 如果是要送到Agent,则可以直接发送
  38. n := graph.NewNode(&ops.SendStreamType{}, nil)
  39. n.Env = node.Env
  40. n.OutputStreams[0].To(to.Node, to.SlotIndex)
  41. out.Toes = nil
  42. out.To(n, 0)
  43. }
  44. }
  45. for _, out := range node.OutputValues {
  46. to := out.Toes[0]
  47. if to.Node.Env.Equals(node.Env) {
  48. continue
  49. }
  50. switch to.Node.Env.Type {
  51. case dag.EnvDriver:
  52. // // 如果是要送到Driver,则只能由Driver主动去拉取
  53. getNode := graph.NewNode(&ops.GetVaType{}, nil)
  54. getNode.Env.ToEnvDriver()
  55. // // 同时需要对此变量生成HoldUntil指令,避免Plan结束时Get指令还未到达
  56. holdNode := graph.NewNode(&ops.HoldUntilType{}, nil)
  57. holdNode.Env = node.Env
  58. // 将Get指令的信号送到Hold指令
  59. getNode.OutputValues[0].To(holdNode, 0)
  60. // 将Get指令的输出送到目的地
  61. getNode.OutputValues[1].To(to.Node, to.SlotIndex)
  62. out.Toes = nil
  63. // 将源节点的输出送到Hold指令
  64. out.To(holdNode, 0)
  65. // 将Hold指令的输出送到Get指令
  66. holdNode.OutputValues[0].To(getNode, 0)
  67. case dag.EnvWorker:
  68. // 如果是要送到Agent,则可以直接发送
  69. n := graph.NewNode(&ops.SendVarType{}, nil)
  70. n.Env = node.Env
  71. n.OutputValues[0].To(to.Node, to.SlotIndex)
  72. out.Toes = nil
  73. out.To(n, 0)
  74. }
  75. }
  76. return true
  77. })
  78. }
  79. // 生成Plan
  80. func buildPlan(graph *dag.Graph, blder *exec.PlanBuilder) error {
  81. var retErr error
  82. graph.Walk(func(node *dag.Node) bool {
  83. for _, out := range node.OutputStreams {
  84. if out.Var != nil {
  85. continue
  86. }
  87. out.Var = blder.NewStreamVar()
  88. }
  89. for _, in := range node.InputStreams {
  90. if in.Var != nil {
  91. continue
  92. }
  93. in.Var = blder.NewStreamVar()
  94. }
  95. for _, out := range node.OutputValues {
  96. if out.Var != nil {
  97. continue
  98. }
  99. switch out.Type {
  100. case dag.StringValueVar:
  101. out.Var = blder.NewStringVar()
  102. case dag.SignalValueVar:
  103. out.Var = blder.NewSignalVar()
  104. }
  105. }
  106. for _, in := range node.InputValues {
  107. if in.Var != nil {
  108. continue
  109. }
  110. switch in.Type {
  111. case dag.StringValueVar:
  112. in.Var = blder.NewStringVar()
  113. case dag.SignalValueVar:
  114. in.Var = blder.NewSignalVar()
  115. }
  116. }
  117. op, err := node.Type.GenerateOp(node)
  118. if err != nil {
  119. retErr = err
  120. return false
  121. }
  122. switch node.Env.Type {
  123. case dag.EnvDriver:
  124. blder.AtDriver().AddOp(op)
  125. case dag.EnvWorker:
  126. blder.AtWorker(node.Env.Worker).AddOp(op)
  127. }
  128. return true
  129. })
  130. return retErr
  131. }