From c44dc0225ac038aa04950bdffa4cc8f45b9e90bb Mon Sep 17 00:00:00 2001 From: JeshuaRen <270813223@qq.com> Date: Fri, 9 Aug 2024 17:42:11 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=96=B0=E5=A2=9E=E5=A2=9E=E9=87=8F?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E6=9B=B4=E6=96=B0=E4=BB=BB=E5=8A=A1=202?= =?UTF-8?q?=E3=80=81=E5=AE=8C=E6=88=90=E6=89=A7=E8=A1=8C=E5=99=A8=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=9C=BA=E5=88=B6=E4=B8=8E=E6=B6=88=E6=81=AF=E6=8E=A8?= =?UTF-8?q?=E9=80=81=E7=AD=89=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkgs/actor/actor.go | 4 +- pkgs/async/unbound_channel.go | 89 +++++++++++++++++ pkgs/distlock/internal/acquire_actor.go | 2 +- pkgs/future/future.go | 23 ++++- pkgs/future/ready.go | 87 +++++++++++++++++ pkgs/future/set_value_future.go | 125 ++++++++++++------------ pkgs/future/set_void_future.go | 27 ++--- sdks/scheduler/models.go | 4 +- 8 files changed, 277 insertions(+), 84 deletions(-) create mode 100644 pkgs/async/unbound_channel.go create mode 100644 pkgs/future/ready.go diff --git a/pkgs/actor/actor.go b/pkgs/actor/actor.go index 2df0b71..5f4994e 100644 --- a/pkgs/actor/actor.go +++ b/pkgs/actor/actor.go @@ -126,7 +126,7 @@ func WaitValue[T any](ctx context.Context, c *CommandChannel, cmd func() (T, err fut.SetComplete(val, err) }) - return fut.WaitValue(ctx) + return fut.Wait(ctx) } func WaitValue2[T1 any, T2 any](ctx context.Context, c *CommandChannel, cmd func() (T1, T2, error)) (T1, T2, error) { @@ -137,5 +137,5 @@ func WaitValue2[T1 any, T2 any](ctx context.Context, c *CommandChannel, cmd func fut.SetComplete(val1, val2, err) }) - return fut.WaitValue(ctx) + return fut.Wait(ctx) } diff --git a/pkgs/async/unbound_channel.go b/pkgs/async/unbound_channel.go new file mode 100644 index 0000000..5e2800d --- /dev/null +++ b/pkgs/async/unbound_channel.go @@ -0,0 +1,89 @@ +package async + +import ( + "container/list" + "errors" + "gitlink.org.cn/cloudream/common/pkgs/future" + "sync" +) + +var ErrChannelClosed = errors.New("channel is closed") + +type UnboundChannel[T any] struct { + values *list.List + waiters []*future.SetValueFuture[T] + lock sync.Mutex + err error +} + +func NewUnboundChannel[T any]() *UnboundChannel[T] { + return &UnboundChannel[T]{ + values: list.New(), + } +} + +func (c *UnboundChannel[T]) Error() error { + return c.err +} + +func (c *UnboundChannel[T]) Send(val T) error { + c.lock.Lock() + defer c.lock.Unlock() + + if c.err != nil { + return c.err + } + + c.values.PushBack(val) + + for len(c.waiters) > 0 && c.values.Len() > 0 { + waiter := c.waiters[0] + waiter.SetValue(c.values.Front().Value.(T)) + c.values.Remove(c.values.Front()) + c.waiters = c.waiters[1:] + return nil + } + + return nil +} + +func (c *UnboundChannel[T]) Receive() future.Future1[T] { + c.lock.Lock() + defer c.lock.Unlock() + + if c.err != nil { + return future.NewReadyError1[T](c.err) + } + + if c.values.Len() > 0 { + ret := c.values.Front().Value.(T) + c.values.Remove(c.values.Front()) + return future.NewReadyValue1[T](ret) + } + + fut := future.NewSetValue[T]() + c.waiters = append(c.waiters, fut) + + return fut +} + +func (c *UnboundChannel[T]) Close() { + c.CloseWithError(ErrChannelClosed) +} + +func (c *UnboundChannel[T]) CloseWithError(err error) { + c.lock.Lock() + defer c.lock.Unlock() + + if c.err != nil { + return + } + c.err = err + + for i := 0; i < len(c.waiters); i++ { + c.waiters[i].SetError(c.err) + } + + c.waiters = nil + c.values = nil +} diff --git a/pkgs/distlock/internal/acquire_actor.go b/pkgs/distlock/internal/acquire_actor.go index d53f199..324da5f 100644 --- a/pkgs/distlock/internal/acquire_actor.go +++ b/pkgs/distlock/internal/acquire_actor.go @@ -93,7 +93,7 @@ func (a *AcquireActor) Acquire(ctx context.Context, req LockRequest) (string, er }() // 此处不能直接用ctx去等Callback,原因是Wait超时不代表锁没有获取到,这会导致锁泄露。 - return info.Callback.WaitValue(context.Background()) + return info.Callback.Wait(context.Background()) } // TryAcquireNow 重试一下内部还没有成功的锁请求。不会阻塞调用者 diff --git a/pkgs/future/future.go b/pkgs/future/future.go index 705864f..b964e2f 100644 --- a/pkgs/future/future.go +++ b/pkgs/future/future.go @@ -6,18 +6,31 @@ import ( ) var ErrContextCancelled = fmt.Errorf("context cancelled") +var ErrCompleted = fmt.Errorf("context cancelled") type Future interface { - Error() error IsComplete() bool + Chan() <-chan error + Wait(ctx context.Context) error } -type ValueFuture[T any] interface { - Future +type ChanValue1[T any] struct { + Value T + Err error +} + +type ChanValue2[T1 any, T2 any] struct { + Value1 T1 + Value2 T2 + Err error +} + +type Future1[T any] interface { + IsComplete() bool - Value() T + Chan() <-chan ChanValue1[T] - WaitValue(ctx context.Context) (T, error) + Wait(ctx context.Context) (T, error) } diff --git a/pkgs/future/ready.go b/pkgs/future/ready.go new file mode 100644 index 0000000..9c1e766 --- /dev/null +++ b/pkgs/future/ready.go @@ -0,0 +1,87 @@ +package future + +import "context" + +type Ready struct { + ch chan error +} + +func NewReady(err error) *Ready { + ch := make(chan error, 1) + ch <- err + close(ch) + + return &Ready{ + ch: ch, + } +} + +func (f *Ready) IsComplete() bool { + return true +} + +func (f *Ready) Wait(ctx context.Context) error { + select { + case v, ok := <-f.ch: + if !ok { + return ErrCompleted + } + return v + + case <-ctx.Done(): + return ErrContextCancelled + } +} + +func (f *Ready) Chan() <-chan error { + return f.ch +} + +type Ready1[T any] struct { + ch chan ChanValue1[T] +} + +func NewReady1[T any](val T, err error) *Ready1[T] { + ch := make(chan ChanValue1[T], 1) + ch <- ChanValue1[T]{ + Err: err, + Value: val, + } + close(ch) + + return &Ready1[T]{ + ch: ch, + } +} + +func NewReadyValue1[T any](val T) *Ready1[T] { + return NewReady1[T](val, nil) +} + +func NewReadyError1[T any](err error) *Ready1[T] { + var ret T + return NewReady1[T](ret, err) +} + +func (f *Ready1[T]) IsComplete() bool { + return true +} + +func (f *Ready1[T]) Wait(ctx context.Context) (T, error) { + select { + case cv, ok := <-f.ch: + if !ok { + var ret T + return ret, cv.Err + } + return cv.Value, cv.Err + + case <-ctx.Done(): + var ret T + return ret, ErrContextCancelled + } +} + +func (f *Ready1[T]) Chan() <-chan ChanValue1[T] { + return f.ch +} diff --git a/pkgs/future/set_value_future.go b/pkgs/future/set_value_future.go index bf7f94a..129e544 100644 --- a/pkgs/future/set_value_future.go +++ b/pkgs/future/set_value_future.go @@ -6,72 +6,76 @@ import ( ) type SetValueFuture[T any] struct { - value T - err error isCompleted bool - completeChan chan any + ch chan ChanValue1[T] completeOnce sync.Once } func NewSetValue[T any]() *SetValueFuture[T] { return &SetValueFuture[T]{ - completeChan: make(chan any), + ch: make(chan ChanValue1[T], 1), } } func (f *SetValueFuture[T]) SetComplete(val T, err error) { f.completeOnce.Do(func() { - f.value = val - f.err = err + f.ch <- ChanValue1[T]{ + Err: err, + Value: val, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetValueFuture[T]) SetValue(val T) { f.completeOnce.Do(func() { - f.value = val + f.ch <- ChanValue1[T]{ + Value: val, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetValueFuture[T]) SetError(err error) { f.completeOnce.Do(func() { - f.err = err + f.ch <- ChanValue1[T]{ + Err: err, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } -func (f *SetValueFuture[T]) Error() error { - return f.err -} - -func (f *SetValueFuture[T]) Value() T { - return f.value -} - func (f *SetValueFuture[T]) IsComplete() bool { return f.isCompleted } -// 等待直到Complete或者ctx被取消。 -// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete -func (f *SetValueFuture[T]) Wait(ctx context.Context) error { - select { - case <-f.completeChan: - return f.err - - case <-ctx.Done(): - return ErrContextCancelled - } +func (f *SetValueFuture[T]) Chan() <-chan ChanValue1[T] { + return f.ch } -func (f *SetValueFuture[T]) WaitValue(ctx context.Context) (T, error) { +// 等待直到Complete或者ctx被取消。 +// 注:返回ErrContextCancelled不代表产生结果的过程没有执行过,甚至不代表Future没有Complete +//func (f *SetValueFuture[T]) Wait(ctx context.Context) error { +// select { +// case <-f.ch: +// return f.err +// +// case <-ctx.Done(): +// return ErrContextCancelled +// } +//} + +func (f *SetValueFuture[T]) Wait(ctx context.Context) (T, error) { select { - case <-f.completeChan: - return f.value, f.err + case cv, ok := <-f.ch: + if !ok { + var ret T + return ret, cv.Err + } + return cv.Value, cv.Err case <-ctx.Done(): var ret T @@ -80,68 +84,61 @@ func (f *SetValueFuture[T]) WaitValue(ctx context.Context) (T, error) { } type SetValueFuture2[T1 any, T2 any] struct { - value1 T1 - value2 T2 - err error isCompleted bool - completeChan chan any + ch chan ChanValue2[T1, T2] completeOnce sync.Once } func NewSetValue2[T1 any, T2 any]() *SetValueFuture2[T1, T2] { return &SetValueFuture2[T1, T2]{ - completeChan: make(chan any), + ch: make(chan ChanValue2[T1, T2], 1), } } func (f *SetValueFuture2[T1, T2]) SetComplete(val1 T1, val2 T2, err error) { f.completeOnce.Do(func() { - f.value1 = val1 - f.value2 = val2 - f.err = err + f.ch <- ChanValue2[T1, T2]{ + Value1: val1, + Value2: val2, + Err: err, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetValueFuture2[T1, T2]) SetValue(val1 T1, val2 T2) { f.completeOnce.Do(func() { - f.value1 = val1 - f.value2 = val2 + f.ch <- ChanValue2[T1, T2]{ + Value1: val1, + Value2: val2, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetValueFuture2[T1, T2]) SetError(err error) { f.completeOnce.Do(func() { - f.err = err + f.ch <- ChanValue2[T1, T2]{ + Err: err, + } + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } -func (f *SetValueFuture2[T1, T2]) Error() error { - return f.err -} - -func (f *SetValueFuture2[T1, T2]) Value() (T1, T2) { - return f.value1, f.value2 -} - func (f *SetValueFuture2[T1, T2]) IsComplete() bool { return f.isCompleted } -func (f *SetValueFuture2[T1, T2]) Wait() error { - <-f.completeChan - return f.err -} - -func (f *SetValueFuture2[T1, T2]) WaitValue(ctx context.Context) (T1, T2, error) { +func (f *SetValueFuture2[T1, T2]) Wait(ctx context.Context) (T1, T2, error) { select { - case <-f.completeChan: - return f.value1, f.value2, f.err + case cv, ok := <-f.ch: + if !ok { + return cv.Value1, cv.Value2, cv.Err + } + return cv.Value1, cv.Value2, cv.Err case <-ctx.Done(): var ret1 T1 @@ -149,3 +146,7 @@ func (f *SetValueFuture2[T1, T2]) WaitValue(ctx context.Context) (T1, T2, error) return ret1, ret2, ErrContextCancelled } } + +func (f *SetValueFuture2[T1, T2]) Chan() <-chan ChanValue2[T1, T2] { + return f.ch +} diff --git a/pkgs/future/set_void_future.go b/pkgs/future/set_void_future.go index ed96a65..e9f80bd 100644 --- a/pkgs/future/set_void_future.go +++ b/pkgs/future/set_void_future.go @@ -6,47 +6,50 @@ import ( ) type SetVoidFuture struct { - err error isCompleted bool - completeChan chan any + ch chan error completeOnce sync.Once } func NewSetVoid() *SetVoidFuture { return &SetVoidFuture{ - completeChan: make(chan any), + ch: make(chan error, 1), } } func (f *SetVoidFuture) SetVoid() { f.completeOnce.Do(func() { + f.ch <- nil + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } func (f *SetVoidFuture) SetError(err error) { f.completeOnce.Do(func() { - f.err = err + f.ch <- err + close(f.ch) f.isCompleted = true - close(f.completeChan) }) } -func (f *SetVoidFuture) Error() error { - return f.err -} - func (f *SetVoidFuture) IsComplete() bool { return f.isCompleted } func (f *SetVoidFuture) Wait(ctx context.Context) error { select { - case <-f.completeChan: - return f.err + case v, ok := <-f.ch: + if !ok { + return ErrCompleted + } + return v case <-ctx.Done(): return ErrContextCancelled } } + +func (f *SetVoidFuture) Chan() <-chan error { + return f.ch +} diff --git a/sdks/scheduler/models.go b/sdks/scheduler/models.go index 024dc9c..f990dc3 100644 --- a/sdks/scheduler/models.go +++ b/sdks/scheduler/models.go @@ -90,8 +90,8 @@ type UpdateMultiInstanceJobInfo struct { Files JobFilesInfo `json:"files"` Runtime JobRuntimeInfo `json:"runtime"` MultiInstanceJobSetID JobSetID `json:"multiInstanceJobSetID"` - InstanceIDs []JobID `json:"instanceIDs"` - UpdateStrategy string `json:"updateStrategy"` + //InstanceIDs []JobID `json:"instanceIDs"` + UpdateStrategy string `json:"updateStrategy"` } type ModelJobInfo struct {