You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

clientManager.go 2.1 kB

3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package socketwrap
  2. import (
  3. "os"
  4. "os/signal"
  5. "syscall"
  6. "code.gitea.io/gitea/models"
  7. "code.gitea.io/gitea/modules/log"
  8. "github.com/elliotchance/orderedmap"
  9. )
  10. var opTypes = []int{1, 2, 5, 6, 7, 9, 10, 11, 12, 13, 14, 15, 17, 22, 23}
  11. type ClientsManager struct {
  12. Clients *orderedmap.OrderedMap
  13. Register chan *Client
  14. Unregister chan *Client
  15. }
  16. func NewClientsManager() *ClientsManager {
  17. return &ClientsManager{
  18. Register: make(chan *Client),
  19. Unregister: make(chan *Client),
  20. Clients: orderedmap.NewOrderedMap(),
  21. }
  22. }
  23. const MaxClients = 100
  24. var LastActionsQueue = NewSyncQueue(15)
  25. func (h *ClientsManager) Run() {
  26. initActionQueue()
  27. sig := make(chan os.Signal, 1)
  28. signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
  29. var signalsReceived uint
  30. for {
  31. select {
  32. case client := <-h.Register:
  33. h.Clients.Set(client, true)
  34. if h.Clients.Len() > MaxClients {
  35. h.Clients.Delete(h.Clients.Front().Key)
  36. }
  37. case client := <-h.Unregister:
  38. if _, ok := h.Clients.Get(client); ok {
  39. h.Clients.Delete(client)
  40. close(client.Send)
  41. }
  42. case message := <-models.ActionChan:
  43. if isInOpTypes(opTypes, message.OpType) {
  44. LastActionsQueue.Push(message)
  45. for _, client := range h.Clients.Keys() {
  46. select {
  47. case client.(*Client).Send <- message:
  48. default:
  49. close(client.(*Client).Send)
  50. h.Clients.Delete(client)
  51. }
  52. }
  53. }
  54. case s := <-sig:
  55. log.Info("received signal", s)
  56. signalsReceived++
  57. if signalsReceived < 2 {
  58. for _, client := range h.Clients.Keys() {
  59. h.Clients.Delete(client)
  60. client.(*Client).Close()
  61. }
  62. break
  63. }
  64. }
  65. }
  66. }
  67. func isInOpTypes(types []int, opType models.ActionType) bool {
  68. isFound := false
  69. for _, value := range types {
  70. if value == int(opType) {
  71. isFound = true
  72. break
  73. }
  74. }
  75. return isFound
  76. }
  77. func initActionQueue() {
  78. actions, err := models.GetLast20PublicFeeds(opTypes)
  79. if err == nil {
  80. for i := len(actions) - 1; i >= 0; i-- {
  81. user, err := models.GetUserByID(actions[i].UserID)
  82. if err == nil {
  83. if !user.IsOrganization() {
  84. LastActionsQueue.Push(actions[i])
  85. }
  86. }
  87. }
  88. }
  89. }