You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

queue_redis.go 5.6 kB

Graceful Queues: Issue Indexing and Tasks (#9363) * Queue: Add generic graceful queues with settings * Queue & Setting: Add worker pool implementation * Queue: Add worker settings * Queue: Make resizing worker pools * Queue: Add name variable to queues * Queue: Add monitoring * Queue: Improve logging * Issues: Gracefulise the issues indexer Remove the old now unused specific queues * Task: Move to generic queue and gracefulise * Issues: Standardise the issues indexer queue settings * Fix test * Queue: Allow Redis to connect to unix * Prevent deadlock during early shutdown of issue indexer * Add MaxWorker settings to queues * Merge branch 'master' into graceful-queues * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/indexer/issues/indexer.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Update modules/queue/queue_disk.go * Update modules/queue/queue_disk_channel.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * Rename queue.Description to queue.ManagedQueue as per @guillep2k * Cancel pool workers when removed * Remove dependency on queue from setting * Update modules/queue/queue_redis.go Co-Authored-By: guillep2k <18600385+guillep2k@users.noreply.github.com> * As per @guillep2k add mutex locks on shutdown/terminate * move unlocking out of setInternal * Add warning if number of workers < 0 * Small changes as per @guillep2k * No redis host specified not found * Clean up documentation for queues * Update docs/content/doc/advanced/config-cheat-sheet.en-us.md * Update modules/indexer/issues/indexer_test.go * Ensure that persistable channel queue is added to manager * Rename QUEUE_NAME REDIS_QUEUE_NAME * Revert "Rename QUEUE_NAME REDIS_QUEUE_NAME" This reverts commit 1f83b4fc9b9dabda186257b38c265fe7012f90df. Co-authored-by: guillep2k <18600385+guillep2k@users.noreply.github.com> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: techknowlogick <matti@mdranta.net> Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. // Copyright 2019 The Gitea Authors. All rights reserved.
  2. // Use of this source code is governed by a MIT-style
  3. // license that can be found in the LICENSE file.
  4. package queue
  5. import (
  6. "context"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "reflect"
  11. "strings"
  12. "sync"
  13. "time"
  14. "code.gitea.io/gitea/modules/log"
  15. "github.com/go-redis/redis"
  16. )
  17. // RedisQueueType is the type for redis queue
  18. const RedisQueueType Type = "redis"
  19. type redisClient interface {
  20. RPush(key string, args ...interface{}) *redis.IntCmd
  21. LPop(key string) *redis.StringCmd
  22. Ping() *redis.StatusCmd
  23. Close() error
  24. }
  25. // RedisQueue redis queue
  26. type RedisQueue struct {
  27. pool *WorkerPool
  28. client redisClient
  29. queueName string
  30. closed chan struct{}
  31. terminated chan struct{}
  32. exemplar interface{}
  33. workers int
  34. name string
  35. lock sync.Mutex
  36. }
  37. // RedisQueueConfiguration is the configuration for the redis queue
  38. type RedisQueueConfiguration struct {
  39. Network string
  40. Addresses string
  41. Password string
  42. DBIndex int
  43. BatchLength int
  44. QueueLength int
  45. QueueName string
  46. Workers int
  47. MaxWorkers int
  48. BlockTimeout time.Duration
  49. BoostTimeout time.Duration
  50. BoostWorkers int
  51. Name string
  52. }
  53. // NewRedisQueue creates single redis or cluster redis queue
  54. func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
  55. configInterface, err := toConfig(RedisQueueConfiguration{}, cfg)
  56. if err != nil {
  57. return nil, err
  58. }
  59. config := configInterface.(RedisQueueConfiguration)
  60. dbs := strings.Split(config.Addresses, ",")
  61. dataChan := make(chan Data, config.QueueLength)
  62. ctx, cancel := context.WithCancel(context.Background())
  63. var queue = &RedisQueue{
  64. pool: &WorkerPool{
  65. baseCtx: ctx,
  66. cancel: cancel,
  67. batchLength: config.BatchLength,
  68. handle: handle,
  69. dataChan: dataChan,
  70. blockTimeout: config.BlockTimeout,
  71. boostTimeout: config.BoostTimeout,
  72. boostWorkers: config.BoostWorkers,
  73. maxNumberOfWorkers: config.MaxWorkers,
  74. },
  75. queueName: config.QueueName,
  76. exemplar: exemplar,
  77. closed: make(chan struct{}),
  78. workers: config.Workers,
  79. name: config.Name,
  80. }
  81. if len(dbs) == 0 {
  82. return nil, errors.New("no redis host specified")
  83. } else if len(dbs) == 1 {
  84. queue.client = redis.NewClient(&redis.Options{
  85. Network: config.Network,
  86. Addr: strings.TrimSpace(dbs[0]), // use default Addr
  87. Password: config.Password, // no password set
  88. DB: config.DBIndex, // use default DB
  89. })
  90. } else {
  91. queue.client = redis.NewClusterClient(&redis.ClusterOptions{
  92. Addrs: dbs,
  93. })
  94. }
  95. if err := queue.client.Ping().Err(); err != nil {
  96. return nil, err
  97. }
  98. queue.pool.qid = GetManager().Add(queue, RedisQueueType, config, exemplar, queue.pool)
  99. return queue, nil
  100. }
  101. // Run runs the redis queue
  102. func (r *RedisQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
  103. atShutdown(context.Background(), r.Shutdown)
  104. atTerminate(context.Background(), r.Terminate)
  105. go func() {
  106. _ = r.pool.AddWorkers(r.workers, 0)
  107. }()
  108. go r.readToChan()
  109. log.Trace("RedisQueue: %s Waiting til closed", r.name)
  110. <-r.closed
  111. log.Trace("RedisQueue: %s Waiting til done", r.name)
  112. r.pool.Wait()
  113. log.Trace("RedisQueue: %s Waiting til cleaned", r.name)
  114. ctx, cancel := context.WithCancel(context.Background())
  115. atTerminate(ctx, cancel)
  116. r.pool.CleanUp(ctx)
  117. cancel()
  118. }
  119. func (r *RedisQueue) readToChan() {
  120. for {
  121. select {
  122. case <-r.closed:
  123. // tell the pool to shutdown
  124. r.pool.cancel()
  125. return
  126. default:
  127. bs, err := r.client.LPop(r.queueName).Bytes()
  128. if err != nil && err != redis.Nil {
  129. log.Error("RedisQueue: %s Error on LPop: %v", r.name, err)
  130. time.Sleep(time.Millisecond * 100)
  131. continue
  132. }
  133. if len(bs) == 0 {
  134. time.Sleep(time.Millisecond * 100)
  135. continue
  136. }
  137. var data Data
  138. if r.exemplar != nil {
  139. t := reflect.TypeOf(r.exemplar)
  140. n := reflect.New(t)
  141. ne := n.Elem()
  142. err = json.Unmarshal(bs, ne.Addr().Interface())
  143. data = ne.Interface().(Data)
  144. } else {
  145. err = json.Unmarshal(bs, &data)
  146. }
  147. if err != nil {
  148. log.Error("RedisQueue: %s Error on Unmarshal: %v", r.name, err)
  149. time.Sleep(time.Millisecond * 100)
  150. continue
  151. }
  152. log.Trace("RedisQueue: %s Task found: %#v", r.name, data)
  153. r.pool.Push(data)
  154. }
  155. }
  156. }
  157. // Push implements Queue
  158. func (r *RedisQueue) Push(data Data) error {
  159. if r.exemplar != nil {
  160. // Assert data is of same type as r.exemplar
  161. value := reflect.ValueOf(data)
  162. t := value.Type()
  163. exemplarType := reflect.ValueOf(r.exemplar).Type()
  164. if !t.AssignableTo(exemplarType) || data == nil {
  165. return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, r.exemplar, r.name)
  166. }
  167. }
  168. bs, err := json.Marshal(data)
  169. if err != nil {
  170. return err
  171. }
  172. return r.client.RPush(r.queueName, bs).Err()
  173. }
  174. // Shutdown processing from this queue
  175. func (r *RedisQueue) Shutdown() {
  176. log.Trace("Shutdown: %s", r.name)
  177. r.lock.Lock()
  178. select {
  179. case <-r.closed:
  180. default:
  181. close(r.closed)
  182. }
  183. r.lock.Unlock()
  184. }
  185. // Terminate this queue and close the queue
  186. func (r *RedisQueue) Terminate() {
  187. log.Trace("Terminating: %s", r.name)
  188. r.Shutdown()
  189. r.lock.Lock()
  190. select {
  191. case <-r.terminated:
  192. r.lock.Unlock()
  193. default:
  194. close(r.terminated)
  195. r.lock.Unlock()
  196. if err := r.client.Close(); err != nil {
  197. log.Error("Error whilst closing internal redis client in %s: %v", r.name, err)
  198. }
  199. }
  200. }
  201. // Name returns the name of this queue
  202. func (r *RedisQueue) Name() string {
  203. return r.name
  204. }
  205. func init() {
  206. queuesMap[RedisQueueType] = NewRedisQueue
  207. }