You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

io.go 4.3 kB

1 year ago
2 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. package io2
  2. import (
  3. "io"
  4. "sync"
  5. )
  6. type PromiseWriteCloser[T any] interface {
  7. io.Writer
  8. Abort(err error) // 中断发送文件
  9. Finish() (T, error) // 发送文件完成,等待返回结果
  10. }
  11. func WriteAll(writer io.Writer, data []byte) error {
  12. pos := 0
  13. dataLen := len(data)
  14. for pos < dataLen {
  15. writeLen, err := writer.Write(data[pos:])
  16. if err != nil {
  17. return err
  18. }
  19. pos += writeLen
  20. }
  21. return nil
  22. }
  23. const (
  24. onceDisabled = 0
  25. onceEnabled = 1
  26. onceDone = 2
  27. )
  28. type readCloserHook struct {
  29. readCloser io.ReadCloser
  30. callback func(closer io.ReadCloser)
  31. once int
  32. isBefore bool // callback调用时机,true则在closer的Close之前调用
  33. lock *sync.Mutex
  34. }
  35. func (hook *readCloserHook) Read(buf []byte) (n int, err error) {
  36. return hook.readCloser.Read(buf)
  37. }
  38. func (hook *readCloserHook) Close() error {
  39. hook.lock.Lock()
  40. defer hook.lock.Unlock()
  41. if hook.once == onceDone {
  42. return hook.readCloser.Close()
  43. }
  44. if hook.isBefore {
  45. hook.callback(hook.readCloser)
  46. }
  47. err := hook.readCloser.Close()
  48. if !hook.isBefore {
  49. hook.callback(hook.readCloser)
  50. }
  51. if hook.once == onceEnabled {
  52. hook.once = onceDone
  53. }
  54. return err
  55. }
  56. func BeforeReadClosing(closer io.ReadCloser, callback func(closer io.ReadCloser)) io.ReadCloser {
  57. return &readCloserHook{
  58. readCloser: closer,
  59. callback: callback,
  60. once: onceDisabled,
  61. isBefore: true,
  62. lock: &sync.Mutex{},
  63. }
  64. }
  65. func AfterReadClosed(closer io.ReadCloser, callback func(closer io.ReadCloser)) io.ReadCloser {
  66. return &readCloserHook{
  67. readCloser: closer,
  68. callback: callback,
  69. once: onceDisabled,
  70. isBefore: false,
  71. lock: &sync.Mutex{},
  72. }
  73. }
  74. func AfterReadClosedOnce(closer io.ReadCloser, callback func(closer io.ReadCloser)) io.ReadCloser {
  75. return &readCloserHook{
  76. readCloser: closer,
  77. callback: callback,
  78. once: onceEnabled,
  79. isBefore: false,
  80. lock: &sync.Mutex{},
  81. }
  82. }
  83. type afterEOF struct {
  84. inner io.ReadCloser
  85. callback func(str io.ReadCloser, err error)
  86. lock *sync.Mutex
  87. }
  88. func (hook *afterEOF) Read(buf []byte) (n int, err error) {
  89. n, err = hook.inner.Read(buf)
  90. hook.lock.Lock()
  91. defer hook.lock.Unlock()
  92. if hook.callback != nil {
  93. if err == io.EOF {
  94. hook.callback(hook.inner, nil)
  95. hook.callback = nil
  96. } else if err != nil {
  97. hook.callback(hook.inner, err)
  98. hook.callback = nil
  99. }
  100. }
  101. return n, err
  102. }
  103. func (hook *afterEOF) Close() error {
  104. err := hook.inner.Close()
  105. hook.lock.Lock()
  106. defer hook.lock.Unlock()
  107. if hook.callback != nil {
  108. hook.callback(hook.inner, io.ErrClosedPipe)
  109. hook.callback = nil
  110. }
  111. return err
  112. }
  113. func AfterEOF(str io.ReadCloser, callback func(str io.ReadCloser, err error)) io.ReadCloser {
  114. return &afterEOF{
  115. inner: str,
  116. callback: callback,
  117. lock: &sync.Mutex{},
  118. }
  119. }
  120. type readerWithCloser struct {
  121. reader io.Reader
  122. closer func(reader io.Reader) error
  123. }
  124. func (hook *readerWithCloser) Read(buf []byte) (n int, err error) {
  125. return hook.reader.Read(buf)
  126. }
  127. func (c *readerWithCloser) Close() error {
  128. return c.closer(c.reader)
  129. }
  130. func WithCloser(reader io.Reader, closer func(reader io.Reader) error) io.ReadCloser {
  131. return &readerWithCloser{
  132. reader: reader,
  133. closer: closer,
  134. }
  135. }
  136. type LazyReadCloser struct {
  137. open func() (io.ReadCloser, error)
  138. stream io.ReadCloser
  139. }
  140. func (r *LazyReadCloser) Read(buf []byte) (n int, err error) {
  141. if r.stream == nil {
  142. var err error
  143. r.stream, err = r.open()
  144. if err != nil {
  145. return 0, err
  146. }
  147. }
  148. return r.stream.Read(buf)
  149. }
  150. func (r *LazyReadCloser) Close() error {
  151. if r.stream == nil {
  152. return nil
  153. }
  154. return r.stream.Close()
  155. }
  156. func Lazy(open func() (io.ReadCloser, error)) *LazyReadCloser {
  157. return &LazyReadCloser{
  158. open: open,
  159. }
  160. }
  161. func ToReaders(strs []io.ReadCloser) ([]io.Reader, func()) {
  162. var readers []io.Reader
  163. for _, s := range strs {
  164. readers = append(readers, s)
  165. }
  166. return readers, func() {
  167. for _, s := range strs {
  168. s.Close()
  169. }
  170. }
  171. }
  172. func DropWithBuf(str io.Reader, buf []byte) {
  173. for {
  174. _, err := str.Read(buf)
  175. if err != nil {
  176. break
  177. }
  178. }
  179. }
  180. func ReadMost(str io.Reader, n int) ([]byte, error) {
  181. buf := make([]byte, n)
  182. n, err := io.ReadFull(str, buf)
  183. if err == nil {
  184. return buf, nil
  185. }
  186. if err == io.EOF || err == io.ErrUnexpectedEOF {
  187. return buf[:n], nil
  188. }
  189. return buf[:n], err
  190. }