You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

executor.go 3.3 kB

1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package exec
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "github.com/hashicorp/go-multierror"
  7. "gitlink.org.cn/cloudream/common/pkgs/future"
  8. "gitlink.org.cn/cloudream/common/utils/lo2"
  9. "gitlink.org.cn/cloudream/common/utils/sync2"
  10. )
  11. type binding struct {
  12. ID VarID
  13. Callback *future.SetValueFuture[VarValue]
  14. }
  15. type freeVar struct {
  16. ID VarID
  17. Value VarValue
  18. }
  19. type Executor struct {
  20. plan Plan
  21. vars map[VarID]freeVar
  22. bindings []*binding
  23. lock sync.Mutex
  24. store map[string]VarValue
  25. }
  26. func NewExecutor(plan Plan) *Executor {
  27. planning := Executor{
  28. plan: plan,
  29. vars: make(map[VarID]freeVar),
  30. store: make(map[string]VarValue),
  31. }
  32. return &planning
  33. }
  34. func (s *Executor) Plan() *Plan {
  35. return &s.plan
  36. }
  37. func (s *Executor) Run(ctx *ExecContext) (map[string]VarValue, error) {
  38. c, cancel := context.WithCancel(ctx.Context)
  39. ctx.Context = c
  40. defer cancel()
  41. err := s.runOps(s.plan.Ops, ctx, cancel)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return s.store, nil
  46. }
  47. func (s *Executor) runOps(ops []Op, ctx *ExecContext, cancel context.CancelFunc) error {
  48. lock := sync.Mutex{}
  49. var err error
  50. var wg sync.WaitGroup
  51. wg.Add(len(ops))
  52. for i, arg := range ops {
  53. go func(arg Op, index int) {
  54. defer wg.Done()
  55. if e := arg.Execute(ctx, s); e != nil {
  56. lock.Lock()
  57. // 尽量不记录 ErrContextCanceled 错误,除非没有其他错误
  58. if err == nil {
  59. err = e
  60. } else if err == sync2.ErrContextCanceled {
  61. err = e
  62. } else if e != sync2.ErrContextCanceled {
  63. err = multierror.Append(err, e)
  64. }
  65. lock.Unlock()
  66. cancel()
  67. }
  68. }(arg, i)
  69. }
  70. wg.Wait()
  71. return err
  72. }
  73. func (s *Executor) BindVar(ctx context.Context, id VarID) (VarValue, error) {
  74. s.lock.Lock()
  75. gv, ok := s.vars[id]
  76. if ok {
  77. delete(s.vars, id)
  78. s.lock.Unlock()
  79. return gv.Value, nil
  80. }
  81. callback := future.NewSetValue[VarValue]()
  82. s.bindings = append(s.bindings, &binding{
  83. ID: id,
  84. Callback: callback,
  85. })
  86. s.lock.Unlock()
  87. return callback.Wait(ctx)
  88. }
  89. func (s *Executor) PutVar(id VarID, value VarValue) *Executor {
  90. s.lock.Lock()
  91. defer s.lock.Unlock()
  92. for ib, b := range s.bindings {
  93. if b.ID != id {
  94. continue
  95. }
  96. b.Callback.SetValue(value)
  97. s.bindings = lo2.RemoveAt(s.bindings, ib)
  98. return s
  99. }
  100. // 如果没有绑定,则直接放入变量表中
  101. s.vars[id] = freeVar{ID: id, Value: value}
  102. return s
  103. }
  104. func (s *Executor) Store(key string, val VarValue) {
  105. s.lock.Lock()
  106. defer s.lock.Unlock()
  107. s.store[key] = val
  108. }
  109. func BindVar[T VarValue](e *Executor, ctx context.Context, id VarID) (T, error) {
  110. v, err := e.BindVar(ctx, id)
  111. if err != nil {
  112. var def T
  113. return def, err
  114. }
  115. ret, ok := v.(T)
  116. if !ok {
  117. var def T
  118. return def, fmt.Errorf("binded var %v is %T, not %T", id, v, def)
  119. }
  120. return ret, nil
  121. }
  122. func BindArray[T VarValue](e *Executor, ctx context.Context, ids []VarID) ([]T, error) {
  123. ret := make([]T, len(ids))
  124. for i := range ids {
  125. v, err := e.BindVar(ctx, ids[i])
  126. if err != nil {
  127. return nil, err
  128. }
  129. v2, ok := v.(T)
  130. if !ok {
  131. var def T
  132. return nil, fmt.Errorf("binded var %v is %T, not %T", ids[i], v, def)
  133. }
  134. ret[i] = v2
  135. }
  136. return ret, nil
  137. }
  138. func PutArray[T VarValue](e *Executor, ids []VarID, values []T) {
  139. for i := range ids {
  140. e.PutVar(ids[i], values[i])
  141. }
  142. }