Browse Source

优化future包实现;增加actor包

pull/1/head
Sydonian 2 years ago
parent
commit
b8808ad672
4 changed files with 154 additions and 9 deletions
  1. +54
    -0
      pkg/actor/actor.go
  2. +13
    -3
      pkg/future/future.go
  3. +36
    -6
      pkg/future/set_value_future.go
  4. +51
    -0
      pkg/future/set_void_future.go

+ 54
- 0
pkg/actor/actor.go View File

@@ -0,0 +1,54 @@
package actor

import "gitlink.org.cn/cloudream/common/pkg/future"

type CommandFn func()

type CommandChannel struct {
cmdChan chan CommandFn
}

func NewCommandChannel() *CommandChannel {
return &CommandChannel{
cmdChan: make(chan CommandFn),
}
}

func (c *CommandChannel) Send(cmd CommandFn) {
c.cmdChan <- cmd
}

func (c *CommandChannel) Receive() (CommandFn, bool) {
cmd, ok := <-c.cmdChan
return cmd, ok
}

func (c *CommandChannel) ChanReceive() <-chan CommandFn {
return c.cmdChan
}

func Wait(c *CommandChannel, cmd func() error) error {
fut := future.NewSetVoid()

c.Send(func() {
err := cmd()
if err != nil {
fut.SetError(err)
} else {
fut.SetVoid()
}
})

return fut.Wait()
}

func WaitValue[T any](c *CommandChannel, cmd func() (T, error)) (T, error) {
fut := future.NewSetValue[T]()

c.Send(func() {
val, err := cmd()
fut.SetComplete(val, err)
})

return fut.WaitValue()
}

+ 13
- 3
pkg/future/future.go View File

@@ -7,9 +7,19 @@ import (

var ErrWaitTimeout = fmt.Errorf("wait timeout")

type Future[T any] interface {
type Future interface {
Error() error
IsComplete() bool

Wait() (T, error)
WaitTimeout(timeout time.Duration) (T, error)
Wait() error
WaitTimeout(timeout time.Duration) error
}

type ValueFuture[T any] interface {
Future

Value() T

WaitValue() (T, error)
WaitValueTimeout(timeout time.Duration) (T, error)
}

+ 36
- 6
pkg/future/set_value_future.go View File

@@ -5,7 +5,7 @@ import (
)

type SetValueFuture[T any] struct {
result T
value T
err error
isCompleted bool
completeChan chan any
@@ -17,8 +17,15 @@ func NewSetValue[T any]() *SetValueFuture[T] {
}
}

func (f *SetValueFuture[T]) SetComplete(val T, err error) {
f.value = val
f.err = err
f.isCompleted = true
close(f.completeChan)
}

func (f *SetValueFuture[T]) SetValue(val T) {
f.result = val
f.value = val
f.isCompleted = true
close(f.completeChan)
}
@@ -29,19 +36,42 @@ func (f *SetValueFuture[T]) SetError(err error) {
close(f.completeChan)
}

func (f *SetValueFuture[T]) Error() error {
return f.err
}

func (f *SetValueFuture[T]) Value() T {
return f.value
}

func (f *SetValueFuture[T]) IsComplete() bool {
return f.isCompleted
}

func (f *SetValueFuture[T]) Wait() (T, error) {
func (f *SetValueFuture[T]) Wait() error {
<-f.completeChan
return f.err
}

func (f *SetValueFuture[T]) WaitTimeout(timeout time.Duration) error {
select {
case <-f.completeChan:
return f.err

case <-time.After(timeout):
return ErrWaitTimeout
}
}

func (f *SetValueFuture[T]) WaitValue() (T, error) {
<-f.completeChan
return f.result, f.err
return f.value, f.err
}

func (f *SetValueFuture[T]) WaitTimeout(timeout time.Duration) (T, error) {
func (f *SetValueFuture[T]) WaitValueTimeout(timeout time.Duration) (T, error) {
select {
case <-f.completeChan:
return f.result, f.err
return f.value, f.err

case <-time.After(timeout):
var ret T


+ 51
- 0
pkg/future/set_void_future.go View File

@@ -0,0 +1,51 @@
package future

import (
"time"
)

type SetVoidFuture struct {
err error
isCompleted bool
completeChan chan any
}

func NewSetVoid() *SetVoidFuture {
return &SetVoidFuture{
completeChan: make(chan any),
}
}

func (f *SetVoidFuture) SetVoid() {
f.isCompleted = true
close(f.completeChan)
}

func (f *SetVoidFuture) SetError(err error) {
f.err = err
f.isCompleted = true
close(f.completeChan)
}

func (f *SetVoidFuture) Error() error {
return f.err
}

func (f *SetVoidFuture) IsComplete() bool {
return f.isCompleted
}

func (f *SetVoidFuture) Wait() error {
<-f.completeChan
return f.err
}

func (f *SetVoidFuture) WaitTimeout(timeout time.Duration) error {
select {
case <-f.completeChan:
return f.err

case <-time.After(timeout):
return ErrWaitTimeout
}
}

Loading…
Cancel
Save