You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number, can include dashes ('-') and can be up to 35 characters long.

queue_disk_channel.go 5.8 kB

Graceful Queues: Issue Indexing and Tasks (#9363) * Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. 
Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "time"
  8. "code.gitea.io/gitea/modules/log"
  9. )
  10. // PersistableChannelQueueType is the type for persistable queue
  11. const PersistableChannelQueueType Type = "persistable-channel"
  12. // PersistableChannelQueueConfiguration is the configuration for a PersistableChannelQueue
  13. type PersistableChannelQueueConfiguration struct {
  14. Name string
  15. DataDir string
  16. BatchLength int
  17. QueueLength int
  18. Timeout time.Duration
  19. MaxAttempts int
  20. Workers int
  21. MaxWorkers int
  22. BlockTimeout time.Duration
  23. BoostTimeout time.Duration
  24. BoostWorkers int
  25. }
  26. // PersistableChannelQueue wraps a channel queue and level queue together
  27. type PersistableChannelQueue struct {
  28. *ChannelQueue
  29. delayedStarter
  30. closed chan struct{}
  31. }
  32. // NewPersistableChannelQueue creates a wrapped batched channel queue with persistable level queue backend when shutting down
  33. // This differs from a wrapped queue in that the persistent queue is only used to persist at shutdown/terminate
  34. func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  35. configInterface, err := toConfig(PersistableChannelQueueConfiguration{}, cfg)
  36. if err != nil {
  37. return nil, err
  38. }
  39. config := configInterface.(PersistableChannelQueueConfiguration)
  40. channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
  41. QueueLength: config.QueueLength,
  42. BatchLength: config.BatchLength,
  43. Workers: config.Workers,
  44. MaxWorkers: config.MaxWorkers,
  45. BlockTimeout: config.BlockTimeout,
  46. BoostTimeout: config.BoostTimeout,
  47. BoostWorkers: config.BoostWorkers,
  48. Name: config.Name + "-channel",
  49. }, exemplar)
  50. if err != nil {
  51. return nil, err
  52. }
  53. // the level backend only needs temporary workers to catch up with the previously dropped work
  54. levelCfg := LevelQueueConfiguration{
  55. DataDir: config.DataDir,
  56. QueueLength: config.QueueLength,
  57. BatchLength: config.BatchLength,
  58. Workers: 1,
  59. MaxWorkers: 6,
  60. BlockTimeout: 1 * time.Second,
  61. BoostTimeout: 5 * time.Minute,
  62. BoostWorkers: 5,
  63. Name: config.Name + "-level",
  64. }
  65. levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
  66. if err == nil {
  67. queue := &PersistableChannelQueue{
  68. ChannelQueue: channelQueue.(*ChannelQueue),
  69. delayedStarter: delayedStarter{
  70. internal: levelQueue.(*LevelQueue),
  71. name: config.Name,
  72. },
  73. closed: make(chan struct{}),
  74. }
  75. _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
  76. return queue, nil
  77. }
  78. if IsErrInvalidConfiguration(err) {
  79. // Retrying ain't gonna make this any better...
  80. return nil, ErrInvalidConfiguration{cfg: cfg}
  81. }
  82. queue := &PersistableChannelQueue{
  83. ChannelQueue: channelQueue.(*ChannelQueue),
  84. delayedStarter: delayedStarter{
  85. cfg: levelCfg,
  86. underlying: LevelQueueType,
  87. timeout: config.Timeout,
  88. maxAttempts: config.MaxAttempts,
  89. name: config.Name,
  90. },
  91. closed: make(chan struct{}),
  92. }
  93. _ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
  94. return queue, nil
  95. }
  96. // Name returns the name of this queue
  97. func (p *PersistableChannelQueue) Name() string {
  98. return p.delayedStarter.name
  99. }
  100. // Push will push the indexer data to queue
  101. func (p *PersistableChannelQueue) Push(data Data) error {
  102. select {
  103. case <-p.closed:
  104. return p.internal.Push(data)
  105. default:
  106. return p.ChannelQueue.Push(data)
  107. }
  108. }
  109. // Run starts to run the queue
  110. func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  111. p.lock.Lock()
  112. if p.internal == nil {
  113. err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
  114. p.lock.Unlock()
  115. if err != nil {
  116. log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
  117. return
  118. }
  119. } else {
  120. p.lock.Unlock()
  121. }
  122. atShutdown(context.Background(), p.Shutdown)
  123. atTerminate(context.Background(), p.Terminate)
  124. // Just run the level queue - we shut it down later
  125. go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
  126. go func() {
  127. _ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
  128. }()
  129. log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
  130. <-p.closed
  131. log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
  132. p.ChannelQueue.pool.cancel()
  133. p.internal.(*LevelQueue).pool.cancel()
  134. log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
  135. p.ChannelQueue.pool.Wait()
  136. p.internal.(*LevelQueue).pool.Wait()
  137. // Redirect all remaining data in the chan to the internal channel
  138. go func() {
  139. log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
  140. for data := range p.ChannelQueue.pool.dataChan {
  141. _ = p.internal.Push(data)
  142. }
  143. log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
  144. }()
  145. log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
  146. }
  147. // Shutdown processing this queue
  148. func (p *PersistableChannelQueue) Shutdown() {
  149. log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
  150. select {
  151. case <-p.closed:
  152. default:
  153. p.lock.Lock()
  154. defer p.lock.Unlock()
  155. if p.internal != nil {
  156. p.internal.(*LevelQueue).Shutdown()
  157. }
  158. close(p.closed)
  159. }
  160. }
  161. // Terminate this queue and close the queue
  162. func (p *PersistableChannelQueue) Terminate() {
  163. log.Trace("PersistableChannelQueue: %s Terminating", p.delayedStarter.name)
  164. p.Shutdown()
  165. p.lock.Lock()
  166. defer p.lock.Unlock()
  167. if p.internal != nil {
  168. p.internal.(*LevelQueue).Terminate()
  169. }
  170. }
  171. func init() {
  172. queuesMap[PersistableChannelQueueType] = NewPersistableChannelQueue
  173. }