
workerpool.go 8.3 kB

Graceful Queues: Issue Indexing and Tasks (#9363)

* Queue: Add generic graceful queues with settings
* Queue & Setting: Add worker pool implementation
* Queue: Add worker settings
* Queue: Make resizing worker pools
* Queue: Add name variable to queues
* Queue: Add monitoring
* Queue: Improve logging
* Issues: Gracefulise the issues indexer; remove the old, now unused, specific queues
* Task: Move to generic queue and gracefulise
* Issues: Standardise the issues indexer queue settings
* Fix test
* Queue: Allow Redis to connect to unix
* Prevent deadlock during early shutdown of issue indexer
* Add MaxWorker settings to queues
* Merge branch 'master' into graceful-queues
* Update modules/indexer/issues/indexer.go (Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>)
* Update modules/indexer/issues/indexer.go (Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>)
* Update modules/queue/queue_channel.go (Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>)
* Update modules/queue/queue_disk.go
* Update modules/queue/queue_disk_channel.go (Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>)
* Rename queue.Description to queue.ManagedQueue as per @guillep2k
* Cancel pool workers when removed
* Remove dependency on queue from setting
* Update modules/queue/queue_redis.go (Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com>)
* As per @guillep2k add mutex locks on shutdown/terminate
* Move unlocking out of setInternal
* Add warning if number of workers < 0
* Small changes as per @guillep2k
* No redis host specified not found
* Clean up documentation for queues
* Update docs/content/doc/advanced/config-cheat-sheet.en-us.md
* Update modules/indexer/issues/indexer_test.go
* Ensure that persistable channel queue is added to manager
* Rename QUEUE_NAME REDIS_QUEUE_NAME
* Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" (this reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df)

Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: techknowlogick <matti@mdranta.net>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
5 years ago
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package queue

import (
	"context"
	"sync"
	"time"

	"code.gitea.io/gitea/modules/log"
)

// WorkerPool is a pool of workers that consume Data from an internal channel
// and pass it to a handler in batches. The pool can grow temporarily
// ("boost") when pushes block.
type WorkerPool struct {
	lock               sync.Mutex
	baseCtx            context.Context
	cancel             context.CancelFunc
	cond               *sync.Cond
	qid                int64
	maxNumberOfWorkers int
	numberOfWorkers    int
	batchLength        int
	handle             HandlerFunc
	dataChan           chan Data
	blockTimeout       time.Duration
	boostTimeout       time.Duration
	boostWorkers       int
}

// Push pushes the data to the internal channel
func (p *WorkerPool) Push(data Data) {
	p.lock.Lock()
	if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
		p.lock.Unlock()
		p.pushBoost(data)
	} else {
		p.lock.Unlock()
		p.dataChan <- data
	}
}

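// pushBoost tries a non-blocking send first. If the channel is full it waits
// up to blockTimeout; if that also times out it adds up to boostWorkers
// temporary workers (capped so the total stays within maxNumberOfWorkers when
// a limit is set), doubles blockTimeout so repeated stalls back off, and
// schedules the boost to be cancelled after boostTimeout, at which point
// blockTimeout is halved again. If the pool is already above its limit, or
// another goroutine raised blockTimeout in the meantime, it simply blocks on
// the send.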
func (p *WorkerPool) pushBoost(data Data) {
	select {
	case p.dataChan <- data:
	default:
		p.lock.Lock()
		if p.blockTimeout <= 0 {
			p.lock.Unlock()
			p.dataChan <- data
			return
		}
		ourTimeout := p.blockTimeout
		timer := time.NewTimer(p.blockTimeout)
		p.lock.Unlock()
		select {
		case p.dataChan <- data:
			if timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
		case <-timer.C:
			p.lock.Lock()
			if p.blockTimeout > ourTimeout || (p.numberOfWorkers > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0) {
				p.lock.Unlock()
				p.dataChan <- data
				return
			}
			p.blockTimeout *= 2
			ctx, cancel := context.WithCancel(p.baseCtx)
			mq := GetManager().GetManagedQueue(p.qid)
			boost := p.boostWorkers
			if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 {
				boost = p.maxNumberOfWorkers - p.numberOfWorkers
			}
			if mq != nil {
				log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout)
				start := time.Now()
				pid := mq.RegisterWorkers(boost, start, false, start, cancel)
				go func() {
					<-ctx.Done()
					mq.RemoveWorkers(pid)
					cancel()
				}()
			} else {
				log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout)
			}
			go func() {
				<-time.After(p.boostTimeout)
				cancel()
				p.lock.Lock()
				p.blockTimeout /= 2
				p.lock.Unlock()
			}()
			p.addWorkers(ctx, boost)
			p.lock.Unlock()
			p.dataChan <- data
		}
	}
}

// NumberOfWorkers returns the number of current workers in the pool
func (p *WorkerPool) NumberOfWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.numberOfWorkers
}

// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
func (p *WorkerPool) MaxNumberOfWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.maxNumberOfWorkers
}

// BoostWorkers returns the number of workers for a boost
func (p *WorkerPool) BoostWorkers() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.boostWorkers
}

// BoostTimeout returns the timeout of the next boost
func (p *WorkerPool) BoostTimeout() time.Duration {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.boostTimeout
}

// BlockTimeout returns the timeout until the next boost
func (p *WorkerPool) BlockTimeout() time.Duration {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.blockTimeout
}

// SetSettings sets the settable boost values
func (p *WorkerPool) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.maxNumberOfWorkers = maxNumberOfWorkers
	p.boostWorkers = boostWorkers
	p.boostTimeout = timeout
}

// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool.
// Changing this number will not change the number of current workers but will change the limit
// for future additions.
func (p *WorkerPool) SetMaxNumberOfWorkers(newMax int) {
	p.lock.Lock()
	defer p.lock.Unlock()
	p.maxNumberOfWorkers = newMax
}

// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
	var ctx context.Context
	var cancel context.CancelFunc
	start := time.Now()
	end := start
	hasTimeout := false
	if timeout > 0 {
		ctx, cancel = context.WithTimeout(p.baseCtx, timeout)
		end = start.Add(timeout)
		hasTimeout = true
	} else {
		ctx, cancel = context.WithCancel(p.baseCtx)
	}

	mq := GetManager().GetManagedQueue(p.qid)
	if mq != nil {
		pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel)
		go func() {
			<-ctx.Done()
			mq.RemoveWorkers(pid)
			cancel()
		}()
		log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid)
	} else {
		log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number)
	}
	p.addWorkers(ctx, number)
	return cancel
}

// addWorkers adds workers to the pool
func (p *WorkerPool) addWorkers(ctx context.Context, number int) {
	for i := 0; i < number; i++ {
		p.lock.Lock()
		if p.cond == nil {
			p.cond = sync.NewCond(&p.lock)
		}
		p.numberOfWorkers++
		p.lock.Unlock()
		go func() {
			p.doWork(ctx)

			p.lock.Lock()
			p.numberOfWorkers--
			if p.numberOfWorkers == 0 {
				p.cond.Broadcast()
			} else if p.numberOfWorkers < 0 {
				// numberOfWorkers can't go negative but...
				log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid)
				p.numberOfWorkers = 0
				p.cond.Broadcast()
			}
			p.lock.Unlock()
		}()
	}
}

// Wait for WorkerPool to finish
func (p *WorkerPool) Wait() {
	p.lock.Lock()
	defer p.lock.Unlock()
	if p.cond == nil {
		p.cond = sync.NewCond(&p.lock)
	}
	if p.numberOfWorkers <= 0 {
		return
	}
	p.cond.Wait()
}

// CleanUp will drain the remaining contents of the channel.
// This should be called after the AddWorkers context is closed.
func (p *WorkerPool) CleanUp(ctx context.Context) {
	log.Trace("WorkerPool: %d CleanUp", p.qid)
	close(p.dataChan)
	for data := range p.dataChan {
		p.handle(data)
		select {
		case <-ctx.Done():
			log.Warn("WorkerPool: %d Cleanup context closed before finishing clean-up", p.qid)
			return
		default:
		}
	}
	log.Trace("WorkerPool: %d CleanUp Done", p.qid)
}

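// doWork is the body of each worker goroutine: it accumulates Data from
// dataChan into a batch of up to batchLength items and hands each batch to
// handle, flushing any partial batch after a short idle delay. It drains its
// current batch and returns when the context is cancelled or dataChan is
// closed.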
func (p *WorkerPool) doWork(ctx context.Context) {
	delay := time.Millisecond * 300
	var data = make([]Data, 0, p.batchLength)
	for {
		select {
		case <-ctx.Done():
			if len(data) > 0 {
				log.Trace("Handling: %d data, %v", len(data), data)
				p.handle(data...)
			}
			log.Trace("Worker shutting down")
			return
		case datum, ok := <-p.dataChan:
			if !ok {
				// the dataChan has been closed - we should finish up:
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
				}
				log.Trace("Worker shutting down")
				return
			}
			data = append(data, datum)
			if len(data) >= p.batchLength {
				log.Trace("Handling: %d data, %v", len(data), data)
				p.handle(data...)
				data = make([]Data, 0, p.batchLength)
			}
		default:
			timer := time.NewTimer(delay)
			select {
			case <-ctx.Done():
				if timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
				}
				log.Trace("Worker shutting down")
				return
			case datum, ok := <-p.dataChan:
				if timer.Stop() {
					select {
					case <-timer.C:
					default:
					}
				}
				if !ok {
					// the dataChan has been closed - we should finish up:
					if len(data) > 0 {
						log.Trace("Handling: %d data, %v", len(data), data)
						p.handle(data...)
					}
					log.Trace("Worker shutting down")
					return
				}
				data = append(data, datum)
				if len(data) >= p.batchLength {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
					data = make([]Data, 0, p.batchLength)
				}
			case <-timer.C:
				delay = time.Millisecond * 100
				if len(data) > 0 {
					log.Trace("Handling: %d data, %v", len(data), data)
					p.handle(data...)
					data = make([]Data, 0, p.batchLength)
				}
			}
		}
	}
}
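
For context, here is a rough sketch of how an embedding queue implementation might drive this pool. It is illustrative only, not how Gitea's queues actually wire this up: it assumes the function lives in the same queue package, that Data is an empty interface and HandlerFunc takes variadic Data (as declared elsewhere in the package), and it uses placeholder values for the queue id, channel size, and timeouts.

// exampleWorkerPoolUsage is a hypothetical sketch, not part of the real package.
func exampleWorkerPoolUsage() {
	ctx, cancel := context.WithCancel(context.Background())

	// Handler receives batches of up to batchLength items.
	handler := func(data ...Data) {
		for _, datum := range data {
			log.Info("handled: %v", datum)
		}
	}

	p := &WorkerPool{
		baseCtx:            ctx,
		cancel:             cancel,
		qid:                1, // placeholder id; normally assigned by the queue manager
		maxNumberOfWorkers: 10,
		batchLength:        20,
		handle:             handler,
		dataChan:           make(chan Data, 100),
		blockTimeout:       time.Second,
		boostTimeout:       5 * time.Minute,
		boostWorkers:       5,
	}

	// Start two permanent workers (timeout 0 means no automatic expiry).
	cancelWorkers := p.AddWorkers(2, 0)

	// Producers push data; if the buffered channel stays full past
	// blockTimeout, pushBoost adds temporary workers.
	for i := 0; i < 1000; i++ {
		p.Push(i)
	}

	// Shutdown: cancel the workers, wait for them to flush their batches,
	// then drain anything still left in the channel.
	cancelWorkers()
	p.Wait()
	p.CleanUp(context.Background())
}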