You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

service.go 4.4 kB

5 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
6 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package publock
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "time"
  7. "gitlink.org.cn/cloudream/common/pkgs/future"
  8. "gitlink.org.cn/cloudream/common/pkgs/logger"
  9. "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
  10. clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
  11. )
  // AcquireOptionFn is a function that adjusts a types.AcquireOption in place.
  // WithTimeout and WithReason in this file return values of this type.
  type AcquireOptionFn func(opt *types.AcquireOption)
  13. func WithTimeout(timeout time.Duration) AcquireOptionFn {
  14. return func(opt *types.AcquireOption) {
  15. opt.Timeout = timeout
  16. }
  17. }
  18. func WithReason(reason string) AcquireOptionFn {
  19. return func(opt *types.AcquireOption) {
  20. opt.Reason = reason
  21. }
  22. }
  // PubLock is the entry point for acquiring and releasing locks. It works in
  // one of two modes, decided by the constructor: NewMaster sets core, and all
  // requests are forwarded to it directly; NewSlave sets cliCli, and requests
  // are sent as messages over pubChan, with replies matched back to the waiting
  // caller by context ID.
  type PubLock struct {
  	// core is set only by NewMaster; when non-nil, Start, Stop, Acquire and
  	// Release delegate to it.
  	core *Core
  	// cliCli is the RPC client used to open pubChan. It is set by NewSlave
  	// and released and cleared by Stop.
  	cliCli *clirpc.Client
  	// pubChan is opened lazily by the first Acquire or Release call that
  	// finds it nil, and is closed and cleared by Stop.
  	pubChan clirpc.PubLockMessageChan
  	// lock is held by Stop, Acquire and Release while they use pubChan and
  	// nextCtxID, and by them and receivingChan while they update the two
  	// pending-request maps below.
  	lock sync.Mutex
  	// acquirings holds acquire requests that were sent and are awaiting an
  	// AcquireResultMsg, keyed by the ContextID placed in the AcquireMsg.
  	acquirings map[string]*acquireInfo
  	// releasing holds release requests that were sent and are awaiting a
  	// ReleaseResultMsg, keyed by the ContextID placed in the ReleaseMsg.
  	releasing map[string]*releaseInfo
  	// nextCtxID is the counter from which context IDs are formatted; Acquire
  	// and Release share it and increment it after each use.
  	nextCtxID int64
  }
  32. func NewMaster() *PubLock {
  33. core := NewCore()
  34. return &PubLock{
  35. core: core,
  36. }
  37. }
  38. func NewSlave(cli *clirpc.Client) *PubLock {
  39. return &PubLock{
  40. cliCli: cli,
  41. acquirings: make(map[string]*acquireInfo),
  42. releasing: make(map[string]*releaseInfo),
  43. }
  44. }
  45. func (s *PubLock) BeginReentrant() *Reentrant {
  46. r := &Reentrant{
  47. p: s,
  48. }
  49. r.T = r
  50. return r
  51. }
  52. func (p *PubLock) BeginMutex() *MutexBuilder {
  53. m := &MutexBuilder{
  54. pub: p,
  55. }
  56. m.T = m
  57. return m
  58. }
  59. func (p *PubLock) Start() {
  60. if p.core != nil {
  61. p.core.Start()
  62. return
  63. }
  64. }
  65. func (p *PubLock) Stop() {
  66. p.lock.Lock()
  67. defer p.lock.Unlock()
  68. if p.core != nil {
  69. p.core.Stop()
  70. return
  71. }
  72. if p.pubChan != nil {
  73. p.pubChan.Close()
  74. p.pubChan = nil
  75. }
  76. p.cliCli.Release()
  77. p.cliCli = nil
  78. }
  79. func (p *PubLock) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, error) {
  80. p.lock.Lock()
  81. if p.core != nil {
  82. p.lock.Unlock()
  83. return p.core.Acquire(req, opt)
  84. }
  85. if p.pubChan == nil {
  86. p.pubChan = p.cliCli.PubLockChannel(context.Background())
  87. go p.receivingChan()
  88. }
  89. acqID := fmt.Sprintf("%v", p.nextCtxID)
  90. p.nextCtxID++
  91. cerr := p.pubChan.Send(&types.AcquireMsg{ContextID: acqID, Request: req, Option: opt})
  92. if cerr != nil {
  93. p.lock.Unlock()
  94. return LockedRequest{}, cerr.ToError()
  95. }
  96. callback := future.NewSetValue[types.RequestID]()
  97. info := &acquireInfo{
  98. Request: req,
  99. Callback: callback,
  100. }
  101. p.acquirings[acqID] = info
  102. p.lock.Unlock()
  103. reqID, err := callback.Wait(context.Background())
  104. if err != nil {
  105. return LockedRequest{}, err
  106. }
  107. return LockedRequest{
  108. Req: req,
  109. ReqID: reqID,
  110. }, nil
  111. }
  112. func (p *PubLock) Release(reqID types.RequestID) {
  113. log := logger.WithField("Mod", "PubLock")
  114. p.lock.Lock()
  115. if p.core != nil {
  116. p.lock.Unlock()
  117. p.core.release(reqID)
  118. return
  119. }
  120. if p.pubChan == nil {
  121. p.pubChan = p.cliCli.PubLockChannel(context.Background())
  122. go p.receivingChan()
  123. }
  124. relID := fmt.Sprintf("%v", p.nextCtxID)
  125. p.nextCtxID++
  126. cerr := p.pubChan.Send(&types.ReleaseMsg{ContextID: relID, RequestID: reqID})
  127. if cerr != nil {
  128. p.lock.Unlock()
  129. log.Warnf("unlock %v: %v", reqID, cerr.ToError())
  130. return
  131. }
  132. callback := future.NewSetVoid()
  133. info := &releaseInfo{
  134. RequestID: reqID,
  135. Callback: callback,
  136. }
  137. p.releasing[relID] = info
  138. p.lock.Unlock()
  139. err := callback.Wait(context.Background())
  140. if err != nil {
  141. log.Warnf("unlock %v: %v", reqID, err)
  142. return
  143. }
  144. log.Tracef("unlock %v", reqID)
  145. }
  146. func (p *PubLock) receivingChan() {
  147. log := logger.WithField("Mod", "PubLock")
  148. for {
  149. msg, cerr := p.pubChan.Receive()
  150. if cerr != nil {
  151. p.lock.Lock()
  152. for _, info := range p.acquirings {
  153. info.Callback.SetError(cerr.ToError())
  154. }
  155. p.acquirings = make(map[string]*acquireInfo)
  156. for _, info := range p.releasing {
  157. info.Callback.SetError(cerr.ToError())
  158. }
  159. p.releasing = make(map[string]*releaseInfo)
  160. p.lock.Unlock()
  161. log.Warnf("receive channel error: %v", cerr.ToError())
  162. return
  163. }
  164. p.lock.Lock()
  165. switch msg := msg.(type) {
  166. case *types.AcquireResultMsg:
  167. info, ok := p.acquirings[msg.ContextID]
  168. if !ok {
  169. continue
  170. }
  171. if msg.Success {
  172. info.Callback.SetValue(msg.RequestID)
  173. } else {
  174. info.Callback.SetError(fmt.Errorf(msg.Error))
  175. }
  176. delete(p.acquirings, msg.ContextID)
  177. case *types.ReleaseResultMsg:
  178. info, ok := p.releasing[msg.ContextID]
  179. if !ok {
  180. continue
  181. }
  182. info.Callback.SetVoid()
  183. delete(p.releasing, msg.ContextID)
  184. }
  185. p.lock.Unlock()
  186. }
  187. }
  // releaseInfo describes a release request that has been sent and is waiting
  // for its result; Release stores one per context ID in PubLock.releasing.
  type releaseInfo struct {
  	// RequestID is the ID of the lock request being released.
  	RequestID types.RequestID
  	// Callback is the future Release waits on; receivingChan completes it
  	// with SetVoid on a ReleaseResultMsg or SetError when receiving fails.
  	Callback *future.SetVoidFuture
  }

本项目旨在将云际存储公共基础设施化,使个人及企业可低门槛使用高效的云际存储服务(安装开箱即用云际存储客户端即可,无需关注其他组件的部署),同时支持用户灵活便捷定制云际存储的功能细节。