diff --git a/client/internal/cluster/cluster.go b/client/internal/cluster/cluster.go
index 377a32e..9e9d5b7 100644
--- a/client/internal/cluster/cluster.go
+++ b/client/internal/cluster/cluster.go
@@ -2,6 +2,7 @@ package cluster
 
 import (
 	"crypto/tls"
+	"encoding/binary"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -24,9 +25,10 @@ type Cluster struct {
 	cfg       Config
 	poolCfg   clirpc.PoolConfig
 	masterCli MasterClient
+	doneCh    chan any
 	raft      *raft.Raft
+	fsm       *raftFSM
 	transport *Transport
-	doneCh    chan any
 }
 
 func New(cfg Config) *Cluster {
@@ -39,7 +41,7 @@
 	}
 }
 
-func (c *Cluster) Start() (*ClusterEventChan, error) {
+func (c *Cluster) Start(fsms []FSM) (*ClusterEventChan, error) {
 	log := logger.WithField("Mod", "Cluster")
 
 	ch := async.NewUnboundChannel[ClusterEvent]()
@@ -48,6 +50,8 @@
 		return ch, nil
 	}
 
+	c.fsm = NewFSM(fsms)
+
 	poolCfgJSON := clirpc.PoolConfigJSON{
 		RootCA:     c.cfg.RootCA,
 		ClientCert: c.cfg.ClientCert,
@@ -83,11 +87,9 @@
 		return nil, fmt.Errorf("create raft snapshot store: %w", err)
 	}
 
-	fsm := NewFSM()
-
 	c.transport = NewTransport(c.cfg.Announce, *poolCfg)
 
-	rft, err := raft.NewRaft(raftCfg, fsm, logDB, stableDB, snapshotStore, c.transport)
+	rft, err := raft.NewRaft(raftCfg, c.fsm, logDB, stableDB, snapshotStore, c.transport)
 	if err != nil {
 		return nil, fmt.Errorf("create raft: %w", err)
 	}
@@ -248,8 +250,24 @@ func (c *Cluster) RaftTransport() *Transport {
 }
 
 // Can only be called on the leader
-func (c *Cluster) Apply(data string, timeout time.Duration) error {
-	return c.raft.Apply([]byte(data), timeout).Error()
+func (c *Cluster) Apply(fsmID string, data []byte, timeout time.Duration) ([]byte, error) {
+	fsmIDBytes := []byte(fsmID)
+
+	logBytes := make([]byte, 4+len(fsmIDBytes)+len(data))
+
+	// The first 4 bytes hold the length of the ID, followed by the ID and then the data
+	binary.LittleEndian.PutUint32(logBytes[:4], uint32(len(fsmIDBytes)))
+	copy(logBytes[4:], fsmIDBytes)
+	copy(logBytes[4+len(fsmIDBytes):], data)
+
+	fut := c.raft.Apply(logBytes, timeout)
+	err := fut.Error()
+	if err != nil {
+		return nil, err
+	}
+
+	applyRet := fut.Response().(applyResult)
+	return applyRet.Result, applyRet.Error
 }
 
 type ClusterEvent interface {
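Aside on the wire format: Cluster.Apply frames every raft log entry as a 4-byte little-endian length of the FSM ID, then the ID bytes, then the command payload, and raftFSM.Apply in fsm.go below slices the entry the same way. A minimal round-trip sketch of that framing (the helper names encodeLog and decodeLog are illustrative, not part of this patch):

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeLog mirrors Cluster.Apply: 4-byte little-endian ID length, then ID, then payload.
func encodeLog(fsmID string, data []byte) []byte {
	id := []byte(fsmID)
	buf := make([]byte, 4+len(id)+len(data))
	binary.LittleEndian.PutUint32(buf[:4], uint32(len(id)))
	copy(buf[4:], id)
	copy(buf[4+len(id):], data)
	return buf
}

// decodeLog mirrors raftFSM.Apply's slicing of l.Data.
func decodeLog(buf []byte) (fsmID string, data []byte) {
	idLen := binary.LittleEndian.Uint32(buf[:4])
	return string(buf[4 : 4+idLen]), buf[4+idLen:]
}

func main() {
	buf := encodeLog("Publock", []byte(`{"some":"command"}`))
	id, data := decodeLog(buf)
	fmt.Println(id, string(data)) // Publock {"some":"command"}
}

The fixed-width prefix keeps the decode side down to two slice expressions, at the cost of four bytes per entry.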
diff --git a/client/internal/cluster/fsm.go b/client/internal/cluster/fsm.go
index 9791e37..d415adc 100644
--- a/client/internal/cluster/fsm.go
+++ b/client/internal/cluster/fsm.go
@@ -1,67 +1,168 @@
 package cluster
 
 import (
+	"encoding/binary"
 	"fmt"
 	"io"
-	"sync"
 
 	"github.com/hashicorp/raft"
-	"gitlink.org.cn/cloudream/common/utils/io2"
+	"gitlink.org.cn/cloudream/common/utils/http2"
+	"gitlink.org.cn/cloudream/common/utils/serder"
 )
 
-type FSM struct {
-	data string
-	lock sync.Mutex
+var ErrFSMNotFound = fmt.Errorf("fsm not found")
+
+type FSM interface {
+	// Must be unique and must never change
+	ID() string
+	Apply(cmd []byte) ([]byte, error)
+	Snapshot() (FSMSnapshot, error)
+	Restore(input io.Reader) error
 }
 
-func NewFSM() *FSM {
-	return &FSM{data: ""}
+type FSMSnapshot interface {
+	Persist(output io.Writer) error
+	Release()
 }
 
-func (f *FSM) Apply(l *raft.Log) interface{} {
-	fmt.Printf("log: %v\n", string(l.Data))
-
-	f.lock.Lock()
-	defer f.lock.Unlock()
-
-	f.data = f.data + "\n" + string(l.Data)
-	return nil
+type SnapshotMeta struct {
+	Version int
 }
 
-func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
-	f.lock.Lock()
-	defer f.lock.Unlock()
-
-	return &Snapshot{data: f.data}, nil
+type raftFSM struct {
+	fsms map[string]FSM
 }
 
-func (f *FSM) Restore(rc io.ReadCloser) error {
-	data, err := io.ReadAll(rc)
-	if err != nil {
-		return err
+func NewFSM(fsms []FSM) *raftFSM {
+	fsmsMp := make(map[string]FSM)
+	for _, fsm := range fsms {
+		_, ok := fsmsMp[fsm.ID()]
+		if ok {
+			panic(fmt.Sprintf("duplicate fsm id: %s", fsm.ID()))
+		}
+		fsmsMp[fsm.ID()] = fsm
+	}
+
+	return &raftFSM{fsms: fsmsMp}
+}
+
+func (f *raftFSM) Apply(l *raft.Log) interface{} {
+	idLen := binary.LittleEndian.Uint32(l.Data[:4])
+	fsm, ok := f.fsms[string(l.Data[4:4+idLen])]
+	if !ok {
+		return ErrFSMNotFound
+	}
+
+	cmd := l.Data[4+idLen:]
+	res, err := fsm.Apply(cmd)
+	return applyResult{
+		Result: res,
+		Error:  err,
+	}
+}
+
+func (f *raftFSM) Snapshot() (raft.FSMSnapshot, error) {
+	snapshots := make(map[string]FSMSnapshot)
+	for id, fsm := range f.fsms {
+		snapshot, err := fsm.Snapshot()
+		if err != nil {
+			for _, snapshot := range snapshots {
+				snapshot.Release()
+			}
+			return nil, err
+		}
+
+		snapshots[id] = snapshot
+	}
+
+	return &Snapshot{snapshots: snapshots}, nil
+}
+
+func (f *raftFSM) Restore(rc io.ReadCloser) error {
+	defer rc.Close()
+
+	cr := http2.NewChunkedReader(rc)
+
+	_, metaBytes, err := cr.NextDataPart()
+	if err != nil {
+		return fmt.Errorf("read meta data part: %v", err)
+	}
+	meta := SnapshotMeta{}
+	if err := serder.JSONToObject(metaBytes, &meta); err != nil {
+		return fmt.Errorf("unmarshal meta data: %v", err)
+	}
+	// Perform version checks like this one
+	if meta.Version != 1 {
+		return fmt.Errorf("unsupported version: %d", meta.Version)
 	}
 
-	f.lock.Lock()
-	defer f.lock.Unlock()
+	for {
+		id, reader, err := cr.NextPart()
+		if err != nil && err != io.EOF {
+			return err
+		}
+
+		if err == io.EOF {
+			// TODO: consider checking that every FSM's Restore method was invoked
+			break
+		}
+
+		fsm, ok := f.fsms[id]
+		if !ok {
+			// TODO: compatibility
+			continue
+		}
+
+		err = fsm.Restore(reader)
+		if err != nil {
+			// TODO: unclear whether the raft library stops serving promptly after Restore fails
+			return fmt.Errorf("restore fsm %s: %v", id, err)
+		}
+	}
 
-	f.data = string(data)
 	return nil
 }
 
-var _ raft.FSM = (*FSM)(nil)
+var _ raft.FSM = (*raftFSM)(nil)
 
 type Snapshot struct {
-	data string
+	snapshots map[string]FSMSnapshot
 }
 
 func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
-	err := io2.WriteAll(sink, []byte(s.data))
+	meta := SnapshotMeta{Version: 1}
+	metaBytes, err := serder.ObjectToJSON(meta)
+	if err != nil {
+		sink.Cancel()
+		return fmt.Errorf("marshal meta data: %v", err)
+	}
+
+	cw := http2.NewChunkedWriter(sink)
+	err = cw.WriteDataPart("meta", metaBytes)
 	if err != nil {
-		return sink.Cancel()
+		sink.Cancel()
+		return fmt.Errorf("write meta data part: %v", err)
+	}
+
+	for id, snapshot := range s.snapshots {
+		w := cw.BeginPart(id)
+		err := snapshot.Persist(w)
+		if err != nil {
+			sink.Cancel()
+			return fmt.Errorf("persist fsm %s: %v", id, err)
+		}
 	}
 
 	return sink.Close()
 }
 
 func (s *Snapshot) Release() {
+	for _, snapshot := range s.snapshots {
+		snapshot.Release()
+	}
+}
+
+type applyResult struct {
+	Result []byte
+	Error  error
 }
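To make the new FSM contract concrete before it is wired into the command-line entry points below, here is a hypothetical minimal implementation (a replicated counter) against the interface defined in fsm.go above; it is a sketch for illustration only, not code from this patch. Raft applies log entries from a single goroutine, so Apply needs no locking of its own:

package cluster

import (
	"encoding/binary"
	"fmt"
	"io"
)

// CounterFSM is a toy FSM: each command is an 8-byte little-endian delta.
type CounterFSM struct {
	value uint64
}

// ID must be unique among registered FSMs and stable across versions.
func (c *CounterFSM) ID() string { return "Counter" }

func (c *CounterFSM) Apply(cmd []byte) ([]byte, error) {
	if len(cmd) != 8 {
		return nil, fmt.Errorf("bad command length: %d", len(cmd))
	}
	c.value += binary.LittleEndian.Uint64(cmd)

	// The returned bytes travel back to the caller via applyResult.Result.
	out := make([]byte, 8)
	binary.LittleEndian.PutUint64(out, c.value)
	return out, nil
}

func (c *CounterFSM) Snapshot() (FSMSnapshot, error) {
	return &counterSnapshot{value: c.value}, nil
}

func (c *CounterFSM) Restore(input io.Reader) error {
	var buf [8]byte
	if _, err := io.ReadFull(input, buf[:]); err != nil {
		return err
	}
	c.value = binary.LittleEndian.Uint64(buf[:])
	return nil
}

type counterSnapshot struct{ value uint64 }

// Persist writes one part of the chunked stream assembled by Snapshot.Persist above.
func (s *counterSnapshot) Persist(output io.Writer) error {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], s.value)
	_, err := output.Write(buf[:])
	return err
}

func (s *counterSnapshot) Release() {}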
diff --git a/client/internal/cmdline/init.go b/client/internal/cmdline/init.go
index 0730a40..408fdc1 100644
--- a/client/internal/cmdline/init.go
+++ b/client/internal/cmdline/init.go
@@ -20,6 +20,7 @@ import (
 	"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
 	"gitlink.org.cn/cloudream/jcs-pub/client/internal/http"
 	mntcfg "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/config"
+	"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
 	"gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock"
 	corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
 	hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub"
@@ -351,6 +352,9 @@ func saveConfig(cfg *clicfg.Config, configPath string) error {
 		CacheExpireTime:     time.Minute * 1,
 		ScanDataDirInterval: time.Minute * 10,
 	}
+	cfg.PubLock = publock.Config{
+		LeaseExpiredSeconds: 5,
+	}
 
 	configData, err := json.MarshalIndent(cfg, "", " ")
 	if err != nil {
diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go
index 9b8c78c..86421a5 100644
--- a/client/internal/cmdline/serve.go
+++ b/client/internal/cmdline/serve.go
@@ -178,15 +178,16 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
 	// Cluster mode
 	clster := cluster.New(config.Cfg().Cluster)
-	clsterCh, err := clster.Start()
+	// Public lock
+	publock := publock.New(config.Cfg().PubLock, clster)
+
+	clsterCh, err := clster.Start([]cluster.FSM{publock.Core().FSM()})
 	if err != nil {
 		logger.Errorf("start cluster: %v", err)
 		os.Exit(1)
 	}
 	defer clster.Stop()
 
-	// Public lock
-	publock := publock.NewMaster()
 	publock.Start()
 	defer publock.Stop()
diff --git a/client/internal/cmdline/test.go b/client/internal/cmdline/test.go
index 381342b..5ac7b33 100644
--- a/client/internal/cmdline/test.go
+++ b/client/internal/cmdline/test.go
@@ -186,18 +186,18 @@ func test(configPath string) {
 	// Cluster mode
 	clster := cluster.New(config.Cfg().Cluster)
-	clsterCh, err := clster.Start()
+	// Public lock
+	publock := publock.New(config.Cfg().PubLock, clster)
+
+	clsterCh, err := clster.Start([]cluster.FSM{publock.Core().FSM()})
 	if err != nil {
 		logger.Errorf("start cluster: %v", err)
 		os.Exit(1)
 	}
 	defer clster.Stop()
 
-	// Public lock
-	publock := publock.NewMaster()
 	publock.Start()
 	defer publock.Stop()
-
 	// Access statistics
 	acStat := accessstat.NewAccessStat(accessstat.Config{
 		// TODO: consider moving this into the config
diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go
index 73ec5f9..d3d0c13 100644
--- a/client/internal/cmdline/vfstest.go
+++ b/client/internal/cmdline/vfstest.go
@@ -166,15 +166,16 @@ func vfsTest(configPath string, opts serveHTTPOptions) {
 	// Cluster mode
 	clster := cluster.New(config.Cfg().Cluster)
-	clsterCh, err := clster.Start()
+	// Public lock
+	publock := publock.New(config.Cfg().PubLock, clster)
+
+	clsterCh, err := clster.Start([]cluster.FSM{publock.Core().FSM()})
 	if err != nil {
 		logger.Errorf("start cluster: %v", err)
 		os.Exit(1)
 	}
 	defer clster.Stop()
 
-	// Public lock
-	publock := publock.NewMaster()
 	publock.Start()
 	defer publock.Stop()
diff --git a/client/internal/config/config.go b/client/internal/config/config.go
index d1f0708..6f27a6f 100644
--- a/client/internal/config/config.go
+++ b/client/internal/config/config.go
@@ -10,6 +10,7 @@ import (
 	"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
 	"gitlink.org.cn/cloudream/jcs-pub/client/internal/http"
 	mntcfg "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/config"
+	"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
 	"gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock"
 	stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
 	"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity"
@@ -35,6 +36,7 @@ type Config struct {
 	Mount       *mntcfg.Config      `json:"mount"`
 	AccessToken *accesstoken.Config `json:"accessToken"`
 	Cluster     cluster.Config      `json:"cluster"`
+	PubLock     publock.Config      `json:"publock"`
 }
 
 var cfg Config
diff --git a/client/internal/http/v1/cluster.go b/client/internal/http/v1/cluster.go
index 333d0bf..4acce36 100644
--- a/client/internal/http/v1/cluster.go
+++ b/client/internal/http/v1/cluster.go
@@ -35,7 +35,7 @@ func (s *ClusterService) Status(ctx *gin.Context) {
 	}
 
 	_, cerr := s.svc.Cluster.MasterClient().ClusterApplyLog(context.TODO(), &clirpc.ClusterApplyLog{
-		Data: fmt.Sprintf("%v: %v", s.svc.Cluster.ID(), time.Now()),
+		Data: []byte(fmt.Sprintf("%v: %v", s.svc.Cluster.ID(), time.Now())),
 	})
 	if cerr != nil {
 		ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr.Code), "%v", cerr.Message))
diff --git a/client/internal/publock/config.go b/client/internal/publock/config.go
new file mode 100644
index 0000000..8e969c5
--- /dev/null
+++ b/client/internal/publock/config.go
@@ -0,0 +1,5 @@
+package publock
+
+type Config struct {
+	LeaseExpiredSeconds int `json:"leaseExpiredSeconds"`
+}
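The leaseExpiredSeconds setting above is interpreted by core.go below as a number of one-second sweep passes rather than a wall-clock deadline: every tick increments each held lock's ExpireCounter, a Renew command resets it to zero, and only a counter that has reached the configured value triggers a LeaseExpired command. A condensed standalone sketch of that interplay (the lease type and main are hypothetical, not from this patch):

package main

import "fmt"

const leaseExpiredSeconds = 5 // mirrors Config.LeaseExpiredSeconds

type lease struct{ expireCounter int }

// tick mirrors one pass of Core's per-second sweep over held locks.
func (l *lease) tick() (expired bool) {
	if l.expireCounter < leaseExpiredSeconds {
		l.expireCounter++
		return false
	}
	return true
}

// renew mirrors Core.renew: a heartbeat from the holder resets the counter.
func (l *lease) renew() { l.expireCounter = 0 }

func main() {
	l := &lease{}
	for i := 0; i < 4; i++ {
		l.tick() // the counter climbs while the holder stays silent
	}
	l.renew() // the holder's renew loop ran: back to 0

	for i := 1; ; i++ {
		if l.tick() {
			fmt.Println("lease expired on unrenewed tick", i)
			break
		}
	}
}

The forced release itself still travels through raft as a LeaseExpired command, so every node drops the lock at the same point in the log.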
diff --git a/client/internal/publock/core.go b/client/internal/publock/core.go
index ed2aba2..5d76992 100644
--- a/client/internal/publock/core.go
+++ b/client/internal/publock/core.go
@@ -1,189 +1,554 @@
 package publock
 
 import (
-	"context"
 	"fmt"
+	"io"
 	"sync"
+	"time"
 
-	"gitlink.org.cn/cloudream/common/pkgs/future"
+	"gitlink.org.cn/cloudream/common/pkgs/async"
+	"gitlink.org.cn/cloudream/common/pkgs/logger"
 	"gitlink.org.cn/cloudream/common/pkgs/trie"
+	"gitlink.org.cn/cloudream/common/pkgs/types"
 	"gitlink.org.cn/cloudream/common/utils/lo2"
+	"gitlink.org.cn/cloudream/common/utils/serder"
+	"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
 	"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/lockprovider"
-	"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
+	pubtypes "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
+	"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
 )
 
 type Core struct {
+	cfg          Config
+	clster       *cluster.Cluster
+	fsm          *ClusterFSM
 	lock         *sync.Mutex
-	provdersTrie *trie.Trie[types.LockProvider]
-	acquirings   []*acquireInfo
-	acquired     map[types.RequestID]types.LockRequest
-	nextReqID    int64
+	provdersTrie *trie.Trie[pubtypes.LockProvider]
+	acquirings   []*acquiring // Must be a slice, because order matters (all nodes in the cluster must produce strictly identical results)
+	acquireds    map[pubtypes.RequestID]*acquired
+	eventCh      *async.UnboundChannel[CoreEvent]
+	doneCh       chan any
 }
 
-func NewCore() *Core {
-	svc := &Core{
+func NewCore(cfg Config, clster *cluster.Cluster) *Core {
+	c := &Core{
+		cfg:          cfg,
+		clster:       clster,
 		lock:         &sync.Mutex{},
-		provdersTrie: trie.NewTrie[types.LockProvider](),
-		acquired:     make(map[types.RequestID]types.LockRequest),
+		provdersTrie: trie.NewTrie[pubtypes.LockProvider](),
+		acquireds:    make(map[pubtypes.RequestID]*acquired),
+		eventCh:      async.NewUnboundChannel[CoreEvent](),
+		doneCh:       make(chan any, 1),
+	}
+	c.fsm = &ClusterFSM{
+		core: c,
 	}
 
-	svc.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock()
-	svc.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock()
-	return svc
+	c.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock()
+	c.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock()
+	return c
 }
 
-func (s *Core) Start() {
-
-}
-
-func (s *Core) Stop() {
-
-}
-
-type acquireInfo struct {
-	Request  types.LockRequest
-	Callback *future.SetValueFuture[types.RequestID]
-	LastErr  error
-}
+func (s *Core) Apply(cmd Command) {
+	switch cmd := cmd.(type) {
+	case *Acquire:
+		s.acquire(*cmd)
+	case *AcquireTimeout:
+		s.acquireTimeout(*cmd)
+	case *Release:
+		s.release(*cmd)
+	case *LeaseExpired:
+		s.leaseExpired(*cmd)
+	case *Renew:
+		s.renew(*cmd)
+	}
+}
+
+func (s *Core) EventChan() *async.UnboundChannel[CoreEvent] {
+	return s.eventCh
+}
+
+func (s *Core) FSM() cluster.FSM {
+	return s.fsm
+}
+
+func (s *Core) Start() {
+	log := logger.WithField("Mod", "Publock.Core")
+
+	go func() {
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+
+	loop:
+		for {
+			select {
+			case <-s.doneCh:
+				break loop
+			case <-ticker.C:
+				var acTimeout []pubtypes.RequestID
+				var leExpired []pubtypes.RequestID
+
+				s.lock.Lock()
+
+				// Periodically clean up lock requests that have timed out
+				for _, req := range s.acquirings {
+					if time.Since(req.StartTime) < req.Cmd.Timeout {
+						continue
+					}
+
+					acTimeout = append(acTimeout, req.Cmd.ID)
+				}
+
+				// Periodically clean up locks whose lease has expired
+				for reqID, ac := range s.acquireds {
+					if ac.ExpireCounter < s.cfg.LeaseExpiredSeconds {
+						ac.ExpireCounter++
+						continue
+					}
+
+					leExpired = append(leExpired, reqID)
+				}
+
+				s.lock.Unlock()
+
+				for _, reqID := range acTimeout {
+					cmd := AcquireTimeout{
+						ID: reqID,
+					}
+
+					if s.clster != nil {
+						data, err := serder.ObjectToJSONEx(cmd)
+						if err != nil {
+							log.Warnf("cmd %T to json: %v", cmd, err)
+							continue
+						}
+
+						// Success doesn't matter here; the periodic task is the safety net
+						s.clster.Apply(s.fsm.ID(), data, time.Second*3)
+					} else {
+						s.acquireTimeout(AcquireTimeout{
+							ID: reqID,
+						})
+					}
+				}
+
+				for _, reqID := range leExpired {
+					cmd := LeaseExpired{
+						ID: reqID,
+					}
+
+					if s.clster != nil {
+						data, err := serder.ObjectToJSONEx(cmd)
+						if err != nil {
+							log.Warnf("cmd %T to json: %v", cmd, err)
+							continue
+						}
+
+						// Success doesn't matter here; the periodic task is the safety net
+						s.clster.Apply(s.fsm.ID(), data, time.Second*3)
+					} else {
+						s.leaseExpired(LeaseExpired{
+							ID: reqID,
+						})
+					}
+				}
+			}
+		}
+	}()
+}
+
+func (s *Core) Stop() {
+	select {
+	case s.doneCh <- nil:
+	default:
+	}
+	s.eventCh.Close()
+}
+
+type acquiring struct {
+	Cmd     Acquire
+	LastErr *ecode.CodeError
+	// This value comes from each node itself, so it may differ between nodes.
+	// It is only used to decide whether acquiring the lock has timed out, so that is acceptable.
+	StartTime time.Time
+}
+
+type acquired struct {
+	ID  pubtypes.RequestID
+	Req pubtypes.LockRequest
+	// Records how many expiry checks the lock has passed through; past a certain count it is considered expired.
+	// Releasing a lock without notifying the service that acquired it is dangerous, which is why expiry is implemented as a counter.
+	ExpireCounter int
+}
 
-func (svc *Core) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, error) {
-	ctx := context.Background()
-	if opt.Timeout != 0 {
-		var cancel func()
-		ctx, cancel = context.WithTimeout(ctx, opt.Timeout)
-		defer cancel()
-	}
-
-	// Test on the spot whether the lock is available
-	svc.lock.Lock()
-	defer svc.lock.Unlock()
-
-	reqID, err := svc.tryAcquireOne(req)
-	if err != nil {
-		return LockedRequest{}, err
-	}
-
-	if reqID != "" {
-		svc.acquired[reqID] = req
-		return LockedRequest{
-			Req:   req,
-			ReqID: reqID,
-		}, nil
-	}
-
-	// The on-the-spot test failed, so wait asynchronously for the lock to become available
-	info := &acquireInfo{
-		Request:  req,
-		Callback: future.NewSetValue[types.RequestID](),
-	}
-	svc.acquirings = append(svc.acquirings, info)
-
-	// Don't hold the mutex while waiting
-	svc.lock.Unlock()
-	reqID, err = info.Callback.Wait(ctx)
-	svc.lock.Lock()
-
-	if err == nil {
-		svc.acquired[reqID] = req
-		return LockedRequest{
-			Req:   req,
-			ReqID: reqID,
-		}, nil
-	}
-
-	if err != future.ErrCanceled {
-		lo2.Remove(svc.acquirings, info)
-		return LockedRequest{}, err
-	}
-
-	// If the first wait ended in a timeout, try once more to fetch the result while holding the mutex
-	reqID, err = info.Callback.TryGetValue()
-	if err == nil {
-		svc.acquired[reqID] = req
-		return LockedRequest{
-			Req:   req,
-			ReqID: reqID,
-		}, nil
-	}
-
-	lo2.Remove(svc.acquirings, info)
-	return LockedRequest{}, err
+func (c *Core) acquire(cmd Acquire) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	// Test immediately whether the lock is available
+	cerr := c.tryAcquireOne(cmd.ID, cmd.Request)
+	if cerr == nil {
+		err := c.eventCh.Send(&AcquireResult{
+			Raw:   cmd,
+			Error: nil,
+		})
+		if err != nil {
+			panic(err)
+		}
+		return
+	}
+
+	// Not available: add it to the waiting list
+	info := &acquiring{
+		Cmd:       cmd,
+		LastErr:   cerr,
+		StartTime: time.Now(),
+	}
+	c.acquirings = append(c.acquirings, info)
+	go func() {
+		log := logger.WithField("Mod", "Publock.Core")
+		<-time.After(cmd.Timeout)
+
+		ac := AcquireTimeout{
+			ID: cmd.ID,
+		}
+
+		if c.clster == nil {
+			// Standalone mode: apply the timeout locally instead of going through raft
+			c.acquireTimeout(ac)
+			return
+		}
+
+		data, err := serder.ObjectToJSONEx(ac)
+		if err != nil {
+			log.Warnf("cmd %T to json: %v", cmd, err)
+			return
+		}
+
+		// Success doesn't matter here; the periodic task is the safety net
+		c.clster.Apply(c.fsm.ID(), data, cmd.Timeout)
+	}()
 }
 
-func (s *Core) release(reqID types.RequestID) {
+func (s *Core) release(cmd Release) {
+	reqID := cmd.ID
+
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
-	req, ok := s.acquired[reqID]
+	ac, ok := s.acquireds[reqID]
 	if !ok {
 		return
 	}
 
-	s.releaseRequest(reqID, req)
+	s.releaseRequest(reqID, ac.Req)
+	s.eventCh.Send(&Released{
+		ID: reqID,
+	})
 
 	s.tryAcquirings()
 }
 
+func (c *Core) acquireTimeout(cmd AcquireTimeout) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for i, req := range c.acquirings {
+		if req.Cmd.ID == cmd.ID {
+			c.eventCh.Send(&AcquireResult{
+				Raw:   req.Cmd,
+				Error: req.LastErr,
+			})
+			c.acquirings = lo2.RemoveAt(c.acquirings, i)
+			return
+		}
+	}
+}
+
+func (c *Core) leaseExpired(cmd LeaseExpired) {
+	log := logger.WithField("Mod", "Publock.Core")
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	ac, ok := c.acquireds[cmd.ID]
+	if !ok {
+		return
+	}
+
+	log.Warnf("lock request %v lease expired", ac.ID)
+
+	c.releaseRequest(ac.ID, ac.Req)
+	c.tryAcquirings()
+}
+
+func (c *Core) renew(cmd Renew) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	for _, reqID := range cmd.IDs {
+		ac, ok := c.acquireds[reqID]
+		if !ok {
+			continue
+		}
+
+		ac.ExpireCounter = 0
+	}
+}
+
 func (a *Core) tryAcquirings() {
 	for i := 0; i < len(a.acquirings); i++ {
 		req := a.acquirings[i]
 
-		reqID, err := a.tryAcquireOne(req.Request)
+		err := a.tryAcquireOne(req.Cmd.ID, req.Cmd.Request)
 		if err != nil {
 			req.LastErr = err
 			continue
 		}
 
-		req.Callback.SetValue(reqID)
+		a.eventCh.Send(&AcquireResult{
+			Raw:   req.Cmd,
+			Error: nil,
+		})
 		a.acquirings[i] = nil
 	}
 
 	a.acquirings = lo2.RemoveAllDefault(a.acquirings)
 }
 
-func (s *Core) tryAcquireOne(req types.LockRequest) (types.RequestID, error) {
-	err := s.testOneRequest(req)
-	if err != nil {
-		return "", err
+func (s *Core) tryAcquireOne(reqID pubtypes.RequestID, req pubtypes.LockRequest) *ecode.CodeError {
+	cerr := s.testOneRequest(req)
+	if cerr != nil {
+		return cerr
 	}
 
-	reqID := types.RequestID(fmt.Sprintf("%d", s.nextReqID))
-	s.nextReqID++
 	s.applyRequest(reqID, req)
-	return reqID, nil
+
+	s.acquireds[reqID] = &acquired{
+		ID:            reqID,
+		Req:           req,
+		ExpireCounter: 0,
+	}
+	return nil
 }
 
-func (s *Core) testOneRequest(req types.LockRequest) error {
+func (s *Core) testOneRequest(req pubtypes.LockRequest) *ecode.CodeError {
 	for _, lock := range req.Locks {
 		n, ok := s.provdersTrie.WalkEnd(lock.Path)
 		if !ok || n.Value == nil {
-			return fmt.Errorf("lock provider not found for path %v", lock.Path)
+			return ecode.Newf(ecode.DataNotFound, "lock provider not found for path %v", lock.Path)
 		}
 
 		err := n.Value.CanLock(lock)
 		if err != nil {
-			return err
+			return ecode.Newf(ecode.LockConflict, "%v", err)
 		}
 	}
 
 	return nil
 }
 
-func (s *Core) applyRequest(reqID types.RequestID, req types.LockRequest) {
+func (s *Core) applyRequest(reqID pubtypes.RequestID, req pubtypes.LockRequest) {
 	for _, lock := range req.Locks {
 		p, _ := s.provdersTrie.WalkEnd(lock.Path)
 		p.Value.Lock(reqID, lock)
 	}
 }
 
-func (s *Core) releaseRequest(reqID types.RequestID, req types.LockRequest) {
+func (s *Core) releaseRequest(reqID pubtypes.RequestID, req pubtypes.LockRequest) {
 	for _, lock := range req.Locks {
 		p, _ := s.provdersTrie.WalkEnd(lock.Path)
 		p.Value.Unlock(reqID, lock)
 	}
+
+	delete(s.acquireds, reqID)
 }
 
 type LockedRequest struct {
-	Req   types.LockRequest
-	ReqID types.RequestID
-}
+	Req   pubtypes.LockRequest
+	ReqID pubtypes.RequestID
+}
+
+type Command interface {
+	IsCommand() bool
+}
+
+var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[Command](
+	(*Acquire)(nil),
+	(*Release)(nil),
+	(*AcquireTimeout)(nil),
+	(*LeaseExpired)(nil),
+	(*Renew)(nil),
+)))
+
+type Acquire struct {
+	ID      pubtypes.RequestID
+	Request pubtypes.LockRequest
+	Timeout time.Duration
+	Reason  string
+}
+
+func (a *Acquire) IsCommand() bool {
+	return true
+}
+
+type Release struct {
+	ID pubtypes.RequestID
+}
+
+func (r *Release) IsCommand() bool {
+	return true
+}
+
+type AcquireTimeout struct {
+	ID pubtypes.RequestID
+}
+
+func (a *AcquireTimeout) IsCommand() bool {
+	return true
+}
+
+type LeaseExpired struct {
+	ID pubtypes.RequestID
+}
+
+func (l *LeaseExpired) IsCommand() bool {
+	return true
+}
+
+type Renew struct {
+	IDs []pubtypes.RequestID
+}
+
+func (r *Renew) IsCommand() bool {
+	return true
+}
+
+type CoreEvent interface {
+	IsCoreEvent() bool
+}
+
+type AcquireResult struct {
+	Raw   Acquire
+	Error *ecode.CodeError
+}
+
+func (a *AcquireResult) IsCoreEvent() bool {
+	return true
+}
+
+type Released struct {
+	ID pubtypes.RequestID
+}
+
+func (r *Released) IsCoreEvent() bool {
+	return true
+}
+
+type ClusterFSM struct {
+	core *Core
+}
+
+func (f *ClusterFSM) ID() string {
+	return "Publock"
+}
+
+func (f *ClusterFSM) Apply(cmdData []byte) ([]byte, error) {
+	cmd, err := serder.JSONToObjectEx[Command](cmdData)
+	if err != nil {
+		return nil, fmt.Errorf("parse command: %v", err)
+	}
+
+	f.core.Apply(cmd)
+	return nil, nil
+}
+
+func (f *ClusterFSM) Snapshot() (cluster.FSMSnapshot, error) {
+	log := logger.WithField("Mod", "Publock.ClusterFSM")
+	log.Debugf("make snapshot")
+
+	f.core.lock.Lock()
+	defer f.core.lock.Unlock()
+
+	acquireds := make([]*acquired, 0, len(f.core.acquireds))
+	for _, ac := range f.core.acquireds {
+		newAc := &acquired{
+			ID:            ac.ID,
+			Req:           ac.Req,
+			ExpireCounter: ac.ExpireCounter,
+		}
+		acquireds = append(acquireds, newAc)
+	}
+
+	acquirings := make([]*acquiring, 0, len(f.core.acquirings))
+	for _, ac := range f.core.acquirings {
+		newAc := &acquiring{
+			Cmd:       ac.Cmd,
+			LastErr:   ac.LastErr,
+			StartTime: ac.StartTime,
+		}
+		acquirings = append(acquirings, newAc)
+	}
+
+	return &FSMSnapshot{
+		Acquireds:  acquireds,
+		Acquirings: acquirings,
+	}, nil
+}
+
+func (f *ClusterFSM) Restore(input io.Reader) error {
+	log := logger.WithField("Mod", "Publock.ClusterFSM")
+	log.Debugf("restore from input")
+
+	snap := &FSMSnapshot{}
+	err := serder.JSONToObjectStream(input, snap)
+	if err != nil {
+		return err
+	}
+
+	f.core.lock.Lock()
+	defer f.core.lock.Unlock()
+
+	f.core.provdersTrie.Walk(nil, func(word string, wordIndex int, node *trie.Node[pubtypes.LockProvider], isWordNode bool) {
+		if node.Value != nil {
+			node.Value.Clear()
+		}
+	})
+
+	f.core.acquireds = make(map[pubtypes.RequestID]*acquired)
+	for _, a := range snap.Acquireds {
+		// Re-register the held lock, then re-apply it to the providers
+		f.core.acquireds[a.ID] = a
+		f.core.applyRequest(a.ID, a.Req)
+	}
+
+	f.core.acquirings = snap.Acquirings
+	for _, req := range f.core.acquirings {
+		// Requests that have already timed out do not get a precise timer
+		if time.Since(req.StartTime) > req.Cmd.Timeout {
+			continue
+		}
+
+		req := req // capture the loop variable for the goroutine below
+		go func() {
+			<-time.After(req.Cmd.Timeout - time.Since(req.StartTime))
+			cmd := AcquireTimeout{
+				ID: req.Cmd.ID,
+			}
+
+			data, err := serder.ObjectToJSONEx(cmd)
+			if err != nil {
+				log.Warnf("cmd %T to json: %v", cmd, err)
+				return
+			}
+
+			// Success doesn't matter here; the periodic task is the safety net
+			f.core.clster.Apply(f.core.fsm.ID(), data, req.Cmd.Timeout)
+		}()
+	}
+
+	return nil
+}
+
+type FSMSnapshot struct {
+	Acquireds  []*acquired
+	Acquirings []*acquiring
+}
+
+func (s *FSMSnapshot) Persist(output io.Writer) error {
+	rc := serder.ObjectToJSONStream(s)
+	defer rc.Close()
+	_, err := io.Copy(output, rc)
+	return err
+}
+
+func (s *FSMSnapshot) Release() {}
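End to end in standalone mode (a nil *cluster.Cluster, so no raft round-trip), a command flows Core.Apply → core.acquire → an AcquireResult on the event channel that service.go below consumes. A hypothetical walkthrough driving Core directly the same way; it assumes the async.UnboundChannel receive pattern used in service.go, and the empty LockRequest plus the fixed request ID are illustrative only:

package main

import (
	"time"

	"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
	pubtypes "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
)

func main() {
	// Standalone: passing a nil cluster keeps everything on this node.
	core := publock.NewCore(publock.Config{LeaseExpiredSeconds: 5}, nil)
	core.Start()
	defer core.Stop()

	// An empty LockRequest conflicts with nothing, so it acquires immediately.
	core.Apply(&publock.Acquire{
		ID:      pubtypes.RequestID("req-1"),
		Request: pubtypes.LockRequest{},
		Timeout: time.Second * 3,
	})

	evt := core.EventChan().Receive()
	ret := <-evt.Chan()
	if r, ok := ret.Value.(*publock.AcquireResult); ok && r.Error == nil {
		// ...critical section...
		core.Apply(&publock.Release{ID: r.Raw.ID})
	}
}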
diff --git a/client/internal/publock/service.go b/client/internal/publock/service.go
index f3a59b1..b2e7cb8 100644
--- a/client/internal/publock/service.go
+++ b/client/internal/publock/service.go
@@ -2,13 +2,17 @@ package publock
 
 import (
 	"context"
-	"fmt"
 	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"gitlink.org.cn/cloudream/common/pkgs/future"
 	"gitlink.org.cn/cloudream/common/pkgs/logger"
+	"gitlink.org.cn/cloudream/common/utils/lo2"
+	"gitlink.org.cn/cloudream/common/utils/serder"
+	"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
 	"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
+	"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
 	clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
 )
 
@@ -28,26 +32,19 @@ func WithReason(reason string) AcquireOptionFn {
 
 type PubLock struct {
 	core       *Core
-	cliCli     *clirpc.PoolClient
-	pubChan    clirpc.PubLockMessageChan
+	clster     *cluster.Cluster
 	lock       sync.Mutex
-	acquirings map[string]*acquireInfo
-	releasing  map[string]*releaseInfo
-	nextCtxID  int64
+	acquirings map[types.RequestID]*svcAcquiring
+	acquired   []types.RequestID
+	releasing  map[types.RequestID]*svcReleasing
 }
 
-func NewMaster() *PubLock {
-	core := NewCore()
+func New(cfg Config, clster *cluster.Cluster) *PubLock {
 	return &PubLock{
-		core: core,
-	}
-}
-
-func NewSlave(cli *clirpc.PoolClient) *PubLock {
-	return &PubLock{
-		cliCli:     cli,
-		acquirings: make(map[string]*acquireInfo),
-		releasing:  make(map[string]*releaseInfo),
+		core:       NewCore(cfg, clster),
+		clster:     clster,
+		acquirings: make(map[types.RequestID]*svcAcquiring),
+		releasing:  make(map[types.RequestID]*svcReleasing),
 	}
 }
 
@@ -67,69 +64,145 @@ func (p *PubLock) BeginMutex() *MutexBuilder {
 	return m
 }
 
-func (p *PubLock) Start() {
-	if p.core != nil {
-		p.core.Start()
-		return
-	}
+func (p *PubLock) Core() *Core {
+	return p.core
 }
 
-func (p *PubLock) Stop() {
-	p.lock.Lock()
-	defer p.lock.Unlock()
+func (p *PubLock) Start() {
+	log := logger.WithField("Mod", "PubLock")
 
-	if p.core != nil {
-		p.core.Stop()
-		return
-	}
+	go func() {
+		ch := p.core.EventChan()
+		evt := ch.Receive()
+
+		ticker := time.NewTicker(time.Second)
+		defer ticker.Stop()
+
+	loop:
+		for {
+			select {
+			case <-ticker.C:
+				p.lock.Lock()
+				reqIDs := make([]types.RequestID, len(p.acquired))
+				copy(reqIDs, p.acquired)
+				p.lock.Unlock()
+
+				if len(reqIDs) == 0 {
+					continue
+				}
+
+				cmd := Renew{
+					IDs: reqIDs,
+				}
+				if p.clster == nil {
+					p.core.Apply(&cmd)
+				} else {
+					data, err := serder.ObjectToJSONEx(cmd)
+					if err != nil {
+						log.Warnf("cmd %T to json: %v", cmd, err)
+						continue
+					}
+
+					_, cerr := p.clster.MasterClient().ClusterApplyLog(context.Background(), &clirpc.ClusterApplyLog{
+						FSMID:   p.core.FSM().ID(),
+						Data:    data,
+						Timeout: time.Second * 3,
+					})
+					if cerr != nil {
+						log.Errorf("apply renew: %v", cerr)
+					}
+				}
+
+			case ret := <-evt.Chan():
+				if ret.Err != nil {
+					break loop
+				}
+
+				p.lock.Lock()
+				switch e := ret.Value.(type) {
+				case *AcquireResult:
+					a, ok := p.acquirings[e.Raw.ID]
+					if !ok {
+						break
+					}
+
+					if e.Error == nil {
+						a.Callback.SetVoid()
+						p.acquired = append(p.acquired, e.Raw.ID)
+					} else {
+						a.Callback.SetError(e.Error)
+					}
+					delete(p.acquirings, e.Raw.ID)
+
+				case *Released:
+					r, ok := p.releasing[e.ID]
+					if !ok {
+						break
+					}
+
+					r.Callback.SetVoid()
+					p.acquired = lo2.Remove(p.acquired, e.ID)
+					delete(p.releasing, e.ID)
+				}
+				p.lock.Unlock()
+
+				evt = ch.Receive()
+			}
+		}
+	}()
 
-	if p.pubChan != nil {
-		p.pubChan.Close()
-		p.pubChan = nil
-	}
+	p.core.Start()
+}
 
-	p.cliCli.Release()
-	p.cliCli = nil
+func (p *PubLock) Stop() {
+	p.core.Stop()
 }
 
-func (p *PubLock) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, error) {
+func (p *PubLock) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, *ecode.CodeError) {
 	p.lock.Lock()
-	if p.core != nil {
-		p.lock.Unlock()
-		return p.core.Acquire(req, opt)
+	cmd := Acquire{
+		ID:      types.RequestID(uuid.NewString()),
+		Request: req,
+		Timeout: opt.Timeout,
+		Reason:  opt.Reason,
 	}
-
-	if p.pubChan == nil {
-		p.pubChan = p.cliCli.PubLockChannel(context.Background())
-		go p.receivingChan()
+	ac := &svcAcquiring{
+		RequestID: cmd.ID,
+		Request:   cmd.Request,
+		Callback:  future.NewSetVoid(),
 	}
+	p.acquirings[cmd.ID] = ac
+	p.lock.Unlock()
 
-	acqID := fmt.Sprintf("%v", p.nextCtxID)
-	p.nextCtxID++
-
-	cerr := p.pubChan.Send(&types.AcquireMsg{ContextID: acqID, Request: req, Option: opt})
-	if cerr != nil {
-		p.lock.Unlock()
-		return LockedRequest{}, cerr.ToError()
-	}
+	if p.clster == nil {
+		p.core.Apply(&cmd)
+	} else {
+		data, err := serder.ObjectToJSONEx(cmd)
+		if err != nil {
+			// Drop the pending entry so a failed proposal doesn't leak it
+			p.lock.Lock()
+			delete(p.acquirings, cmd.ID)
+			p.lock.Unlock()
+			return LockedRequest{}, ecode.Newf(ecode.OperationFailed, "cmd %T to json: %v", cmd, err)
+		}
 
-	callback := future.NewSetValue[types.RequestID]()
-	info := &acquireInfo{
-		Request:  req,
-		Callback: callback,
-	}
-	p.acquirings[acqID] = info
-	p.lock.Unlock()
+		_, cerr := p.clster.MasterClient().ClusterApplyLog(context.Background(), &clirpc.ClusterApplyLog{
+			FSMID:   p.core.FSM().ID(),
+			Data:    data,
+			Timeout: opt.Timeout,
+		})
+		if cerr != nil {
+			p.lock.Lock()
+			delete(p.acquirings, cmd.ID)
+			p.lock.Unlock()
+			return LockedRequest{}, ecode.New(ecode.ErrorCode(cerr.Code), cerr.Message)
+		}
+	}
 
-	reqID, err := callback.Wait(context.Background())
+	err := ac.Callback.Wait(context.Background())
 	if err != nil {
-		return LockedRequest{}, err
+		return LockedRequest{}, ecode.Newf(ecode.OperationFailed, "wait acquire: %v", err)
 	}
 
 	return LockedRequest{
 		Req:   req,
-		ReqID: reqID,
+		ReqID: ac.RequestID,
 	}, nil
 }
 
@@ -137,97 +210,50 @@ func (p *PubLock) Release(reqID types.RequestID) {
 	log := logger.WithField("Mod", "PubLock")
 
 	p.lock.Lock()
-
-	if p.core != nil {
-		p.lock.Unlock()
-		p.core.release(reqID)
-		return
+	cmd := Release{
+		ID: reqID,
 	}
-
-	if p.pubChan == nil {
-		p.pubChan = p.cliCli.PubLockChannel(context.Background())
-		go p.receivingChan()
-	}
-
-	relID := fmt.Sprintf("%v", p.nextCtxID)
-	p.nextCtxID++
-
-	cerr := p.pubChan.Send(&types.ReleaseMsg{ContextID: relID, RequestID: reqID})
-	if cerr != nil {
-		p.lock.Unlock()
-		log.Warnf("unlock %v: %v", reqID, cerr.ToError())
-		return
+	r := &svcReleasing{
+		RequestID: cmd.ID,
+		Callback:  future.NewSetVoid(),
 	}
-
-	callback := future.NewSetVoid()
-	info := &releaseInfo{
-		RequestID: reqID,
-		Callback:  callback,
-	}
-	p.releasing[relID] = info
+	p.releasing[cmd.ID] = r
 	p.lock.Unlock()
 
-	err := callback.Wait(context.Background())
-	if err != nil {
-		log.Warnf("unlock %v: %v", reqID, err)
-		return
-	}
-
-	log.Tracef("unlock %v", reqID)
-}
-
-func (p *PubLock) receivingChan() {
-	log := logger.WithField("Mod", "PubLock")
-	for {
-		msg, cerr := p.pubChan.Receive()
-		if cerr != nil {
-			p.lock.Lock()
-			for _, info := range p.acquirings {
-				info.Callback.SetError(cerr.ToError())
-			}
-			p.acquirings = make(map[string]*acquireInfo)
-
-			for _, info := range p.releasing {
-				info.Callback.SetError(cerr.ToError())
-			}
-			p.releasing = make(map[string]*releaseInfo)
-			p.lock.Unlock()
-
-			log.Warnf("receive channel error: %v", cerr.ToError())
+	if p.clster == nil {
+		p.core.Apply(&cmd)
+	} else {
+		data, err := serder.ObjectToJSONEx(cmd)
+		if err != nil {
+			log.Warnf("cmd %T to json: %v", cmd, err)
+			// Drop the pending entry so a failed proposal doesn't leak it
+			p.lock.Lock()
+			delete(p.releasing, cmd.ID)
+			p.lock.Unlock()
 			return
 		}
 
-		p.lock.Lock()
-
-		switch msg := msg.(type) {
-		case *types.AcquireResultMsg:
-			info, ok := p.acquirings[msg.ContextID]
-			if !ok {
-				continue
-			}
-
-			if msg.Success {
-				info.Callback.SetValue(msg.RequestID)
-			} else {
-				info.Callback.SetError(fmt.Errorf(msg.Error))
-			}
-			delete(p.acquirings, msg.ContextID)
-
-		case *types.ReleaseResultMsg:
-			info, ok := p.releasing[msg.ContextID]
-			if !ok {
-				continue
-			}
-
-			info.Callback.SetVoid()
-			delete(p.releasing, msg.ContextID)
+		_, cerr := p.clster.MasterClient().ClusterApplyLog(context.Background(), &clirpc.ClusterApplyLog{
+			FSMID:   p.core.FSM().ID(),
+			Data:    data,
+			Timeout: 0,
+		})
+		if cerr != nil {
+			log.Errorf("apply release: %v", cerr)
 		}
+	}
 
-		p.lock.Unlock()
+	err := r.Callback.Wait(context.Background())
+	if err != nil {
+		log.Errorf("wait release: %v", err)
+	} else {
+		log.Tracef("unlock %v", reqID)
 	}
 }
 
-type releaseInfo struct {
+type svcAcquiring struct {
+	RequestID types.RequestID
+	Request   types.LockRequest
+	Callback  *future.SetVoidFuture
+}
+
+type svcReleasing struct {
 	RequestID types.RequestID
 	Callback  *future.SetVoidFuture
 }
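The RPC changes below close the loop on Apply's new return value: whatever bytes an FSM returns from Apply travel back through applyResult, Cluster.Apply, and ClusterApplyLogResp.Result to the node that proposed the command. A sketch of a caller using that path with the hypothetical CounterFSM from earlier, assumed registered under the ID "Counter":

package example

import (
	"context"
	"encoding/binary"
	"fmt"
	"time"

	"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
	clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
)

// addThree forwards one CounterFSM command to the master and decodes the reply.
func addThree(clster *cluster.Cluster) (uint64, error) {
	delta := make([]byte, 8)
	binary.LittleEndian.PutUint64(delta, 3)

	resp, cerr := clster.MasterClient().ClusterApplyLog(context.Background(), &clirpc.ClusterApplyLog{
		FSMID:   "Counter", // routed to CounterFSM.Apply on every node
		Data:    delta,     // FSM-specific command payload
		Timeout: time.Second * 3,
	})
	if cerr != nil {
		return 0, fmt.Errorf("apply: %v", cerr.Message)
	}

	// resp.Result carries the bytes CounterFSM.Apply returned: the new total.
	return binary.LittleEndian.Uint64(resp.Result), nil
}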
diff --git a/client/internal/rpc/cluster.go b/client/internal/rpc/cluster.go
index e854bdc..4890f41 100644
--- a/client/internal/rpc/cluster.go
+++ b/client/internal/rpc/cluster.go
@@ -21,10 +21,12 @@ func (s *Service) ClusterRaftInstallSnapshot(ctx context.Context, msg *clirpc.Cl
 }
 
 func (s *Service) ClusterApplyLog(ctx context.Context, msg *clirpc.ClusterApplyLog) (*clirpc.ClusterApplyLogResp, *rpc.CodeError) {
-	err := s.cluster.Apply(msg.Data, msg.Timeout)
+	ret, err := s.cluster.Apply(msg.FSMID, msg.Data, msg.Timeout)
 	if err != nil {
 		return nil, rpc.Failed(ecode.OperationFailed, "%v", err)
 	}
 
-	return &clirpc.ClusterApplyLogResp{}, nil
+	return &clirpc.ClusterApplyLogResp{
+		Result: ret,
+	}, nil
 }
diff --git a/common/assets/confs/client.config.json b/common/assets/confs/client.config.json
index cfabf66..5f80dad 100644
--- a/common/assets/confs/client.config.json
+++ b/common/assets/confs/client.config.json
@@ -82,5 +82,8 @@
 		"clientCert": "",
 		"clientKey": "",
 		"storeBase": ""
+	},
+	"publock": {
+		"leaseExpiredSeconds": 5
 	}
 }
\ No newline at end of file
diff --git a/common/ecode/ecode.go b/common/ecode/ecode.go
index 0761ecd..e2f6831 100644
--- a/common/ecode/ecode.go
+++ b/common/ecode/ecode.go
@@ -12,4 +12,5 @@ const (
 	Unauthorized    ErrorCode = "Unauthorized"
 	ChannelClosed   ErrorCode = "ChannelClosed"
 	ClusterNoMaster ErrorCode = "ClusterNoMaster"
+	LockConflict    ErrorCode = "LockConflict"
 )
diff --git a/common/pkgs/rpc/client/cluster.go b/common/pkgs/rpc/client/cluster.go
index c2624a5..0595318 100644
--- a/common/pkgs/rpc/client/cluster.go
+++ b/common/pkgs/rpc/client/cluster.go
@@ -84,10 +84,13 @@ func (s *Server) ClusterRaftInstallSnapshot(req Client_ClusterRaftInstallSnapsho
 }
 
 type ClusterApplyLog struct {
-	Data    string
+	FSMID   string
+	Data    []byte
 	Timeout time.Duration
 }
 
-type ClusterApplyLogResp struct{}
+type ClusterApplyLogResp struct {
+	Result []byte
+}
 
 func (c *Client) ClusterApplyLog(ctx context.Context, msg *ClusterApplyLog) (*ClusterApplyLogResp, *rpc.CodeError) {
 	if c.fusedErr != nil {