diff --git a/utils/sync2/counter_cond.go b/utils/sync2/counter_cond.go index 8da29e2..03bc8b6 100644 --- a/utils/sync2/counter_cond.go +++ b/utils/sync2/counter_cond.go @@ -32,7 +32,9 @@ func (c *CounterCond) Wait() bool { } func (c *CounterCond) Release() { + c.cond.L.Lock() c.count++ + c.cond.L.Unlock() c.cond.Signal() } diff --git a/utils/sync2/unbound_channel.go b/utils/sync2/unbound_channel.go new file mode 100644 index 0000000..ff051a4 --- /dev/null +++ b/utils/sync2/unbound_channel.go @@ -0,0 +1,74 @@ +package sync2 + +import ( + "container/list" + "sync" +) + +type UnboundChannel[T any] struct { + values *list.List + cond *sync.Cond + err error +} + +func NewUnboundChannel[T any]() *UnboundChannel[T] { + return &UnboundChannel[T]{ + values: list.New(), + cond: sync.NewCond(&sync.Mutex{}), + } +} + +func (c *UnboundChannel[T]) Error() error { + return c.err +} + +func (c *UnboundChannel[T]) Send(val T) error { + c.cond.L.Lock() + if c.err != nil { + c.cond.L.Unlock() + return c.err + } + c.values.PushBack(val) + c.cond.L.Unlock() + + c.cond.Signal() + return nil +} + +func (c *UnboundChannel[T]) Receive() (T, error) { + c.cond.L.Lock() + defer c.cond.L.Unlock() + + if c.values.Len() == 0 { + c.cond.Wait() + } + + if c.values.Len() == 0 { + var ret T + return ret, c.err + } + + ret := c.values.Front().Value.(T) + c.values.Remove(c.values.Front()) + return ret, nil +} + +func (c *UnboundChannel[T]) Close() { + c.cond.L.Lock() + if c.err != nil { + return + } + c.err = ErrChannelClosed + c.cond.L.Unlock() + c.cond.Broadcast() +} + +func (c *UnboundChannel[T]) CloseWithError(err error) { + c.cond.L.Lock() + if c.err != nil { + return + } + c.err = err + c.cond.L.Unlock() + c.cond.Broadcast() +} diff --git a/utils/time2/duration.go b/utils/time2/duration.go new file mode 100644 index 0000000..ebe0425 --- /dev/null +++ b/utils/time2/duration.go @@ -0,0 +1,28 @@ +package time2 + +import ( + "fmt" + "time" +) + +type Duration struct { + time.Duration +} + +func (d *Duration) Std() time.Duration { + return d.Duration +} + +func (d *Duration) Scan(state fmt.ScanState, verb rune) error { + data, err := state.Token(true, nil) + if err != nil { + return err + } + + d.Duration, err = time.ParseDuration(string(data)) + if err != nil { + return err + } + + return nil +} diff --git a/utils/time2/time2_test.go b/utils/time2/time2_test.go new file mode 100644 index 0000000..2dd0202 --- /dev/null +++ b/utils/time2/time2_test.go @@ -0,0 +1,25 @@ +package time2 + +import ( + "fmt" + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" +) + +func Test_Duration(t *testing.T) { + Convey("从字符串解析", t, func() { + dur := Duration{} + _, err := fmt.Sscanf("10s", "%v", &dur) + So(err, ShouldEqual, nil) + So(dur.Std(), ShouldEqual, 10*time.Second) + }) + + Convey("包含空白字符", t, func() { + dur := Duration{} + _, err := fmt.Sscanf(" 10s\t\n\r", "%v", &dur) + So(err, ShouldEqual, nil) + So(dur.Std(), ShouldEqual, 10*time.Second) + }) +}