You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

unbound_channel.go 1.2 kB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package sync2
  2. import (
  3. "container/list"
  4. "sync"
  5. )
  // UnboundChannel is a queue of T values with no capacity limit.
  //
  // Send appends to the queue, Receive removes from the front, and
  // Close/CloseWithError record a terminal error in err.
  type UnboundChannel[T any] struct {
  	// values holds the queued items in send order (oldest at the front).
  	values *list.List
  	// cond is used to wake receivers; cond.L is the lock taken around
  	// accesses to values and err.
  	cond *sync.Cond
  	// err is nil until the channel is closed; afterwards it holds the
  	// closing error.
  	err error
  }
  11. func NewUnboundChannel[T any]() *UnboundChannel[T] {
  12. return &UnboundChannel[T]{
  13. values: list.New(),
  14. cond: sync.NewCond(&sync.Mutex{}),
  15. }
  16. }
  17. func (c *UnboundChannel[T]) Error() error {
  18. return c.err
  19. }
  20. func (c *UnboundChannel[T]) Send(val T) error {
  21. c.cond.L.Lock()
  22. if c.err != nil {
  23. c.cond.L.Unlock()
  24. return c.err
  25. }
  26. c.values.PushBack(val)
  27. c.cond.L.Unlock()
  28. c.cond.Signal()
  29. return nil
  30. }
  31. func (c *UnboundChannel[T]) Receive() (T, error) {
  32. c.cond.L.Lock()
  33. defer c.cond.L.Unlock()
  34. if c.values.Len() == 0 {
  35. c.cond.Wait()
  36. }
  37. if c.values.Len() == 0 {
  38. var ret T
  39. return ret, c.err
  40. }
  41. ret := c.values.Front().Value.(T)
  42. c.values.Remove(c.values.Front())
  43. return ret, nil
  44. }
  45. func (c *UnboundChannel[T]) Close() {
  46. c.cond.L.Lock()
  47. if c.err != nil {
  48. return
  49. }
  50. c.err = ErrChannelClosed
  51. c.cond.L.Unlock()
  52. c.cond.Broadcast()
  53. }
  54. func (c *UnboundChannel[T]) CloseWithError(err error) {
  55. c.cond.L.Lock()
  56. if c.err != nil {
  57. return
  58. }
  59. c.err = err
  60. c.cond.L.Unlock()
  61. c.cond.Broadcast()
  62. }