You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

manager.go 6.2 kB

Graceful Queues: Issue Indexing and Tasks (#9363) * Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. 
Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "encoding/json"
  8. "fmt"
  9. "reflect"
  10. "sort"
  11. "sync"
  12. "time"
  13. "code.gitea.io/gitea/modules/log"
  14. )
  15. var manager *Manager
  16. // Manager is a queue manager
  17. type Manager struct {
  18. mutex sync.Mutex
  19. counter int64
  20. Queues map[int64]*ManagedQueue
  21. }
  22. // ManagedQueue represents a working queue inheriting from Gitea.
  23. type ManagedQueue struct {
  24. mutex sync.Mutex
  25. QID int64
  26. Queue Queue
  27. Type Type
  28. Name string
  29. Configuration interface{}
  30. ExemplarType string
  31. Pool ManagedPool
  32. counter int64
  33. PoolWorkers map[int64]*PoolWorkers
  34. }
  35. // ManagedPool is a simple interface to get certain details from a worker pool
  36. type ManagedPool interface {
  37. AddWorkers(number int, timeout time.Duration) context.CancelFunc
  38. NumberOfWorkers() int
  39. MaxNumberOfWorkers() int
  40. SetMaxNumberOfWorkers(int)
  41. BoostTimeout() time.Duration
  42. BlockTimeout() time.Duration
  43. BoostWorkers() int
  44. SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
  45. }
  46. // ManagedQueueList implements the sort.Interface
  47. type ManagedQueueList []*ManagedQueue
  48. // PoolWorkers represents a working queue inheriting from Gitea.
  49. type PoolWorkers struct {
  50. PID int64
  51. Workers int
  52. Start time.Time
  53. Timeout time.Time
  54. HasTimeout bool
  55. Cancel context.CancelFunc
  56. }
  57. // PoolWorkersList implements the sort.Interface
  58. type PoolWorkersList []*PoolWorkers
  59. func init() {
  60. _ = GetManager()
  61. }
  62. // GetManager returns a Manager and initializes one as singleton if there's none yet
  63. func GetManager() *Manager {
  64. if manager == nil {
  65. manager = &Manager{
  66. Queues: make(map[int64]*ManagedQueue),
  67. }
  68. }
  69. return manager
  70. }
  71. // Add adds a queue to this manager
  72. func (m *Manager) Add(queue Queue,
  73. t Type,
  74. configuration,
  75. exemplar interface{},
  76. pool ManagedPool) int64 {
  77. cfg, _ := json.Marshal(configuration)
  78. mq := &ManagedQueue{
  79. Queue: queue,
  80. Type: t,
  81. Configuration: string(cfg),
  82. ExemplarType: reflect.TypeOf(exemplar).String(),
  83. PoolWorkers: make(map[int64]*PoolWorkers),
  84. Pool: pool,
  85. }
  86. m.mutex.Lock()
  87. m.counter++
  88. mq.QID = m.counter
  89. mq.Name = fmt.Sprintf("queue-%d", mq.QID)
  90. if named, ok := queue.(Named); ok {
  91. mq.Name = named.Name()
  92. }
  93. m.Queues[mq.QID] = mq
  94. m.mutex.Unlock()
  95. log.Trace("Queue Manager registered: %s (QID: %d)", mq.Name, mq.QID)
  96. return mq.QID
  97. }
  98. // Remove a queue from the Manager
  99. func (m *Manager) Remove(qid int64) {
  100. m.mutex.Lock()
  101. delete(m.Queues, qid)
  102. m.mutex.Unlock()
  103. log.Trace("Queue Manager removed: QID: %d", qid)
  104. }
  105. // GetManagedQueue by qid
  106. func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
  107. m.mutex.Lock()
  108. defer m.mutex.Unlock()
  109. return m.Queues[qid]
  110. }
  111. // ManagedQueues returns the managed queues
  112. func (m *Manager) ManagedQueues() []*ManagedQueue {
  113. m.mutex.Lock()
  114. mqs := make([]*ManagedQueue, 0, len(m.Queues))
  115. for _, mq := range m.Queues {
  116. mqs = append(mqs, mq)
  117. }
  118. m.mutex.Unlock()
  119. sort.Sort(ManagedQueueList(mqs))
  120. return mqs
  121. }
  122. // Workers returns the poolworkers
  123. func (q *ManagedQueue) Workers() []*PoolWorkers {
  124. q.mutex.Lock()
  125. workers := make([]*PoolWorkers, 0, len(q.PoolWorkers))
  126. for _, worker := range q.PoolWorkers {
  127. workers = append(workers, worker)
  128. }
  129. q.mutex.Unlock()
  130. sort.Sort(PoolWorkersList(workers))
  131. return workers
  132. }
  133. // RegisterWorkers registers workers to this queue
  134. func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
  135. q.mutex.Lock()
  136. defer q.mutex.Unlock()
  137. q.counter++
  138. q.PoolWorkers[q.counter] = &PoolWorkers{
  139. PID: q.counter,
  140. Workers: number,
  141. Start: start,
  142. Timeout: timeout,
  143. HasTimeout: hasTimeout,
  144. Cancel: cancel,
  145. }
  146. return q.counter
  147. }
  148. // CancelWorkers cancels pooled workers with pid
  149. func (q *ManagedQueue) CancelWorkers(pid int64) {
  150. q.mutex.Lock()
  151. pw, ok := q.PoolWorkers[pid]
  152. q.mutex.Unlock()
  153. if !ok {
  154. return
  155. }
  156. pw.Cancel()
  157. }
  158. // RemoveWorkers deletes pooled workers with pid
  159. func (q *ManagedQueue) RemoveWorkers(pid int64) {
  160. q.mutex.Lock()
  161. pw, ok := q.PoolWorkers[pid]
  162. delete(q.PoolWorkers, pid)
  163. q.mutex.Unlock()
  164. if ok && pw.Cancel != nil {
  165. pw.Cancel()
  166. }
  167. }
  168. // AddWorkers adds workers to the queue if it has registered an add worker function
  169. func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
  170. if q.Pool != nil {
  171. // the cancel will be added to the pool workers description above
  172. return q.Pool.AddWorkers(number, timeout)
  173. }
  174. return nil
  175. }
  176. // NumberOfWorkers returns the number of workers in the queue
  177. func (q *ManagedQueue) NumberOfWorkers() int {
  178. if q.Pool != nil {
  179. return q.Pool.NumberOfWorkers()
  180. }
  181. return -1
  182. }
  183. // MaxNumberOfWorkers returns the maximum number of workers for the pool
  184. func (q *ManagedQueue) MaxNumberOfWorkers() int {
  185. if q.Pool != nil {
  186. return q.Pool.MaxNumberOfWorkers()
  187. }
  188. return 0
  189. }
  190. // BoostWorkers returns the number of workers for a boost
  191. func (q *ManagedQueue) BoostWorkers() int {
  192. if q.Pool != nil {
  193. return q.Pool.BoostWorkers()
  194. }
  195. return -1
  196. }
  197. // BoostTimeout returns the timeout of the next boost
  198. func (q *ManagedQueue) BoostTimeout() time.Duration {
  199. if q.Pool != nil {
  200. return q.Pool.BoostTimeout()
  201. }
  202. return 0
  203. }
  204. // BlockTimeout returns the timeout til the next boost
  205. func (q *ManagedQueue) BlockTimeout() time.Duration {
  206. if q.Pool != nil {
  207. return q.Pool.BlockTimeout()
  208. }
  209. return 0
  210. }
  211. // SetSettings sets the setable boost values
  212. func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
  213. if q.Pool != nil {
  214. q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
  215. }
  216. }
  217. func (l ManagedQueueList) Len() int {
  218. return len(l)
  219. }
  220. func (l ManagedQueueList) Less(i, j int) bool {
  221. return l[i].Name < l[j].Name
  222. }
  223. func (l ManagedQueueList) Swap(i, j int) {
  224. l[i], l[j] = l[j], l[i]
  225. }
  226. func (l PoolWorkersList) Len() int {
  227. return len(l)
  228. }
  229. func (l PoolWorkersList) Less(i, j int) bool {
  230. return l[i].Start.Before(l[j].Start)
  231. }
  232. func (l PoolWorkersList) Swap(i, j int) {
  233. l[i], l[j] = l[j], l[i]
  234. }