You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

driver.go 2.8 kB

1 year ago
1 year ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package exec
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "sync"
  7. "github.com/hashicorp/go-multierror"
  8. "gitlink.org.cn/cloudream/common/pkgs/future"
  9. "gitlink.org.cn/cloudream/common/utils/io2"
  10. "gitlink.org.cn/cloudream/common/utils/math2"
  11. )
  12. type Driver struct {
  13. planID PlanID
  14. planBlder *PlanBuilder
  15. callback *future.SetValueFuture[map[string]VarValue]
  16. ctx *ExecContext
  17. cancel context.CancelFunc
  18. driverExec *Executor
  19. }
  20. // 开始写入一个流。此函数会将输入视为一个完整的流,因此会给流包装一个Range来获取只需要的部分。
  21. func (e *Driver) BeginWrite(str io.ReadCloser, handle *DriverWriteStream) {
  22. e.driverExec.PutVar(handle.ID, &StreamValue{Stream: io2.NewRange(str, handle.RangeHint.Offset, handle.RangeHint.Length)})
  23. }
  24. // 开始写入一个流。此函数默认输入流已经是Handle的RangeHint锁描述的范围,因此不会做任何其他处理
  25. func (e *Driver) BeginWriteRanged(str io.ReadCloser, handle *DriverWriteStream) {
  26. e.driverExec.PutVar(handle.ID, &StreamValue{Stream: str})
  27. }
  28. func (e *Driver) BeginRead(handle *DriverReadStream) (io.ReadCloser, error) {
  29. str, err := BindVar[*StreamValue](e.driverExec, e.ctx.Context, handle.ID)
  30. if err != nil {
  31. return nil, fmt.Errorf("bind vars: %w", err)
  32. }
  33. return str.Stream, nil
  34. }
  35. func (e *Driver) Signal(signal *DriverSignalVar) {
  36. e.driverExec.PutVar(signal.ID, &SignalValue{})
  37. }
  38. func (e *Driver) Wait(ctx context.Context) (map[string]VarValue, error) {
  39. stored, err := e.callback.Wait(ctx)
  40. if err != nil {
  41. return nil, err
  42. }
  43. return stored, nil
  44. }
  45. func (e *Driver) execute() {
  46. wg := sync.WaitGroup{}
  47. errLock := sync.Mutex{}
  48. var execErr error
  49. for _, p := range e.planBlder.WorkerPlans {
  50. wg.Add(1)
  51. go func(p *WorkerPlanBuilder, ctx context.Context, cancel context.CancelFunc) {
  52. defer wg.Done()
  53. plan := Plan{
  54. ID: e.planID,
  55. Ops: p.Ops,
  56. }
  57. cli, err := p.Worker.NewClient()
  58. if err != nil {
  59. errLock.Lock()
  60. execErr = multierror.Append(execErr, fmt.Errorf("worker %v: new client: %w", p.Worker, err))
  61. errLock.Unlock()
  62. cancel()
  63. return
  64. }
  65. defer cli.Close()
  66. err = cli.ExecutePlan(ctx, plan)
  67. if err != nil {
  68. errLock.Lock()
  69. execErr = multierror.Append(execErr, fmt.Errorf("worker %v: execute plan: %w", p.Worker, err))
  70. errLock.Unlock()
  71. cancel()
  72. return
  73. }
  74. }(p, e.ctx.Context, e.cancel)
  75. }
  76. stored, err := e.driverExec.Run(e.ctx)
  77. if err != nil {
  78. errLock.Lock()
  79. execErr = multierror.Append(execErr, fmt.Errorf("driver: execute plan: %w", err))
  80. errLock.Unlock()
  81. e.cancel()
  82. }
  83. wg.Wait()
  84. e.callback.SetComplete(stored, execErr)
  85. }
  86. type DriverWriteStream struct {
  87. ID VarID
  88. RangeHint *math2.Range
  89. }
  90. type DriverReadStream struct {
  91. ID VarID
  92. }
  93. type DriverSignalVar struct {
  94. ID VarID
  95. Signal SignalValue
  96. }