|
- package sync2
-
- import (
- "context"
- "errors"
- "sync"
- )
-
- var ErrChannelClosed = errors.New("channel is closed")
-
- type Channel[T any] struct {
- ch chan T
- closed chan any
- closeOnce sync.Once
- err error
- }
-
- func NewChannel[T any]() *Channel[T] {
- return &Channel[T]{
- ch: make(chan T),
- closed: make(chan any),
- }
- }
-
- func (c *Channel[T]) Error() error {
- return c.err
- }
-
- func (c *Channel[T]) Send(val T) error {
- select {
- case c.ch <- val:
- return nil
- case <-c.closed:
- return c.err
- }
- }
-
- func (c *Channel[T]) Receive(ctx context.Context) (T, error) {
- select {
- case val := <-c.ch:
- return val, nil
- case <-c.closed:
- var t T
- return t, c.err
- case <-ctx.Done():
- var t T
- return t, ctx.Err()
- }
- }
-
- // 获取channel的发送端,需要与Closed一起使用,防止错过关闭信号
- func (c *Channel[T]) Sender() chan<- T {
- return c.ch
- }
-
- // 获取channel的接收端,需要与Closed一起使用,防止错过关闭信号
- func (c *Channel[T]) Receiver() <-chan T {
- return c.ch
- }
-
- // 获取channel的关闭信号,用于通知接收端和发送端关闭
- func (c *Channel[T]) Closed() <-chan any {
- return c.closed
- }
-
- // 关闭channel。注:此操作不会关闭Sender和Receiver返回的channel
- func (c *Channel[T]) Close() {
- c.closeOnce.Do(func() {
- close(c.closed)
- c.err = ErrChannelClosed
- })
- }
-
- // 关闭channel并设置error。注:此操作不会关闭Sender和Receiver返回的channel
- func (c *Channel[T]) CloseWithError(err error) {
- c.closeOnce.Do(func() {
- close(c.closed)
- c.err = err
- })
- }
|