You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

channel.go 1.5 kB

1 year ago
1 year ago
1 year ago
1 year ago
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package sync2
  2. import (
  3. "context"
  4. "errors"
  5. "sync"
  6. )
  7. var ErrChannelClosed = errors.New("channel is closed")
  8. type Channel[T any] struct {
  9. ch chan T
  10. closed chan any
  11. closeOnce sync.Once
  12. err error
  13. }
  14. func NewChannel[T any]() *Channel[T] {
  15. return &Channel[T]{
  16. ch: make(chan T),
  17. closed: make(chan any),
  18. }
  19. }
  20. func (c *Channel[T]) Error() error {
  21. return c.err
  22. }
  23. func (c *Channel[T]) Send(val T) error {
  24. select {
  25. case c.ch <- val:
  26. return nil
  27. case <-c.closed:
  28. return c.err
  29. }
  30. }
  31. func (c *Channel[T]) Receive(ctx context.Context) (T, error) {
  32. select {
  33. case val := <-c.ch:
  34. return val, nil
  35. case <-c.closed:
  36. var t T
  37. return t, c.err
  38. case <-ctx.Done():
  39. var t T
  40. return t, ctx.Err()
  41. }
  42. }
  43. // 获取channel的发送端,需要与Closed一起使用,防止错过关闭信号
  44. func (c *Channel[T]) Sender() chan<- T {
  45. return c.ch
  46. }
  47. // 获取channel的接收端,需要与Closed一起使用,防止错过关闭信号
  48. func (c *Channel[T]) Receiver() <-chan T {
  49. return c.ch
  50. }
  51. // 获取channel的关闭信号,用于通知接收端和发送端关闭
  52. func (c *Channel[T]) Closed() <-chan any {
  53. return c.closed
  54. }
  55. // 关闭channel。注:此操作不会关闭Sender和Receiver返回的channel
  56. func (c *Channel[T]) Close() {
  57. c.closeOnce.Do(func() {
  58. close(c.closed)
  59. c.err = ErrChannelClosed
  60. })
  61. }
  62. // 关闭channel并设置error。注:此操作不会关闭Sender和Receiver返回的channel
  63. func (c *Channel[T]) CloseWithError(err error) {
  64. c.closeOnce.Do(func() {
  65. close(c.closed)
  66. c.err = err
  67. })
  68. }