From b8808ad672f49612944aad8ada69de9ab1f5eecb Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 9 Jun 2023 16:08:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96future=E5=8C=85=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=EF=BC=9B=E5=A2=9E=E5=8A=A0actor=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/actor/actor.go | 54 ++++++++++++++++++++++++++++++++++ pkg/future/future.go | 16 ++++++++-- pkg/future/set_value_future.go | 42 ++++++++++++++++++++++---- pkg/future/set_void_future.go | 51 ++++++++++++++++++++++++++++++++ 4 files changed, 154 insertions(+), 9 deletions(-) create mode 100644 pkg/actor/actor.go create mode 100644 pkg/future/set_void_future.go diff --git a/pkg/actor/actor.go b/pkg/actor/actor.go new file mode 100644 index 0000000..8c6c296 --- /dev/null +++ b/pkg/actor/actor.go @@ -0,0 +1,54 @@ +package actor + +import "gitlink.org.cn/cloudream/common/pkg/future" + +type CommandFn func() + +type CommandChannel struct { + cmdChan chan CommandFn +} + +func NewCommandChannel() *CommandChannel { + return &CommandChannel{ + cmdChan: make(chan CommandFn), + } +} + +func (c *CommandChannel) Send(cmd CommandFn) { + c.cmdChan <- cmd +} + +func (c *CommandChannel) Receive() (CommandFn, bool) { + cmd, ok := <-c.cmdChan + return cmd, ok +} + +func (c *CommandChannel) ChanReceive() <-chan CommandFn { + return c.cmdChan +} + +func Wait(c *CommandChannel, cmd func() error) error { + fut := future.NewSetVoid() + + c.Send(func() { + err := cmd() + if err != nil { + fut.SetError(err) + } else { + fut.SetVoid() + } + }) + + return fut.Wait() +} + +func WaitValue[T any](c *CommandChannel, cmd func() (T, error)) (T, error) { + fut := future.NewSetValue[T]() + + c.Send(func() { + val, err := cmd() + fut.SetComplete(val, err) + }) + + return fut.WaitValue() +} diff --git a/pkg/future/future.go b/pkg/future/future.go index 662e89d..4602e3a 100644 --- a/pkg/future/future.go +++ b/pkg/future/future.go @@ -7,9 +7,19 @@ import ( var ErrWaitTimeout = fmt.Errorf("wait timeout") -type Future[T any] interface { +type Future interface { + Error() error IsComplete() bool - Wait() (T, error) - WaitTimeout(timeout time.Duration) (T, error) + Wait() error + WaitTimeout(timeout time.Duration) error +} + +type ValueFuture[T any] interface { + Future + + Value() T + + WaitValue() (T, error) + WaitValueTimeout(timeout time.Duration) (T, error) } diff --git a/pkg/future/set_value_future.go b/pkg/future/set_value_future.go index eada4ed..e438d29 100644 --- a/pkg/future/set_value_future.go +++ b/pkg/future/set_value_future.go @@ -5,7 +5,7 @@ import ( ) type SetValueFuture[T any] struct { - result T + value T err error isCompleted bool completeChan chan any @@ -17,8 +17,15 @@ func NewSetValue[T any]() *SetValueFuture[T] { } } +func (f *SetValueFuture[T]) SetComplete(val T, err error) { + f.value = val + f.err = err + f.isCompleted = true + close(f.completeChan) +} + func (f *SetValueFuture[T]) SetValue(val T) { - f.result = val + f.value = val f.isCompleted = true close(f.completeChan) } @@ -29,19 +36,42 @@ func (f *SetValueFuture[T]) SetError(err error) { close(f.completeChan) } +func (f *SetValueFuture[T]) Error() error { + return f.err +} + +func (f *SetValueFuture[T]) Value() T { + return f.value +} + func (f *SetValueFuture[T]) IsComplete() bool { return f.isCompleted } -func (f *SetValueFuture[T]) Wait() (T, error) { +func (f *SetValueFuture[T]) Wait() error { + <-f.completeChan + return f.err +} + +func (f *SetValueFuture[T]) WaitTimeout(timeout time.Duration) error { + select { + case <-f.completeChan: + return f.err + + case <-time.After(timeout): + return ErrWaitTimeout + } +} + +func (f *SetValueFuture[T]) WaitValue() (T, error) { <-f.completeChan - return f.result, f.err + return f.value, f.err } -func (f *SetValueFuture[T]) WaitTimeout(timeout time.Duration) (T, error) { +func (f *SetValueFuture[T]) WaitValueTimeout(timeout time.Duration) (T, error) { select { case <-f.completeChan: - return f.result, f.err + return f.value, f.err case <-time.After(timeout): var ret T diff --git a/pkg/future/set_void_future.go b/pkg/future/set_void_future.go new file mode 100644 index 0000000..ff3fb7a --- /dev/null +++ b/pkg/future/set_void_future.go @@ -0,0 +1,51 @@ +package future + +import ( + "time" +) + +type SetVoidFuture struct { + err error + isCompleted bool + completeChan chan any +} + +func NewSetVoid() *SetVoidFuture { + return &SetVoidFuture{ + completeChan: make(chan any), + } +} + +func (f *SetVoidFuture) SetVoid() { + f.isCompleted = true + close(f.completeChan) +} + +func (f *SetVoidFuture) SetError(err error) { + f.err = err + f.isCompleted = true + close(f.completeChan) +} + +func (f *SetVoidFuture) Error() error { + return f.err +} + +func (f *SetVoidFuture) IsComplete() bool { + return f.isCompleted +} + +func (f *SetVoidFuture) Wait() error { + <-f.completeChan + return f.err +} + +func (f *SetVoidFuture) WaitTimeout(timeout time.Duration) error { + select { + case <-f.completeChan: + return f.err + + case <-time.After(timeout): + return ErrWaitTimeout + } +}