You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

node.go 3.9 kB

11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package dag
  2. import (
  3. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  4. "gitlink.org.cn/cloudream/common/utils/lo2"
  5. )
  6. type NodeEnvType string
  7. const (
  8. EnvUnknown NodeEnvType = ""
  9. EnvDriver NodeEnvType = "Driver"
  10. EnvWorker NodeEnvType = "Worker"
  11. )
  12. type NodeEnv struct {
  13. Type NodeEnvType
  14. Worker exec.WorkerInfo
  15. Pinned bool // 如果为true,则不应该改变这个节点的执行环境
  16. }
  17. func (e *NodeEnv) ToEnvUnknown() {
  18. e.Type = EnvUnknown
  19. e.Worker = nil
  20. }
  21. func (e *NodeEnv) ToEnvDriver() {
  22. e.Type = EnvDriver
  23. e.Worker = nil
  24. }
  25. func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo) {
  26. e.Type = EnvWorker
  27. e.Worker = worker
  28. }
  29. func (e *NodeEnv) Equals(other *NodeEnv) bool {
  30. if e.Type != other.Type {
  31. return false
  32. }
  33. if e.Type != EnvWorker {
  34. return true
  35. }
  36. return e.Worker.Equals(other.Worker)
  37. }
  38. type Node interface {
  39. Graph() *Graph
  40. SetGraph(graph *Graph)
  41. Env() *NodeEnv
  42. InputStreams() *InputSlots
  43. OutputStreams() *OutputSlots
  44. InputValues() *InputSlots
  45. OutputValues() *OutputSlots
  46. GenerateOp() (exec.Op, error)
  47. // String() string
  48. }
  49. type VarSlots []*Var
  50. func (s *VarSlots) Len() int {
  51. return len(*s)
  52. }
  53. func (s *VarSlots) Get(idx int) *Var {
  54. return (*s)[idx]
  55. }
  56. func (s *VarSlots) Set(idx int, val *Var) *Var {
  57. old := (*s)[idx]
  58. (*s)[idx] = val
  59. return old
  60. }
  61. func (s *VarSlots) Append(val *Var) int {
  62. *s = append(*s, val)
  63. return s.Len() - 1
  64. }
  65. func (s *VarSlots) RemoveAt(idx int) {
  66. (*s) = lo2.RemoveAt(*s, idx)
  67. }
  68. func (s *VarSlots) Resize(size int) {
  69. if s.Len() < size {
  70. *s = append(*s, make([]*Var, size-s.Len())...)
  71. } else if s.Len() > size {
  72. *s = (*s)[:size]
  73. }
  74. }
  75. func (s *VarSlots) SetRawArray(arr []*Var) {
  76. *s = arr
  77. }
  78. func (s *VarSlots) RawArray() []*Var {
  79. return *s
  80. }
  81. type InputSlots struct {
  82. VarSlots
  83. }
  84. func (s *InputSlots) EnsureSize(cnt int) {
  85. if s.Len() < cnt {
  86. s.VarSlots = append(s.VarSlots, make([]*Var, cnt-s.Len())...)
  87. }
  88. }
  89. func (s *InputSlots) EnlargeOne() int {
  90. s.Append(nil)
  91. return s.Len() - 1
  92. }
  93. func (s *InputSlots) GetVarIDs() []exec.VarID {
  94. var ids []exec.VarID
  95. for _, v := range s.RawArray() {
  96. if v == nil {
  97. continue
  98. }
  99. ids = append(ids, v.VarID)
  100. }
  101. return ids
  102. }
  103. func (s *InputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
  104. var ids []exec.VarID
  105. for i := start; i < end; i++ {
  106. v := s.Get(i)
  107. if v == nil {
  108. continue
  109. }
  110. ids = append(ids, v.VarID)
  111. }
  112. return ids
  113. }
  114. type OutputSlots struct {
  115. VarSlots
  116. }
  117. func (s *OutputSlots) Setup(my Node, v *Var, slotIdx int) {
  118. if s.Len() <= slotIdx {
  119. s.VarSlots = append(s.VarSlots, make([]*Var, slotIdx-s.Len()+1)...)
  120. }
  121. s.Set(slotIdx, v)
  122. *v.From() = EndPoint{
  123. Node: my,
  124. SlotIndex: slotIdx,
  125. }
  126. }
  127. func (s *OutputSlots) SetupNew(my Node, v *Var) {
  128. s.Append(v)
  129. *v.From() = EndPoint{
  130. Node: my,
  131. SlotIndex: s.Len() - 1,
  132. }
  133. }
  134. func (s *OutputSlots) GetVarIDs() []exec.VarID {
  135. var ids []exec.VarID
  136. for _, v := range s.RawArray() {
  137. if v == nil {
  138. continue
  139. }
  140. ids = append(ids, v.VarID)
  141. }
  142. return ids
  143. }
  144. func (s *OutputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
  145. var ids []exec.VarID
  146. for i := start; i < end; i++ {
  147. v := s.Get(i)
  148. if v == nil {
  149. continue
  150. }
  151. ids = append(ids, v.VarID)
  152. }
  153. return ids
  154. }
  155. type Slot struct {
  156. Var *Var
  157. Index int
  158. }
  159. type NodeBase struct {
  160. env NodeEnv
  161. inputStreams InputSlots
  162. outputStreams OutputSlots
  163. inputValues InputSlots
  164. outputValues OutputSlots
  165. graph *Graph
  166. }
  167. func (n *NodeBase) Graph() *Graph {
  168. return n.graph
  169. }
  170. func (n *NodeBase) SetGraph(graph *Graph) {
  171. n.graph = graph
  172. }
  173. func (n *NodeBase) Env() *NodeEnv {
  174. return &n.env
  175. }
  176. func (n *NodeBase) InputStreams() *InputSlots {
  177. return &n.inputStreams
  178. }
  179. func (n *NodeBase) OutputStreams() *OutputSlots {
  180. return &n.outputStreams
  181. }
  182. func (n *NodeBase) InputValues() *InputSlots {
  183. return &n.inputValues
  184. }
  185. func (n *NodeBase) OutputValues() *OutputSlots {
  186. return &n.outputValues
  187. }