From 17e44524350759cda58346145cc8f94f7ee2a81c Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Fri, 1 Aug 2025 09:14:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0PubLock=E7=9A=84RPC=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/internal/cmdline/serve.go | 26 ++++++++++++ client/internal/config/config.go | 2 + client/internal/publock/core.go | 10 +---- client/internal/publock/mutex.go | 17 ++++++-- client/internal/publock/reentrant.go | 15 +++++-- client/internal/publock/service.go | 21 ++++----- client/internal/publock/types/channel.go | 3 +- client/internal/publock/types/models.go | 6 +++ client/internal/rpc/publock.go | 54 ++++++++++++++++++++++++ client/internal/rpc/rpc.go | 18 ++++++++ 10 files changed, 142 insertions(+), 30 deletions(-) create mode 100644 client/internal/rpc/publock.go create mode 100644 client/internal/rpc/rpc.go diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index e6bbb90..8e4139a 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -17,6 +17,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/metacache" "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" + myrpc "gitlink.org.cn/cloudream/jcs-pub/client/internal/rpc" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" "gitlink.org.cn/cloudream/jcs-pub/client/internal/spacesyncer" "gitlink.org.cn/cloudream/jcs-pub/client/internal/speedstats" @@ -24,6 +25,8 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/uploader" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/storage/pool" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" jcstypes "gitlink.org.cn/cloudream/jcs-pub/common/types" @@ -240,6 +243,11 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { httpChan := httpSvr.Start() defer httpSvr.Stop() + // RPC接口 + rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock), nil) + rpcChan := rpcSvr.Start() + defer rpcSvr.Stop() + /// 开始监听各个模块的事件 accTokenEvt := accTokenChan.Receive() @@ -249,6 +257,7 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { spaceSyncEvt := spaceSyncChan.Receive() // replEvt := replCh.Receive() httpEvt := httpChan.Receive() + rpcEvt := rpcChan.Receive() mntEvt := mntChan.Receive() loop: @@ -358,6 +367,23 @@ loop: } httpEvt = httpChan.Receive() + case e := <-rpcEvt.Chan(): + if e.Err != nil { + logger.Errorf("receive rpc event: %v", err) + break loop + } + + switch e := e.Value.(type) { + case rpc.ExitEvent: + if e.Err != nil { + logger.Errorf("rpc server exited with error: %v", e.Err) + } else { + logger.Infof("rpc server exited") + } + break loop + } + rpcEvt = rpcChan.Receive() + case e := <-mntEvt.Chan(): if e.Err != nil { logger.Errorf("receive mount event: %v", e.Err) diff --git a/client/internal/config/config.go b/client/internal/config/config.go index 7a7cb50..38727d0 100644 --- a/client/internal/config/config.go +++ b/client/internal/config/config.go @@ -12,6 +12,7 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock" stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator" hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/sysevent" @@ -29,6 +30,7 @@ type Config struct { DownloadStrategy strategy.Config `json:"downloadStrategy"` TickTock ticktock.Config `json:"tickTock"` HTTP *http.ConfigJSON `json:"http"` + RPC rpc.Config `json:"rpc"` Mount *mntcfg.Config `json:"mount"` AccessToken *accesstoken.Config `json:"accessToken"` } diff --git a/client/internal/publock/core.go b/client/internal/publock/core.go index 5368b84..ed2aba2 100644 --- a/client/internal/publock/core.go +++ b/client/internal/publock/core.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "sync" - "time" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/trie" @@ -47,14 +46,7 @@ type acquireInfo struct { LastErr error } -func (svc *Core) Acquire(req types.LockRequest, opts ...AcquireOptionFn) (LockedRequest, error) { - var opt = AcquireOption{ - Timeout: time.Second * 10, - } - for _, fn := range opts { - fn(&opt) - } - +func (svc *Core) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, error) { ctx := context.Background() if opt.Timeout != 0 { var cancel func() diff --git a/client/internal/publock/mutex.go b/client/internal/publock/mutex.go index 475a039..c13eefc 100644 --- a/client/internal/publock/mutex.go +++ b/client/internal/publock/mutex.go @@ -1,6 +1,8 @@ package publock import ( + "time" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" ) @@ -11,7 +13,7 @@ type Mutex struct { } func (m *Mutex) Unlock() { - m.pub.release(m.locked.ReqID) + m.pub.Release(m.locked.ReqID) } type MutexBuilder struct { @@ -19,10 +21,17 @@ type MutexBuilder struct { pub *PubLock } -func (b *MutexBuilder) Lock(opt ...AcquireOptionFn) (*Mutex, error) { - lkd, err := b.pub.acquire(types.LockRequest{ +func (b *MutexBuilder) Lock(opts ...AcquireOptionFn) (*Mutex, error) { + var opt = types.AcquireOption{ + Timeout: time.Second * 10, + } + for _, fn := range opts { + fn(&opt) + } + + lkd, err := b.pub.Acquire(types.LockRequest{ Locks: b.Locks, - }, opt...) + }, opt) if err != nil { return nil, err } diff --git a/client/internal/publock/reentrant.go b/client/internal/publock/reentrant.go index ecb35f6..9cf331a 100644 --- a/client/internal/publock/reentrant.go +++ b/client/internal/publock/reentrant.go @@ -1,6 +1,8 @@ package publock import ( + "time" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/reqbuilder" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" ) @@ -12,7 +14,14 @@ type Reentrant struct { locked []LockedRequest } -func (r *Reentrant) Lock(opt ...AcquireOptionFn) error { +func (r *Reentrant) Lock(opts ...AcquireOptionFn) error { + var opt = types.AcquireOption{ + Timeout: time.Second * 10, + } + for _, fn := range opts { + fn(&opt) + } + var willLock []types.Lock loop: @@ -37,7 +46,7 @@ loop: Locks: willLock, } - m, err := r.p.acquire(newReq, opt...) + m, err := r.p.Acquire(newReq, opt) if err != nil { return err } @@ -50,7 +59,7 @@ loop: func (r *Reentrant) Unlock() { for i := len(r.reqs) - 1; i >= 0; i-- { - r.p.release(r.locked[i].ReqID) + r.p.Release(r.locked[i].ReqID) } r.locked = nil r.reqs = nil diff --git a/client/internal/publock/service.go b/client/internal/publock/service.go index 79c27c9..cb11c3b 100644 --- a/client/internal/publock/service.go +++ b/client/internal/publock/service.go @@ -12,21 +12,16 @@ import ( clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" ) -type AcquireOption struct { - Timeout time.Duration - Reason string -} - -type AcquireOptionFn func(opt *AcquireOption) +type AcquireOptionFn func(opt *types.AcquireOption) func WithTimeout(timeout time.Duration) AcquireOptionFn { - return func(opt *AcquireOption) { + return func(opt *types.AcquireOption) { opt.Timeout = timeout } } func WithReason(reason string) AcquireOptionFn { - return func(opt *AcquireOption) { + return func(opt *types.AcquireOption) { opt.Reason = reason } } @@ -97,12 +92,12 @@ func (p *PubLock) Stop() { p.cliCli = nil } -func (p *PubLock) acquire(req types.LockRequest, opts ...AcquireOptionFn) (LockedRequest, error) { +func (p *PubLock) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, error) { p.lock.Lock() if p.core != nil { p.lock.Unlock() - return p.core.Acquire(req, opts...) + return p.core.Acquire(req, opt) } if p.pubChan == nil { @@ -113,7 +108,7 @@ func (p *PubLock) acquire(req types.LockRequest, opts ...AcquireOptionFn) (Locke acqID := fmt.Sprintf("%v", p.nextCtxID) p.nextCtxID++ - cerr := p.pubChan.Send(&types.AcquireMsg{ContextID: acqID, Request: req}) + cerr := p.pubChan.Send(&types.AcquireMsg{ContextID: acqID, Request: req, Option: opt}) if cerr != nil { p.lock.Unlock() return LockedRequest{}, cerr.ToError() @@ -138,7 +133,7 @@ func (p *PubLock) acquire(req types.LockRequest, opts ...AcquireOptionFn) (Locke }, nil } -func (p *PubLock) release(reqID types.RequestID) { +func (p *PubLock) Release(reqID types.RequestID) { log := logger.WithField("Mod", "PubLock") p.lock.Lock() @@ -214,7 +209,7 @@ func (p *PubLock) receivingChan() { if msg.Success { info.Callback.SetValue(msg.RequestID) } else { - info.Callback.SetError(fmt.Errorf(msg.Reason)) + info.Callback.SetError(fmt.Errorf(msg.Error)) } delete(p.acquirings, msg.ContextID) diff --git a/client/internal/publock/types/channel.go b/client/internal/publock/types/channel.go index dc53bfd..331c00e 100644 --- a/client/internal/publock/types/channel.go +++ b/client/internal/publock/types/channel.go @@ -3,6 +3,7 @@ package types type AcquireMsg struct { ContextID string Request LockRequest + Option AcquireOption } func (*AcquireMsg) IsPubLockMessage() bool { return true } @@ -10,7 +11,7 @@ func (*AcquireMsg) IsPubLockMessage() bool { return true } type AcquireResultMsg struct { ContextID string Success bool - Reason string + Error string RequestID RequestID } diff --git a/client/internal/publock/types/models.go b/client/internal/publock/types/models.go index c946cc1..54fd92a 100644 --- a/client/internal/publock/types/models.go +++ b/client/internal/publock/types/models.go @@ -2,6 +2,7 @@ package types import ( "fmt" + "time" "gitlink.org.cn/cloudream/common/utils/lo2" ) @@ -58,3 +59,8 @@ func NewLockTargetBusyError(lockName string) *LockTargetBusyError { lockName: lockName, } } + +type AcquireOption struct { + Timeout time.Duration + Reason string +} diff --git a/client/internal/rpc/publock.go b/client/internal/rpc/publock.go new file mode 100644 index 0000000..b6a0ad6 --- /dev/null +++ b/client/internal/rpc/publock.go @@ -0,0 +1,54 @@ +package rpc + +import ( + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" +) + +func (s *Service) PubLockChannel(ch clirpc.PubLockMessageChan) { + log := logger.WithField("Mod", "RPC") + + for { + msg, cerr := ch.Receive() + if cerr != nil { + log.Warnf("receive publock message: %v", cerr.ToError()) + break + } + + switch msg := msg.(type) { + case *types.AcquireMsg: + go func() { + lkd, err := s.pubLock.Acquire(msg.Request, msg.Option) + if err != nil { + cerr := ch.Send(&types.AcquireResultMsg{ + ContextID: msg.ContextID, + Success: false, + Error: err.Error(), + }) + if cerr != nil { + log.Warnf("send acquire result message: %v", cerr.ToError()) + } + return + } + + cerr := ch.Send(&types.AcquireResultMsg{ + ContextID: msg.ContextID, + Success: true, + RequestID: lkd.ReqID, + }) + if cerr != nil { + log.Warnf("send acquire result message: %v", cerr.ToError()) + } + }() + case *types.ReleaseMsg: + s.pubLock.Release(msg.RequestID) + cerr := ch.Send(&types.ReleaseResultMsg{ + ContextID: msg.ContextID, + }) + if cerr != nil { + log.Warnf("send release result message: %v", cerr.ToError()) + } + } + } +} diff --git a/client/internal/rpc/rpc.go b/client/internal/rpc/rpc.go new file mode 100644 index 0000000..110779e --- /dev/null +++ b/client/internal/rpc/rpc.go @@ -0,0 +1,18 @@ +package rpc + +import ( + "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" +) + +type Service struct { + pubLock *publock.PubLock +} + +func NewService(pubLock *publock.PubLock) *Service { + return &Service{ + pubLock: pubLock, + } +} + +var _ clirpc.ClientAPI = (*Service)(nil)