diff --git a/pkgs/tickevent/executor.go b/pkgs/tickevent/executor.go index 0bb0a22..f960404 100644 --- a/pkgs/tickevent/executor.go +++ b/pkgs/tickevent/executor.go @@ -20,7 +20,7 @@ type EventTicker[TArgs any] struct { event TickEvent[TArgs] intervalMs int doneChan chan int - done atomic.Bool + done *atomic.Bool } type Executor[TArgs any] struct { @@ -43,7 +43,7 @@ func (e *Executor[TArgs]) Start(event TickEvent[TArgs], intervalMs int, opts ... event: event, intervalMs: intervalMs, doneChan: make(chan int), - done: atomic.Bool{}, + done: &atomic.Bool{}, } ticker.done.Store(false) diff --git a/utils/io2/stats.go b/utils/io2/stats.go index b6c7956..cdc6ec5 100644 --- a/utils/io2/stats.go +++ b/utils/io2/stats.go @@ -2,21 +2,52 @@ package io2 import "io" -type Counter struct { +type counter struct { inner io.Reader count int64 } -func (c *Counter) Read(buf []byte) (n int, err error) { +func (c *counter) Read(buf []byte) (n int, err error) { n, err = c.inner.Read(buf) c.count += int64(n) return } -func (c *Counter) Count() int64 { +func (c *counter) Count() int64 { return c.count } -func NewCounter(inner io.Reader) *Counter { - return &Counter{inner: inner, count: 0} +func Counter(inner io.Reader) *counter { + return &counter{inner: inner, count: 0} +} + +type counterCloser struct { + inner io.ReadCloser + count int64 + callback func(cnt int64, err error) +} + +func (c *counterCloser) Read(buf []byte) (n int, err error) { + n, err = c.inner.Read(buf) + c.count += int64(n) + if err != nil && c.callback != nil { + c.callback(c.count, err) + c.callback = nil + } + return +} + +func (c *counterCloser) Close() error { + // 读取方主动Close,视为正常结束 + err := c.inner.Close() + if c.callback != nil { + c.callback(c.count, nil) + } + return err +} + +// 统计一个io.ReadCloser的读取字节数,在读取结束后调用callback函数。 +// 仅在读取方主动调用Close时,callback的err参数才会为nil。 +func CounterCloser(inner io.ReadCloser, callback func(cnt int64, err error)) io.ReadCloser { + return &counterCloser{inner: inner, count: 0, callback: callback} }