You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

executor.go 2.8 kB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package exec
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "gitlink.org.cn/cloudream/common/pkgs/future"
  7. "gitlink.org.cn/cloudream/common/utils/lo2"
  8. "gitlink.org.cn/cloudream/common/utils/sync2"
  9. )
  10. type binding struct {
  11. ID VarID
  12. Callback *future.SetValueFuture[VarValue]
  13. }
  14. type freeVar struct {
  15. ID VarID
  16. Value VarValue
  17. }
  18. type Executor struct {
  19. plan Plan
  20. vars map[VarID]freeVar
  21. bindings []*binding
  22. lock sync.Mutex
  23. store map[string]VarValue
  24. }
  25. func NewExecutor(plan Plan) *Executor {
  26. planning := Executor{
  27. plan: plan,
  28. vars: make(map[VarID]freeVar),
  29. store: make(map[string]VarValue),
  30. }
  31. return &planning
  32. }
  33. func (s *Executor) Plan() *Plan {
  34. return &s.plan
  35. }
  36. func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) {
  37. c, cancel := context.WithCancel(ctx.Context)
  38. ctx.Context = c
  39. defer cancel()
  40. err := sync2.ParallelDo(s.plan.Ops, func(o Op, idx int) error {
  41. err := o.Execute(ctx, s)
  42. s.lock.Lock()
  43. defer s.lock.Unlock()
  44. if err != nil {
  45. cancel()
  46. return fmt.Errorf("%T: %w", o, err)
  47. }
  48. return nil
  49. })
  50. if err != nil {
  51. return nil, err
  52. }
  53. return s.store, nil
  54. }
  55. func (s *Executor) BindVar(ctx context.Context, id VarID) (VarValue, error) {
  56. s.lock.Lock()
  57. gv, ok := s.vars[id]
  58. if ok {
  59. delete(s.vars, id)
  60. s.lock.Unlock()
  61. return gv.Value, nil
  62. }
  63. callback := future.NewSetValue[VarValue]()
  64. s.bindings = append(s.bindings, &binding{
  65. ID: id,
  66. Callback: callback,
  67. })
  68. s.lock.Unlock()
  69. return callback.Wait(ctx)
  70. }
  71. func (s *Executor) PutVar(id VarID, value VarValue) *Executor {
  72. s.lock.Lock()
  73. defer s.lock.Unlock()
  74. for ib, b := range s.bindings {
  75. if b.ID != id {
  76. continue
  77. }
  78. b.Callback.SetValue(value)
  79. s.bindings = lo2.RemoveAt(s.bindings, ib)
  80. return s
  81. }
  82. // 如果没有绑定,则直接放入变量表中
  83. s.vars[id] = freeVar{ID: id, Value: value}
  84. return s
  85. }
  86. func (s *Executor) Store(key string, val VarValue) {
  87. s.lock.Lock()
  88. defer s.lock.Unlock()
  89. s.store[key] = val
  90. }
  91. func BindVar[T VarValue](e *Executor, ctx context.Context, id VarID) (T, error) {
  92. v, err := e.BindVar(ctx, id)
  93. if err != nil {
  94. var def T
  95. return def, err
  96. }
  97. ret, ok := v.(T)
  98. if !ok {
  99. var def T
  100. return def, fmt.Errorf("binded var %v is %T, not %T", id, v, def)
  101. }
  102. return ret, nil
  103. }
  104. func BindArray[T VarValue](e *Executor, ctx context.Context, ids []VarID) ([]T, error) {
  105. ret := make([]T, len(ids))
  106. for i := range ids {
  107. v, err := e.BindVar(ctx, ids[i])
  108. if err != nil {
  109. return nil, err
  110. }
  111. v2, ok := v.(T)
  112. if !ok {
  113. var def T
  114. return nil, fmt.Errorf("binded var %v is %T, not %T", ids[i], v, def)
  115. }
  116. ret[i] = v2
  117. }
  118. return ret, nil
  119. }
  120. func PutArray[T VarValue](e *Executor, ids []VarID, values []T) {
  121. for i := range ids {
  122. e.PutVar(ids[i], values[i])
  123. }
  124. }