You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

node.go 9.5 kB

11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
11 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. package dag
  2. import (
  3. "github.com/samber/lo"
  4. "gitlink.org.cn/cloudream/common/pkgs/ioswitch/exec"
  5. "gitlink.org.cn/cloudream/common/utils/lo2"
  6. )
  7. type NodeEnvType string
  8. const (
  9. EnvUnknown NodeEnvType = ""
  10. EnvDriver NodeEnvType = "Driver"
  11. EnvWorker NodeEnvType = "Worker"
  12. )
  13. type NodeEnv struct {
  14. Type NodeEnvType
  15. Worker exec.WorkerInfo
  16. Pinned bool // 如果为true,则不应该改变这个节点的执行环境
  17. }
  18. func (e *NodeEnv) ToEnvUnknown() {
  19. e.Type = EnvUnknown
  20. e.Worker = nil
  21. }
  22. func (e *NodeEnv) ToEnvDriver() {
  23. e.Type = EnvDriver
  24. e.Worker = nil
  25. }
  26. func (e *NodeEnv) ToEnvWorker(worker exec.WorkerInfo) {
  27. e.Type = EnvWorker
  28. e.Worker = worker
  29. }
  30. func (e *NodeEnv) CopyFrom(other *NodeEnv) {
  31. e.Type = other.Type
  32. e.Worker = other.Worker
  33. }
  34. func (e *NodeEnv) Equals(other *NodeEnv) bool {
  35. if e.Type != other.Type {
  36. return false
  37. }
  38. if e.Type != EnvWorker {
  39. return true
  40. }
  41. return e.Worker.Equals(other.Worker)
  42. }
  43. type Node interface {
  44. Graph() *Graph
  45. SetGraph(graph *Graph)
  46. Env() *NodeEnv
  47. InputStreams() *StreamInputSlots
  48. OutputStreams() *StreamOutputSlots
  49. InputValues() *ValueInputSlots
  50. OutputValues() *ValueOutputSlots
  51. GenerateOp() (exec.Op, error)
  52. // String() string
  53. }
  54. type VarSlots[T any] []*T
  55. func (s *VarSlots[T]) Len() int {
  56. return len(*s)
  57. }
  58. func (s *VarSlots[T]) Get(idx int) *T {
  59. return (*s)[idx]
  60. }
  61. func (s *VarSlots[T]) Set(idx int, val *T) *T {
  62. old := (*s)[idx]
  63. (*s)[idx] = val
  64. return old
  65. }
  66. func (s *VarSlots[T]) IndexOf(v *T) int {
  67. return lo.IndexOf(*s, v)
  68. }
  69. func (s *VarSlots[T]) Append(val *T) int {
  70. *s = append(*s, val)
  71. return s.Len() - 1
  72. }
  73. func (s *VarSlots[T]) Clear(val *T) {
  74. for i := 0; i < s.Len(); i++ {
  75. if (*s)[i] == val {
  76. (*s)[i] = nil
  77. }
  78. }
  79. }
  80. func (s *VarSlots[T]) RemoveAt(idx int) {
  81. (*s) = lo2.RemoveAt(*s, idx)
  82. }
  83. func (s *VarSlots[T]) RemoveRange(start int, cnt int) {
  84. *s = lo2.RemoveRange(*s, start, cnt)
  85. }
  86. func (s *VarSlots[T]) Resize(size int) {
  87. if s.Len() < size {
  88. *s = append(*s, make([]*T, size-s.Len())...)
  89. } else if s.Len() > size {
  90. *s = (*s)[:size]
  91. }
  92. }
  93. func (s *VarSlots[T]) SetRawArray(arr []*T) {
  94. *s = arr
  95. }
  96. func (s *VarSlots[T]) RawArray() []*T {
  97. return *s
  98. }
  99. type StreamInputSlots struct {
  100. Slots VarSlots[StreamVar]
  101. }
  102. func (s *StreamInputSlots) Len() int {
  103. return s.Slots.Len()
  104. }
  105. func (s *StreamInputSlots) Get(idx int) *StreamVar {
  106. return s.Slots.Get(idx)
  107. }
  108. func (s *StreamInputSlots) IndexOf(v *StreamVar) int {
  109. return s.Slots.IndexOf(v)
  110. }
  111. // 初始化输入流槽。调用者应该保证没有正在使用的槽位(即Slots的每一个元素都为nil)
  112. func (s *StreamInputSlots) Init(cnt int) {
  113. s.Slots.Resize(cnt)
  114. }
  115. func (s *StreamInputSlots) EnlargeOne() int {
  116. s.Slots.Append(nil)
  117. return s.Len() - 1
  118. }
  119. func (s *StreamInputSlots) ClearInputAt(my Node, idx int) {
  120. v := s.Get(idx)
  121. if v == nil {
  122. return
  123. }
  124. s.Slots.Set(idx, nil)
  125. v.Dst.Remove(my)
  126. }
  127. func (s *StreamInputSlots) ClearAllInput(my Node) {
  128. for i := 0; i < s.Len(); i++ {
  129. v := s.Get(i)
  130. if v == nil {
  131. continue
  132. }
  133. s.Slots.Set(i, nil)
  134. v.Dst.Remove(my)
  135. }
  136. }
  137. func (s *StreamInputSlots) GetVarIDs() []exec.VarID {
  138. var ids []exec.VarID
  139. for _, v := range s.Slots.RawArray() {
  140. if v == nil {
  141. continue
  142. }
  143. ids = append(ids, v.VarID)
  144. }
  145. return ids
  146. }
  147. func (s *StreamInputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
  148. var ids []exec.VarID
  149. for i := start; i < end; i++ {
  150. v := s.Get(i)
  151. if v == nil {
  152. continue
  153. }
  154. ids = append(ids, v.VarID)
  155. }
  156. return ids
  157. }
  158. type ValueInputSlots struct {
  159. Slots VarSlots[ValueVar]
  160. }
  161. func (s *ValueInputSlots) Len() int {
  162. return s.Slots.Len()
  163. }
  164. func (s *ValueInputSlots) Get(idx int) *ValueVar {
  165. return s.Slots.Get(idx)
  166. }
  167. func (s *ValueInputSlots) IndexOf(v *ValueVar) int {
  168. return s.Slots.IndexOf(v)
  169. }
  170. // 初始化输入流槽。调用者应该保证没有正在使用的槽位(即Slots的每一个元素都为nil)
  171. func (s *ValueInputSlots) Init(cnt int) {
  172. if s.Len() < cnt {
  173. s.Slots = append(s.Slots, make([]*ValueVar, cnt-s.Len())...)
  174. }
  175. }
  176. func (s *ValueInputSlots) EnlargeOne() int {
  177. s.Slots.Append(nil)
  178. return s.Len() - 1
  179. }
  180. func (s *ValueInputSlots) ClearInputAt(my Node, idx int) {
  181. v := s.Get(idx)
  182. if v == nil {
  183. return
  184. }
  185. s.Slots.Set(idx, nil)
  186. v.Dst.Remove(my)
  187. }
  188. func (s *ValueInputSlots) GetVarIDs() []exec.VarID {
  189. var ids []exec.VarID
  190. for _, v := range s.Slots.RawArray() {
  191. if v == nil {
  192. continue
  193. }
  194. ids = append(ids, v.VarID)
  195. }
  196. return ids
  197. }
  198. func (s *ValueInputSlots) GetVarIDsStart(start int) []exec.VarID {
  199. return s.GetVarIDsRanged(start, s.Len())
  200. }
  201. func (s *ValueInputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
  202. var ids []exec.VarID
  203. for i := start; i < end; i++ {
  204. v := s.Get(i)
  205. if v == nil {
  206. continue
  207. }
  208. ids = append(ids, v.VarID)
  209. }
  210. return ids
  211. }
  212. type StreamOutputSlots struct {
  213. Slots VarSlots[StreamVar]
  214. }
  215. func (s *StreamOutputSlots) Len() int {
  216. return s.Slots.Len()
  217. }
  218. func (s *StreamOutputSlots) Get(idx int) *StreamVar {
  219. return s.Slots.Get(idx)
  220. }
  221. func (s *StreamOutputSlots) IndexOf(v *StreamVar) int {
  222. return s.Slots.IndexOf(v)
  223. }
  224. // 设置Slots大小,并为每个Slot创建一个StreamVar。
  225. // 调用者应该保证没有正在使用的输出流,即每一个输出流的Dst都为空。
  226. func (s *StreamOutputSlots) Init(my Node, size int) {
  227. s.Slots.Resize(size)
  228. for i := 0; i < size; i++ {
  229. v := my.Graph().NewStreamVar()
  230. v.Src = my
  231. s.Slots.Set(i, v)
  232. }
  233. }
  234. // 在Slots末尾增加一个StreamVar,并返回它的索引
  235. func (s *StreamOutputSlots) AppendNew(my Node) StreamOutputSlot {
  236. v := my.Graph().NewStreamVar()
  237. v.Src = my
  238. s.Slots.Append(v)
  239. return StreamOutputSlot{Node: my, Index: s.Len() - 1}
  240. }
  241. // 断开指定位置的输出流到指定节点的连接
  242. func (s *StreamOutputSlots) ClearOutputAt(idx int, dst Node) {
  243. v := s.Get(idx)
  244. v.Dst.Remove(dst)
  245. dst.InputStreams().Slots.Clear(v)
  246. }
  247. // 断开所有输出流的所有连接,完全清空所有输出流。但会保留流变量
  248. func (s *StreamOutputSlots) ClearAllOutput(my Node) {
  249. for i := 0; i < s.Len(); i++ {
  250. v := s.Get(i)
  251. v.ClearAllDst()
  252. }
  253. }
  254. func (s *StreamOutputSlots) GetVarIDs() []exec.VarID {
  255. var ids []exec.VarID
  256. for _, v := range s.Slots.RawArray() {
  257. if v == nil {
  258. continue
  259. }
  260. ids = append(ids, v.VarID)
  261. }
  262. return ids
  263. }
  264. func (s *StreamOutputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
  265. var ids []exec.VarID
  266. for i := start; i < end; i++ {
  267. v := s.Get(i)
  268. if v == nil {
  269. continue
  270. }
  271. ids = append(ids, v.VarID)
  272. }
  273. return ids
  274. }
  275. type ValueOutputSlots struct {
  276. Slots VarSlots[ValueVar]
  277. }
  278. func (s *ValueOutputSlots) Len() int {
  279. return s.Slots.Len()
  280. }
  281. func (s *ValueOutputSlots) Get(idx int) *ValueVar {
  282. return s.Slots.Get(idx)
  283. }
  284. func (s *ValueOutputSlots) IndexOf(v *ValueVar) int {
  285. return s.Slots.IndexOf(v)
  286. }
  287. // 设置Slots大小,并为每个Slot创建一个StreamVar
  288. // 调用者应该保证没有正在使用的输出流,即每一个输出流的Dst都为空。
  289. func (s *ValueOutputSlots) Init(my Node, size int) {
  290. s.Slots.Resize(size)
  291. for i := 0; i < size; i++ {
  292. v := my.Graph().NewValueVar()
  293. v.Src = my
  294. s.Slots.Set(i, v)
  295. }
  296. }
  297. // 在Slots末尾增加一个StreamVar,并返回它的索引
  298. func (s *ValueOutputSlots) AppendNew(my Node) ValueOutputSlot {
  299. v := my.Graph().NewValueVar()
  300. v.Src = my
  301. s.Slots.Append(v)
  302. return ValueOutputSlot{Node: my, Index: s.Len() - 1}
  303. }
  304. // 断开指定位置的输出流到指定节点的连接
  305. func (s *ValueOutputSlots) ClearOutputAt(idx int, dst Node) {
  306. v := s.Get(idx)
  307. v.Dst.Remove(dst)
  308. dst.InputValues().Slots.Clear(v)
  309. }
  310. // 断开所有输出流的所有连接,完全清空所有输出流。但会保留流变量
  311. func (s *ValueOutputSlots) ClearAllOutput(my Node) {
  312. for i := 0; i < s.Len(); i++ {
  313. v := s.Get(i)
  314. v.ClearAllDst()
  315. }
  316. }
  317. func (s *ValueOutputSlots) GetVarIDs() []exec.VarID {
  318. var ids []exec.VarID
  319. for _, v := range s.Slots.RawArray() {
  320. if v == nil {
  321. continue
  322. }
  323. ids = append(ids, v.VarID)
  324. }
  325. return ids
  326. }
  327. func (s *ValueOutputSlots) GetVarIDsRanged(start, end int) []exec.VarID {
  328. var ids []exec.VarID
  329. for i := start; i < end; i++ {
  330. v := s.Get(i)
  331. if v == nil {
  332. continue
  333. }
  334. ids = append(ids, v.VarID)
  335. }
  336. return ids
  337. }
  338. type NodeBase struct {
  339. env NodeEnv
  340. inputStreams StreamInputSlots
  341. outputStreams StreamOutputSlots
  342. inputValues ValueInputSlots
  343. outputValues ValueOutputSlots
  344. graph *Graph
  345. }
  346. func (n *NodeBase) Graph() *Graph {
  347. return n.graph
  348. }
  349. func (n *NodeBase) SetGraph(graph *Graph) {
  350. n.graph = graph
  351. }
  352. func (n *NodeBase) Env() *NodeEnv {
  353. return &n.env
  354. }
  355. func (n *NodeBase) InputStreams() *StreamInputSlots {
  356. return &n.inputStreams
  357. }
  358. func (n *NodeBase) OutputStreams() *StreamOutputSlots {
  359. return &n.outputStreams
  360. }
  361. func (n *NodeBase) InputValues() *ValueInputSlots {
  362. return &n.inputValues
  363. }
  364. func (n *NodeBase) OutputValues() *ValueOutputSlots {
  365. return &n.outputValues
  366. }
  367. type StreamOutputSlot struct {
  368. Node Node
  369. Index int
  370. }
  371. func (s StreamOutputSlot) Var() *StreamVar {
  372. return s.Node.OutputStreams().Get(s.Index)
  373. }
  374. type StreamInputSlot struct {
  375. Node Node
  376. Index int
  377. }
  378. func (s StreamInputSlot) Var() *StreamVar {
  379. return s.Node.InputStreams().Get(s.Index)
  380. }
  381. type ValueOutputSlot struct {
  382. Node Node
  383. Index int
  384. }
  385. func (s ValueOutputSlot) Var() *ValueVar {
  386. return s.Node.OutputValues().Get(s.Index)
  387. }
  388. type ValueInputSlot struct {
  389. Node Node
  390. Index int
  391. }
  392. func (s ValueInputSlot) Var() *ValueVar {
  393. return s.Node.InputValues().Get(s.Index)
  394. }