You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

service.go 3.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package service
  2. import (
  3. "fmt"
  4. "time"
  5. "gitlink.org.cn/cloudream/common/pkg/distlock"
  6. "gitlink.org.cn/cloudream/common/pkg/distlock/service/internal"
  7. clientv3 "go.etcd.io/etcd/client/v3"
  8. )
  9. type Service struct {
  10. cfg *distlock.Config
  11. etcdCli *clientv3.Client
  12. mainActor *internal.MainActor
  13. providersActor *internal.ProvidersActor
  14. watchEtcdActor *internal.WatchEtcdActor
  15. leaseActor *internal.LeaseActor
  16. }
  17. func NewService(cfg *distlock.Config) (*Service, error) {
  18. etcdCli, err := clientv3.New(clientv3.Config{
  19. Endpoints: []string{cfg.EtcdAddress},
  20. Username: cfg.EtcdUsername,
  21. Password: cfg.EtcdPassword,
  22. })
  23. if err != nil {
  24. return nil, fmt.Errorf("new etcd client failed, err: %w", err)
  25. }
  26. mainActor := internal.NewMainActor()
  27. providersActor := internal.NewProvidersActor()
  28. watchEtcdActor := internal.NewWatchEtcdActor()
  29. leaseActor := internal.NewLeaseActor()
  30. mainActor.Init(watchEtcdActor, providersActor)
  31. providersActor.Init()
  32. watchEtcdActor.Init(providersActor)
  33. leaseActor.Init(mainActor)
  34. initProviders(providersActor)
  35. return &Service{
  36. cfg: cfg,
  37. etcdCli: etcdCli,
  38. mainActor: mainActor,
  39. providersActor: providersActor,
  40. watchEtcdActor: watchEtcdActor,
  41. leaseActor: leaseActor,
  42. }, nil
  43. }
  44. // Acquire 请求一批锁。成功后返回锁请求ID
  45. func (svc *Service) Acquire(req distlock.LockRequest) (string, error) {
  46. reqID, err := svc.mainActor.Acquire(req)
  47. if err != nil {
  48. return "", err
  49. }
  50. // TODO 不影响结果,但考虑打日志
  51. svc.leaseActor.Add(reqID, time.Duration(svc.cfg.LockRequestLeaseTimeSec)*time.Second)
  52. return reqID, nil
  53. }
  54. // Renew 续约锁
  55. func (svc *Service) Renew(reqID string) error {
  56. return svc.leaseActor.Renew(reqID, time.Duration(svc.cfg.LockRequestLeaseTimeSec)*time.Second)
  57. }
  58. // Release 释放锁
  59. func (svc *Service) Release(reqID string) error {
  60. err := svc.mainActor.Release(reqID)
  61. // TODO 不影响结果,但考虑打日志
  62. svc.leaseActor.Remove(reqID)
  63. return err
  64. }
  65. func (svc *Service) Serve() error {
  66. go func() {
  67. // TODO 处理错误
  68. svc.providersActor.Serve()
  69. }()
  70. go func() {
  71. // TODO 处理错误
  72. svc.watchEtcdActor.Serve()
  73. }()
  74. go func() {
  75. // TODO 处理错误
  76. svc.mainActor.Serve()
  77. }()
  78. go func() {
  79. // TODO 处理错误
  80. svc.leaseActor.Server()
  81. }()
  82. err := svc.mainActor.ReloadEtcdData()
  83. if err != nil {
  84. // TODO 关闭其他的Actor,或者更好的错误处理方式
  85. return fmt.Errorf("init data failed, err: %w", err)
  86. }
  87. err = svc.watchEtcdActor.StartWatching()
  88. if err != nil {
  89. // TODO 关闭其他的Actor,或者更好的错误处理方式
  90. return fmt.Errorf("start watching etcd failed, err: %w", err)
  91. }
  92. err = svc.leaseActor.StartChecking()
  93. if err != nil {
  94. // TODO 关闭其他的Actor,或者更好的错误处理方式
  95. return fmt.Errorf("start checking lease failed, err: %w", err)
  96. }
  97. // TODO 临时解决办法
  98. ch := make(chan any)
  99. <-ch
  100. return nil
  101. }

公共库

Contributors (1)