
Rewrite the global lock for cluster mode

master
Sydonian committed 2 months ago
commit 73de194e40
15 changed files with 811 additions and 279 deletions
1. +25 -7  client/internal/cluster/cluster.go
2. +128 -27  client/internal/cluster/fsm.go
3. +4 -0  client/internal/cmdline/init.go
4. +4 -3  client/internal/cmdline/serve.go
5. +4 -4  client/internal/cmdline/test.go
6. +4 -3  client/internal/cmdline/vfstest.go
7. +2 -0  client/internal/config/config.go
8. +1 -1  client/internal/http/v1/cluster.go
9. +5 -0  client/internal/publock/config.go
10. +457 -92  client/internal/publock/core.go
11. +164 -138  client/internal/publock/service.go
12. +4 -2  client/internal/rpc/cluster.go
13. +3 -0  common/assets/confs/client.config.json
14. +1 -0  common/ecode/ecode.go
15. +5 -2  common/pkgs/rpc/client/cluster.go

+25 -7  client/internal/cluster/cluster.go

@@ -2,6 +2,7 @@ package cluster

import (
"crypto/tls"
"encoding/binary"
"fmt"
"os"
"path/filepath"
@@ -24,9 +25,10 @@ type Cluster struct {
cfg Config
poolCfg clirpc.PoolConfig
masterCli MasterClient
doneCh chan any
raft *raft.Raft
fsm *raftFSM
transport *Transport
doneCh chan any
}

func New(cfg Config) *Cluster {
@@ -39,7 +41,7 @@ func New(cfg Config) *Cluster {
}
}

func (c *Cluster) Start() (*ClusterEventChan, error) {
func (c *Cluster) Start(fsms []FSM) (*ClusterEventChan, error) {
log := logger.WithField("Mod", "Cluster")

ch := async.NewUnboundChannel[ClusterEvent]()
@@ -48,6 +50,8 @@ func (c *Cluster) Start() (*ClusterEventChan, error) {
return ch, nil
}

c.fsm = NewFSM(fsms)

poolCfgJSON := clirpc.PoolConfigJSON{
RootCA: c.cfg.RootCA,
ClientCert: c.cfg.ClientCert,
@@ -83,11 +87,9 @@ func (c *Cluster) Start() (*ClusterEventChan, error) {
return nil, fmt.Errorf("create raft snapshot store: %w", err)
}

fsm := NewFSM()

c.transport = NewTransport(c.cfg.Announce, *poolCfg)

rft, err := raft.NewRaft(raftCfg, fsm, logDB, stableDB, snapshotStore, c.transport)
rft, err := raft.NewRaft(raftCfg, c.fsm, logDB, stableDB, snapshotStore, c.transport)
if err != nil {
return nil, fmt.Errorf("create raft: %w", err)
}
@@ -248,8 +250,24 @@ func (c *Cluster) RaftTransport() *Transport {
}

// May only be called by the Leader
func (c *Cluster) Apply(data string, timeout time.Duration) error {
return c.raft.Apply([]byte(data), timeout).Error()
func (c *Cluster) Apply(fsmID string, data []byte, timeout time.Duration) ([]byte, error) {
fsmIDBytes := []byte(fsmID)

logBytes := make([]byte, 4+len(fsmIDBytes)+len(data))

// The first 4 bytes hold the length of the ID, followed by the ID and then the data
binary.LittleEndian.PutUint32(logBytes[:4], uint32(len(fsmIDBytes)))
copy(logBytes[4:], fsmIDBytes)
copy(logBytes[4+len(fsmIDBytes):], data)

fut := c.raft.Apply(logBytes, timeout)
err := fut.Error()
if err != nil {
return nil, err
}

applyRet := fut.Response().(applyResult)
return applyRet.Result, applyRet.Error
}

type ClusterEvent interface {
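
With several state machines now sharing one Raft group, Apply prefixes every log entry with the ID of the target FSM. A self-contained round-trip sketch of that framing (the helper names here are hypothetical, not part of the commit):

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeLog frames a command the way Cluster.Apply does:
// a 4-byte little-endian length of the FSM ID, then the ID, then the payload.
func encodeLog(fsmID string, data []byte) []byte {
	id := []byte(fsmID)
	buf := make([]byte, 4+len(id)+len(data))
	binary.LittleEndian.PutUint32(buf[:4], uint32(len(id)))
	copy(buf[4:], id)
	copy(buf[4+len(id):], data)
	return buf
}

// decodeLog is the inverse, mirroring what raftFSM.Apply (fsm.go below) does
// before dispatching the payload to the registered FSM.
func decodeLog(log []byte) (fsmID string, data []byte) {
	idLen := binary.LittleEndian.Uint32(log[:4])
	return string(log[4 : 4+idLen]), log[4+idLen:]
}

func main() {
	framed := encodeLog("Publock", []byte(`{"Acquire":{"ID":"42"}}`))
	id, payload := decodeLog(framed)
	fmt.Println(id, string(payload)) // Publock {"Acquire":{"ID":"42"}}
}

The 4-byte length prefix is exactly what raftFSM.Apply in fsm.go reads to route the payload to the right FSM.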


+128 -27  client/internal/cluster/fsm.go

@@ -1,67 +1,168 @@
package cluster

import (
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/hashicorp/raft"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/common/utils/http2"
"gitlink.org.cn/cloudream/common/utils/serder"
)

type FSM struct {
data string
lock sync.Mutex
var ErrFSMNotFound = fmt.Errorf("fsm not found")

type FSM interface {
// Must be unique and must never change
ID() string
Apply(cmd []byte) ([]byte, error)
Snapshot() (FSMSnapshot, error)
Restore(input io.Reader) error
}

func NewFSM() *FSM {
return &FSM{data: ""}
type FSMSnapshot interface {
Persist(output io.Writer) error
Release()
}

func (f *FSM) Apply(l *raft.Log) interface{} {
fmt.Printf("log: %v\n", string(l.Data))
type SnapshotMeta struct {
Version int
}

f.lock.Lock()
defer f.lock.Unlock()
type raftFSM struct {
fsms map[string]FSM
}

f.data = f.data + "\n" + string(l.Data)
return nil
func NewFSM(fsms []FSM) *raftFSM {
fsmsMp := make(map[string]FSM)
for _, fsm := range fsms {
_, ok := fsmsMp[fsm.ID()]
if ok {
panic(fmt.Sprintf("duplicate fsm id: %s", fsm.ID()))
}
fsmsMp[fsm.ID()] = fsm
}

return &raftFSM{fsms: fsmsMp}
}

func (f *FSM) Snapshot() (raft.FSMSnapshot, error) {
f.lock.Lock()
defer f.lock.Unlock()
func (f *raftFSM) Apply(l *raft.Log) interface{} {
idLen := binary.LittleEndian.Uint32(l.Data[:4])
fsm, ok := f.fsms[string(l.Data[4:4+idLen])]
if !ok {
return ErrFSMNotFound
}

return &Snapshot{data: f.data}, nil
cmd := l.Data[4+idLen:]
res, err := fsm.Apply(cmd)
return applyResult{
Result: res,
Error: err,
}
}

func (f *raftFSM) Snapshot() (raft.FSMSnapshot, error) {
snapshots := make(map[string]FSMSnapshot)
for id, fsm := range f.fsms {
snapshot, err := fsm.Snapshot()
if err != nil {
for _, snapshot := range snapshots {
snapshot.Release()
}
return nil, err
}

snapshots[id] = snapshot
}

return &Snapshot{snapshots: snapshots}, nil
}

func (f *FSM) Restore(rc io.ReadCloser) error {
data, err := io.ReadAll(rc)
func (f *raftFSM) Restore(rc io.ReadCloser) error {
defer rc.Close()

cr := http2.NewChunkedReader(rc)

_, metaBytes, err := cr.NextDataPart()
if err != nil {
return err
return fmt.Errorf("read meta data part: %v", err)
}
meta := SnapshotMeta{}
if err := serder.JSONToObject(metaBytes, &meta); err != nil {
return fmt.Errorf("unmarshal meta data: %v", err)
}
// Perform compatibility checks such as this one
if meta.Version != 1 {
return fmt.Errorf("unsupported version: %d", meta.Version)
}

f.lock.Lock()
defer f.lock.Unlock()
for {
id, reader, err := cr.NextPart()
if err != nil && err != io.EOF {
return err
}

if err == io.EOF {
// TODO: consider verifying that Restore was called on every FSM
break
}

fsm, ok := f.fsms[id]
if !ok {
// TODO: compatibility
continue
}

err = fsm.Restore(reader)
if err != nil {
// TODO: unclear whether the Raft library stops serving promptly when Restore fails
return fmt.Errorf("restore fsm %s: %v", id, err)
}
}

f.data = string(data)
return nil
}

var _ raft.FSM = (*FSM)(nil)
var _ raft.FSM = (*raftFSM)(nil)

type Snapshot struct {
data string
snapshots map[string]FSMSnapshot
}

func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
err := io2.WriteAll(sink, []byte(s.data))
meta := SnapshotMeta{Version: 1}
metaBytes, err := serder.ObjectToJSON(meta)
if err != nil {
sink.Cancel()
return fmt.Errorf("marshal meta data: %v", err)
}

cw := http2.NewChunkedWriter(sink)
err = cw.WriteDataPart("meta", metaBytes)
if err != nil {
return sink.Cancel()
sink.Cancel()
return fmt.Errorf("write meta data part: %v", err)
}

for id, snapshot := range s.snapshots {
w := cw.BeginPart(id)
err := snapshot.Persist(w)
if err != nil {
sink.Cancel()
return fmt.Errorf("persist fsm %s: %v", id, err)
}
}

return sink.Close()
}

func (s *Snapshot) Release() {
for _, snapshot := range s.snapshots {
snapshot.Release()
}
}

type applyResult struct {
Result []byte
Error error
}
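
Because the Raft FSM is now a registry keyed by ID, any subsystem can replicate its state by implementing the new FSM/FSMSnapshot interfaces and passing itself to Cluster.Start. A minimal, hypothetical counter FSM illustrating the contract (not something that exists in the repository):

package counterfsm

import (
	"encoding/binary"
	"fmt"
	"io"
	"sync"

	"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
)

// CounterFSM is a toy state machine: every applied command increments a counter.
type CounterFSM struct {
	mu    sync.Mutex
	count uint64
}

// ID must be unique among all registered FSMs and must never change.
func (c *CounterFSM) ID() string { return "Counter" }

// Apply runs a replicated command and returns its result bytes.
func (c *CounterFSM) Apply(cmd []byte) ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.count++
	out := make([]byte, 8)
	binary.LittleEndian.PutUint64(out, c.count)
	return out, nil
}

// Snapshot captures the current state; Persist is called later by the cluster.
func (c *CounterFSM) Snapshot() (cluster.FSMSnapshot, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return &counterSnapshot{count: c.count}, nil
}

// Restore replaces the state with the contents of a previously persisted snapshot.
func (c *CounterFSM) Restore(input io.Reader) error {
	var buf [8]byte
	if _, err := io.ReadFull(input, buf[:]); err != nil {
		return fmt.Errorf("read snapshot: %w", err)
	}
	c.mu.Lock()
	c.count = binary.LittleEndian.Uint64(buf[:])
	c.mu.Unlock()
	return nil
}

type counterSnapshot struct{ count uint64 }

func (s *counterSnapshot) Persist(output io.Writer) error {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], s.count)
	_, err := output.Write(buf[:])
	return err
}

func (s *counterSnapshot) Release() {}

var _ cluster.FSM = (*CounterFSM)(nil)

Registration would then look like clster.Start([]cluster.FSM{&CounterFSM{}, ...}); each FSM's Persist output becomes one named part in the chunked snapshot that Snapshot.Persist above writes after the meta part.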

+4 -0  client/internal/cmdline/init.go

@@ -20,6 +20,7 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http"
mntcfg "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/config"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock"
corrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/coordinator"
hubrpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/hub"
@@ -351,6 +352,9 @@ func saveConfig(cfg *clicfg.Config, configPath string) error {
CacheExpireTime: time.Minute * 1,
ScanDataDirInterval: time.Minute * 10,
}
cfg.PubLock = publock.Config{
LeaseExpiredSeconds: 5,
}

configData, err := json.MarshalIndent(cfg, "", " ")
if err != nil {


+4 -3  client/internal/cmdline/serve.go

@@ -178,15 +178,16 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {

// Cluster mode
clster := cluster.New(config.Cfg().Cluster)
clsterCh, err := clster.Start()
// Public lock
publock := publock.New(config.Cfg().PubLock, clster)

clsterCh, err := clster.Start([]cluster.FSM{publock.Core().FSM()})
if err != nil {
logger.Errorf("start cluster: %v", err)
os.Exit(1)
}
defer clster.Stop()

// Public lock
publock := publock.NewMaster()
publock.Start()
defer publock.Stop()



+4 -4  client/internal/cmdline/test.go

@@ -186,18 +186,18 @@ func test(configPath string) {

// Cluster mode
clster := cluster.New(config.Cfg().Cluster)
clsterCh, err := clster.Start()
// Public lock
publock := publock.New(config.Cfg().PubLock, clster)

clsterCh, err := clster.Start([]cluster.FSM{publock.Core().FSM()})
if err != nil {
logger.Errorf("start cluster: %v", err)
os.Exit(1)
}
defer clster.Stop()

// Public lock
publock := publock.NewMaster()
publock.Start()
defer publock.Stop()

// Access statistics
acStat := accessstat.NewAccessStat(accessstat.Config{
// TODO: consider moving this into the config


+4 -3  client/internal/cmdline/vfstest.go

@@ -166,15 +166,16 @@ func vfsTest(configPath string, opts serveHTTPOptions) {

// Cluster mode
clster := cluster.New(config.Cfg().Cluster)
clsterCh, err := clster.Start()
// Public lock
publock := publock.New(config.Cfg().PubLock, clster)

clsterCh, err := clster.Start([]cluster.FSM{publock.Core().FSM()})
if err != nil {
logger.Errorf("start cluster: %v", err)
os.Exit(1)
}
defer clster.Stop()

// Public lock
publock := publock.NewMaster()
publock.Start()
defer publock.Stop()



+2 -0  client/internal/config/config.go

@@ -10,6 +10,7 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/client/internal/downloader/strategy"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http"
mntcfg "gitlink.org.cn/cloudream/jcs-pub/client/internal/mount/config"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/ticktock"
stgglb "gitlink.org.cn/cloudream/jcs-pub/common/globals"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/connectivity"
@@ -35,6 +36,7 @@ type Config struct {
Mount *mntcfg.Config `json:"mount"`
AccessToken *accesstoken.Config `json:"accessToken"`
Cluster cluster.Config `json:"cluster"`
PubLock publock.Config `json:"publock"`
}

var cfg Config


+1 -1  client/internal/http/v1/cluster.go

@@ -35,7 +35,7 @@ func (s *ClusterService) Status(ctx *gin.Context) {
}

_, cerr := s.svc.Cluster.MasterClient().ClusterApplyLog(context.TODO(), &clirpc.ClusterApplyLog{
Data: fmt.Sprintf("%v: %v", s.svc.Cluster.ID(), time.Now()),
Data: []byte(fmt.Sprintf("%v: %v", s.svc.Cluster.ID(), time.Now())),
})
if cerr != nil {
ctx.JSON(http.StatusOK, types.Failed(ecode.ErrorCode(cerr.Code), "%v", cerr.Message))


+5 -0  client/internal/publock/config.go

@@ -0,0 +1,5 @@
package publock

type Config struct {
LeaseExpiredSeconds int `json:"leaseExpiredSeconds"`
}

+457 -92  client/internal/publock/core.go

@@ -1,189 +1,554 @@
package publock

import (
"context"
"fmt"
"io"
"sync"
"time"

"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/trie"
"gitlink.org.cn/cloudream/common/pkgs/types"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/lockprovider"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
pubtypes "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
)

type Core struct {
cfg Config
clster *cluster.Cluster
fsm *ClusterFSM
lock *sync.Mutex
provdersTrie *trie.Trie[types.LockProvider]
acquirings []*acquireInfo
acquired map[types.RequestID]types.LockRequest
nextReqID int64
provdersTrie *trie.Trie[pubtypes.LockProvider]
acquirings []*acquiring // Must be an ordered slice, because ordering has to be preserved (execution results on every cluster node must be strictly identical)
acquireds map[pubtypes.RequestID]*acquired
eventCh *async.UnboundChannel[CoreEvent]
doneCh chan any
}

func NewCore() *Core {
svc := &Core{
func NewCore(cfg Config, clster *cluster.Cluster) *Core {
c := &Core{
cfg: cfg,
clster: clster,
lock: &sync.Mutex{},
provdersTrie: trie.NewTrie[types.LockProvider](),
acquired: make(map[types.RequestID]types.LockRequest),
provdersTrie: trie.NewTrie[pubtypes.LockProvider](),
acquireds: make(map[pubtypes.RequestID]*acquired),
eventCh: async.NewUnboundChannel[CoreEvent](),
doneCh: make(chan any, 1),
}
c.fsm = &ClusterFSM{
core: c,
}

svc.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock()
svc.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock()
return svc
c.provdersTrie.Create([]any{lockprovider.UserSpaceLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewUserSpaceLock()
c.provdersTrie.Create([]any{lockprovider.PackageLockPathPrefix, trie.WORD_ANY}).Value = lockprovider.NewPackageLock()
return c
}

func (s *Core) Start() {

func (s *Core) Apply(cmd Command) {
switch cmd := cmd.(type) {
case *Acquire:
s.acquire(*cmd)
case *AcquireTimeout:
s.acquireTimeout(*cmd)
case *Release:
s.release(*cmd)
case *LeaseExpired:
s.leaseExpired(*cmd)
case *Renew:
s.renew(*cmd)
}
}

func (s *Core) Stop() {
func (s *Core) EventChan() *async.UnboundChannel[CoreEvent] {
return s.eventCh
}

func (s *Core) FSM() cluster.FSM {
return s.fsm
}

type acquireInfo struct {
Request types.LockRequest
Callback *future.SetValueFuture[types.RequestID]
LastErr error
func (s *Core) Start() {
log := logger.WithField("Mod", "Publock.Core")

go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

loop:
for {
select {
case <-s.doneCh:
break loop
case <-ticker.C:
var acTimeout []pubtypes.RequestID
var leExpired []pubtypes.RequestID

s.lock.Lock()

// Periodically clean up lock requests that have timed out
for _, req := range s.acquirings {
if time.Since(req.StartTime) < req.Cmd.Timeout {
continue
}

acTimeout = append(acTimeout, req.Cmd.ID)

}
// Periodically clean up expired locks
for reqID, ac := range s.acquireds {
if ac.ExpireCounter < s.cfg.LeaseExpiredSeconds {
ac.ExpireCounter++
continue
}

leExpired = append(leExpired, reqID)
}

s.lock.Unlock()

for _, reqID := range acTimeout {
cmd := AcquireTimeout{
ID: reqID,
}

if s.clster != nil {
data, err := serder.ObjectToJSONEx(cmd)
if err != nil {
log.Warnf("cmd %T to json: %v", cmd, err)
continue
}

// Regardless of whether this succeeds, the periodic task acts as a fallback
s.clster.Apply(s.fsm.ID(), data, time.Second*3)
} else {
s.acquireTimeout(AcquireTimeout{
ID: reqID,
})
}
}

for _, reqID := range leExpired {
cmd := LeaseExpired{
ID: reqID,
}

if s.clster != nil {
data, err := serder.ObjectToJSONEx(cmd)
if err != nil {
log.Warnf("cmd %T to json: %v", cmd, err)
continue
}

// Regardless of whether this succeeds, the periodic task acts as a fallback
s.clster.Apply(s.fsm.ID(), data, time.Second*3)
} else {
s.leaseExpired(LeaseExpired{
ID: reqID,
})
}
}
}
}
}()
}

func (svc *Core) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, error) {
ctx := context.Background()
if opt.Timeout != 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, opt.Timeout)
defer cancel()
func (s *Core) Stop() {
select {
case s.doneCh <- nil:
default:
}
s.eventCh.Close()
}

// Check in place whether the lock is available
svc.lock.Lock()
defer svc.lock.Unlock()
type acquiring struct {
Cmd Acquire
LastErr *ecode.CodeError
// This value comes from each node itself, so it may differ between nodes.
// It is only used to decide whether acquiring the lock has timed out, so the discrepancy does not matter much.
StartTime time.Time
}

reqID, err := svc.tryAcquireOne(req)
if err != nil {
return LockedRequest{}, err
}
type acquired struct {
ID pubtypes.RequestID
Req pubtypes.LockRequest
// Records how many expiration checks this lock has gone through; once a threshold is exceeded the lock is considered expired.
// Releasing a lock without notifying the service that acquired it is a dangerous operation, so expiration is implemented with this counter instead.
ExpireCounter int
}

if reqID != "" {
svc.acquired[reqID] = req
return LockedRequest{
Req: req,
ReqID: reqID,
}, nil
}
func (c *Core) acquire(cmd Acquire) {

// If the in-place check fails, wait asynchronously for the lock to become available
info := &acquireInfo{
Request: req,
Callback: future.NewSetValue[types.RequestID](),
}
svc.acquirings = append(svc.acquirings, info)
c.lock.Lock()
defer c.lock.Unlock()

// Do not hold the internal lock while waiting
svc.lock.Unlock()
reqID, err = info.Callback.Wait(ctx)
svc.lock.Lock()

if err == nil {
svc.acquired[reqID] = req
return LockedRequest{
Req: req,
ReqID: reqID,
}, nil
// Immediately check whether the lock is available
cerr := c.tryAcquireOne(cmd.ID, cmd.Request)
if cerr == nil {
err := c.eventCh.Send(&AcquireResult{
Raw: cmd,
Error: nil,
})
if err != nil {
panic(err)
}
return
}

if err != future.ErrCanceled {
lo2.Remove(svc.acquirings, info)
return LockedRequest{}, err
// If unavailable, add it to the waiting list
info := &acquiring{
Cmd: cmd,
LastErr: cerr,
StartTime: time.Now(),
}
c.acquirings = append(c.acquirings, info)
go func() {
log := logger.WithField("Mod", "Publock.Core")
<-time.After(cmd.Timeout)

// If the first wait failed with a timeout, try once more to fetch the result while holding the lock
reqID, err = info.Callback.TryGetValue()
if err == nil {
svc.acquired[reqID] = req
return LockedRequest{
Req: req,
ReqID: reqID,
}, nil
}
ac := AcquireTimeout{
ID: cmd.ID,
}

lo2.Remove(svc.acquirings, info)
return LockedRequest{}, err
data, err := serder.ObjectToJSONEx(ac)
if err != nil {
log.Warnf("cmd %T to json: %v", cmd, err)
return
}

// Regardless of whether this succeeds, the periodic task acts as a fallback
c.clster.Apply(c.fsm.ID(), data, cmd.Timeout)
}()
}

func (s *Core) release(reqID types.RequestID) {
func (s *Core) release(cmd Release) {
reqID := cmd.ID

s.lock.Lock()
defer s.lock.Unlock()

req, ok := s.acquired[reqID]
ac, ok := s.acquireds[reqID]
if !ok {
return
}

s.releaseRequest(reqID, req)
s.releaseRequest(reqID, ac.Req)
s.eventCh.Send(&Released{
ID: reqID,
})
s.tryAcquirings()
}

func (c *Core) acquireTimeout(cmd AcquireTimeout) {
c.lock.Lock()
defer c.lock.Unlock()

for i, req := range c.acquirings {
if req.Cmd.ID == cmd.ID {
c.eventCh.Send(&AcquireResult{
Raw: req.Cmd,
Error: req.LastErr,
})
c.acquirings = lo2.RemoveAt(c.acquirings, i)
return
}
}
}

func (c *Core) leaseExpired(cmd LeaseExpired) {
log := logger.WithField("Mod", "Publock.Core")

c.lock.Lock()
defer c.lock.Unlock()

ac, ok := c.acquireds[cmd.ID]
if !ok {
return
}

log.Warnf("lock request %v lease expired", ac.ID)

c.releaseRequest(ac.ID, ac.Req)
c.tryAcquirings()
}

func (c *Core) renew(cmd Renew) {
c.lock.Lock()
defer c.lock.Unlock()

for _, reqID := range cmd.IDs {
ac, ok := c.acquireds[reqID]
if !ok {
continue
}

ac.ExpireCounter = 0
}
}

func (a *Core) tryAcquirings() {
for i := 0; i < len(a.acquirings); i++ {
req := a.acquirings[i]

reqID, err := a.tryAcquireOne(req.Request)
err := a.tryAcquireOne(req.Cmd.ID, req.Cmd.Request)
if err != nil {
req.LastErr = err
continue
}

req.Callback.SetValue(reqID)
a.eventCh.Send(&AcquireResult{
Raw: req.Cmd,
Error: nil,
})
a.acquirings[i] = nil
}

a.acquirings = lo2.RemoveAllDefault(a.acquirings)
}

func (s *Core) tryAcquireOne(req types.LockRequest) (types.RequestID, error) {
err := s.testOneRequest(req)
if err != nil {
return "", err
func (s *Core) tryAcquireOne(reqID pubtypes.RequestID, req pubtypes.LockRequest) *ecode.CodeError {
cerr := s.testOneRequest(req)
if cerr != nil {
return cerr
}

reqID := types.RequestID(fmt.Sprintf("%d", s.nextReqID))
s.nextReqID++

s.applyRequest(reqID, req)
return reqID, nil
s.acquireds[reqID] = &acquired{
ID: reqID,
Req: req,
ExpireCounter: 0,
}
return nil
}

func (s *Core) testOneRequest(req types.LockRequest) error {
func (s *Core) testOneRequest(req pubtypes.LockRequest) *ecode.CodeError {
for _, lock := range req.Locks {
n, ok := s.provdersTrie.WalkEnd(lock.Path)
if !ok || n.Value == nil {
return fmt.Errorf("lock provider not found for path %v", lock.Path)
return ecode.Newf(ecode.DataNotFound, "lock provider not found for path %v", lock.Path)
}

err := n.Value.CanLock(lock)
if err != nil {
return err
return ecode.Newf(ecode.LockConflict, "%v", err)
}
}

return nil
}

func (s *Core) applyRequest(reqID types.RequestID, req types.LockRequest) {
func (s *Core) applyRequest(reqID pubtypes.RequestID, req pubtypes.LockRequest) {
for _, lock := range req.Locks {
p, _ := s.provdersTrie.WalkEnd(lock.Path)
p.Value.Lock(reqID, lock)
}
}

func (s *Core) releaseRequest(reqID types.RequestID, req types.LockRequest) {
func (s *Core) releaseRequest(reqID pubtypes.RequestID, req pubtypes.LockRequest) {
for _, lock := range req.Locks {
p, _ := s.provdersTrie.WalkEnd(lock.Path)
p.Value.Unlock(reqID, lock)
}
delete(s.acquireds, reqID)
}

type LockedRequest struct {
Req types.LockRequest
ReqID types.RequestID
Req pubtypes.LockRequest
ReqID pubtypes.RequestID
}

type Command interface {
IsCommand() bool
}

var _ = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[Command](
(*Acquire)(nil),
(*Release)(nil),
(*AcquireTimeout)(nil),
(*LeaseExpired)(nil),
(*Renew)(nil),
)))

type Acquire struct {
ID pubtypes.RequestID
Request pubtypes.LockRequest
Timeout time.Duration
Reason string
}

func (a *Acquire) IsCommand() bool {
return true
}

type Release struct {
ID pubtypes.RequestID
}

func (r *Release) IsCommand() bool {
return true
}

type AcquireTimeout struct {
ID pubtypes.RequestID
}

func (a *AcquireTimeout) IsCommand() bool {
return true
}

type LeaseExpired struct {
ID pubtypes.RequestID
}

func (l *LeaseExpired) IsCommand() bool {
return true
}

type Renew struct {
IDs []pubtypes.RequestID
}

func (r *Renew) IsCommand() bool {
return true
}

type CoreEvent interface {
IsCoreEvent() bool
}

type AcquireResult struct {
Raw Acquire
Error *ecode.CodeError
}

func (a *AcquireResult) IsCoreEvent() bool {
return true
}

type Released struct {
ID pubtypes.RequestID
}

func (r *Released) IsCoreEvent() bool {
return true
}

type ClusterFSM struct {
core *Core
}

func (f *ClusterFSM) ID() string {
return "Publock"
}

func (f *ClusterFSM) Apply(cmdData []byte) ([]byte, error) {
cmd, err := serder.JSONToObjectEx[Command](cmdData)
if err != nil {
return nil, fmt.Errorf("parse command: %v", err)
}

f.core.Apply(cmd)
return nil, nil
}

func (f *ClusterFSM) Snapshot() (cluster.FSMSnapshot, error) {
log := logger.WithField("Mod", "Publock.ClusterFSM")
log.Debugf("make snapshot")

f.core.lock.Lock()
defer f.core.lock.Unlock()

acquireds := make([]*acquired, 0, len(f.core.acquireds))
for _, ac := range f.core.acquireds {
newAc := &acquired{
ID: ac.ID,
Req: ac.Req,
ExpireCounter: ac.ExpireCounter,
}
acquireds = append(acquireds, newAc)
}

acquirings := make([]*acquiring, 0, len(f.core.acquirings))
for _, ac := range f.core.acquirings {
newAc := &acquiring{
Cmd: ac.Cmd,
LastErr: ac.LastErr,
StartTime: ac.StartTime,
}
acquirings = append(acquirings, newAc)
}

return &FSMSnapshot{
Acquireds: acquireds,
Acquirings: acquirings,
}, nil
}

func (f *ClusterFSM) Restore(input io.Reader) error {
log := logger.WithField("Mod", "Publock.ClusterFSM")
log.Debugf("restore from input")

snap := &FSMSnapshot{}
err := serder.JSONToObjectStream(input, snap)
if err != nil {
return err
}

f.core.lock.Lock()
defer f.core.lock.Unlock()

f.core.provdersTrie.Walk(nil, func(word string, wordIndex int, node *trie.Node[pubtypes.LockProvider], isWordNode bool) {
if node.Value != nil {
node.Value.Clear()
}
})

f.core.acquireds = make(map[pubtypes.RequestID]*acquired)
for _, a := range snap.Acquireds {
f.core.applyRequest(a.ID, a.Req)
}

f.core.acquirings = snap.Acquirings
for _, req := range f.core.acquirings {
// Do not start a precise timer for requests that have already timed out
if time.Since(req.StartTime) > req.Cmd.Timeout {
continue
}

go func() {
<-time.After(req.Cmd.Timeout - time.Since(req.StartTime))
cmd := AcquireTimeout{
ID: req.Cmd.ID,
}

data, err := serder.ObjectToJSONEx(cmd)
if err != nil {
log.Warnf("cmd %T to json: %v", cmd, err)
return
}

// Regardless of whether this succeeds, the periodic task acts as a fallback
f.core.clster.Apply(f.core.fsm.ID(), data, req.Cmd.Timeout)
}()
}

return nil
}

type FSMSnapshot struct {
Acquireds []*acquired
Acquirings []*acquiring
}

func (s *FSMSnapshot) Persist(output io.Writer) error {
rc := serder.ObjectToJSONStream(s)
defer rc.Close()
_, err := io.Copy(output, rc)
return err
}

func (s *FSMSnapshot) Release() {}
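
The lease logic deliberately avoids wall-clock expiry: every acquired lock carries an ExpireCounter that the once-per-second sweep increments and that a Renew command resets, so a lock only expires after LeaseExpiredSeconds consecutive sweeps without renewal, and the expiry is then replicated as a LeaseExpired command so all nodes release in the same order. A stripped-down, standalone illustration of the counting scheme (simplified, not the actual implementation):

package main

import "fmt"

const leaseExpiredSeconds = 5

type lease struct{ expireCounter int }

// tick plays the role of the once-per-second sweep in Core.Start: every held
// lock's counter is bumped, and locks past the threshold are reported as expired.
func tick(held map[string]*lease) (expired []string) {
	for id, l := range held {
		if l.expireCounter < leaseExpiredSeconds {
			l.expireCounter++
			continue
		}
		expired = append(expired, id)
	}
	return expired
}

// renew plays the role of the Renew command: resetting the counter keeps the lease alive.
func renew(held map[string]*lease, ids ...string) {
	for _, id := range ids {
		if l, ok := held[id]; ok {
			l.expireCounter = 0
		}
	}
}

func main() {
	held := map[string]*lease{"req-1": {}, "req-2": {}}
	for sec := 1; sec <= 8; sec++ {
		if sec%2 == 0 {
			renew(held, "req-1") // only req-1 keeps renewing
		}
		for _, id := range tick(held) {
			fmt.Printf("second %d: %s expired\n", sec, id) // req-2 expires, req-1 never does
			delete(held, id)
		}
	}
}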

+164 -138  client/internal/publock/service.go

@@ -2,13 +2,17 @@ package publock

import (
"context"
"fmt"
"sync"
"time"

"github.com/google/uuid"
"gitlink.org.cn/cloudream/common/pkgs/future"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/utils/lo2"
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock/types"
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
)

@@ -28,26 +32,19 @@ func WithReason(reason string) AcquireOptionFn {

type PubLock struct {
core *Core
cliCli *clirpc.PoolClient
pubChan clirpc.PubLockMessageChan
clster *cluster.Cluster
lock sync.Mutex
acquirings map[string]*acquireInfo
releasing map[string]*releaseInfo
nextCtxID int64
acquirings map[types.RequestID]*svcAcquiring
acquired []types.RequestID
releasing map[types.RequestID]*svcReleasing
}

func NewMaster() *PubLock {
core := NewCore()
func New(cfg Config, clster *cluster.Cluster) *PubLock {
return &PubLock{
core: core,
}
}

func NewSlave(cli *clirpc.PoolClient) *PubLock {
return &PubLock{
cliCli: cli,
acquirings: make(map[string]*acquireInfo),
releasing: make(map[string]*releaseInfo),
core: NewCore(cfg, clster),
clster: clster,
acquirings: make(map[types.RequestID]*svcAcquiring),
releasing: make(map[types.RequestID]*svcReleasing),
}
}

@@ -67,69 +64,145 @@ func (p *PubLock) BeginMutex() *MutexBuilder {
return m
}

func (p *PubLock) Start() {
if p.core != nil {
p.core.Start()
return
}
func (p *PubLock) Core() *Core {
return p.core
}

func (p *PubLock) Stop() {
p.lock.Lock()
defer p.lock.Unlock()
func (p *PubLock) Start() {
log := logger.WithField("Mod", "PubLock")

if p.core != nil {
p.core.Stop()
return
}
go func() {
ch := p.core.EventChan()
evt := ch.Receive()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()

loop:
for {
select {
case <-ticker.C:
p.lock.Lock()
reqIDs := make([]types.RequestID, len(p.acquired))
copy(reqIDs, p.acquired)
p.lock.Unlock()

if len(reqIDs) == 0 {
continue
}

cmd := Renew{
IDs: reqIDs,
}
if p.clster == nil {
p.core.Apply(&cmd)
} else {
data, err := serder.ObjectToJSONEx(cmd)
if err != nil {
log.Warnf("cmd %T to json: %v", cmd, err)
continue
}

_, cerr := p.clster.MasterClient().ClusterApplyLog(context.Background(), &clirpc.ClusterApplyLog{
FSMID: p.core.FSM().ID(),
Data: data,
Timeout: time.Second * 3,
})
if cerr != nil {
log.Errorf("apply renew: %v", cerr)
}
}

case ret := <-evt.Chan():
if ret.Err != nil {
break loop
}

p.lock.Lock()
switch e := ret.Value.(type) {
case *AcquireResult:
a, ok := p.acquirings[e.Raw.ID]
if !ok {
break
}

if e.Error == nil {
a.Callback.SetVoid()
p.acquired = append(p.acquired, e.Raw.ID)
} else {
a.Callback.SetError(e.Error)
}
delete(p.acquirings, e.Raw.ID)

case *Released:
r, ok := p.releasing[e.ID]
if !ok {
break
}

r.Callback.SetVoid()
p.acquired = lo2.Remove(p.acquired, e.ID)
delete(p.releasing, e.ID)

}
p.lock.Unlock()

evt = ch.Receive()
}
}
}()

if p.pubChan != nil {
p.pubChan.Close()
p.pubChan = nil
}
p.core.Start()
}

p.cliCli.Release()
p.cliCli = nil
func (p *PubLock) Stop() {
p.core.Stop()
}

func (p *PubLock) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, error) {
func (p *PubLock) Acquire(req types.LockRequest, opt types.AcquireOption) (LockedRequest, *ecode.CodeError) {
p.lock.Lock()

if p.core != nil {
p.lock.Unlock()
return p.core.Acquire(req, opt)
cmd := Acquire{
ID: types.RequestID(uuid.NewString()),
Request: req,
Timeout: opt.Timeout,
Reason: opt.Reason,
}

if p.pubChan == nil {
p.pubChan = p.cliCli.PubLockChannel(context.Background())
go p.receivingChan()
ac := &svcAcquiring{
RequestID: cmd.ID,
Request: cmd.Request,
Callback: future.NewSetVoid(),
}
p.acquirings[cmd.ID] = ac
p.lock.Unlock()

acqID := fmt.Sprintf("%v", p.nextCtxID)
p.nextCtxID++

cerr := p.pubChan.Send(&types.AcquireMsg{ContextID: acqID, Request: req, Option: opt})
if cerr != nil {
p.lock.Unlock()
return LockedRequest{}, cerr.ToError()
}
if p.clster == nil {
p.core.Apply(&cmd)
} else {
data, err := serder.ObjectToJSONEx(cmd)
if err != nil {
return LockedRequest{}, ecode.Newf(ecode.OperationFailed, "cmd %T to json: %v", cmd, err)
}

callback := future.NewSetValue[types.RequestID]()
info := &acquireInfo{
Request: req,
Callback: callback,
_, cerr := p.clster.MasterClient().ClusterApplyLog(context.Background(), &clirpc.ClusterApplyLog{
FSMID: p.core.FSM().ID(),
Data: data,
Timeout: opt.Timeout,
})
if cerr != nil {
return LockedRequest{}, ecode.New(ecode.ErrorCode(cerr.Code), cerr.Message)
}
}
p.acquirings[acqID] = info
p.lock.Unlock()

reqID, err := callback.Wait(context.Background())
err := ac.Callback.Wait(context.Background())
if err != nil {
return LockedRequest{}, err
return LockedRequest{}, ecode.Newf(ecode.OperationFailed, "wait acquire: %v", err)
}

return LockedRequest{
Req: req,
ReqID: reqID,
ReqID: ac.RequestID,
}, nil
}

@@ -137,97 +210,50 @@ func (p *PubLock) Release(reqID types.RequestID) {
log := logger.WithField("Mod", "PubLock")

p.lock.Lock()

if p.core != nil {
p.lock.Unlock()
p.core.release(reqID)
return
cmd := Release{
ID: reqID,
}

if p.pubChan == nil {
p.pubChan = p.cliCli.PubLockChannel(context.Background())
go p.receivingChan()
}

relID := fmt.Sprintf("%v", p.nextCtxID)
p.nextCtxID++

cerr := p.pubChan.Send(&types.ReleaseMsg{ContextID: relID, RequestID: reqID})
if cerr != nil {
p.lock.Unlock()
log.Warnf("unlock %v: %v", reqID, cerr.ToError())
return
r := &svcReleasing{
RequestID: cmd.ID,
Callback: future.NewSetVoid(),
}

callback := future.NewSetVoid()
info := &releaseInfo{
RequestID: reqID,
Callback: callback,
}
p.releasing[relID] = info
p.releasing[cmd.ID] = r
p.lock.Unlock()

err := callback.Wait(context.Background())
if err != nil {
log.Warnf("unlock %v: %v", reqID, err)
return
}

log.Tracef("unlock %v", reqID)
}

func (p *PubLock) receivingChan() {
log := logger.WithField("Mod", "PubLock")
for {
msg, cerr := p.pubChan.Receive()
if cerr != nil {
p.lock.Lock()
for _, info := range p.acquirings {
info.Callback.SetError(cerr.ToError())
}
p.acquirings = make(map[string]*acquireInfo)

for _, info := range p.releasing {
info.Callback.SetError(cerr.ToError())
}
p.releasing = make(map[string]*releaseInfo)
p.lock.Unlock()

log.Warnf("receive channel error: %v", cerr.ToError())
if p.clster == nil {
p.core.Apply(&cmd)
} else {
data, err := serder.ObjectToJSONEx(cmd)
if err != nil {
log.Warnf("cmd %T to json: %v", cmd, err)
return
}

p.lock.Lock()

switch msg := msg.(type) {
case *types.AcquireResultMsg:
info, ok := p.acquirings[msg.ContextID]
if !ok {
continue
}

if msg.Success {
info.Callback.SetValue(msg.RequestID)
} else {
info.Callback.SetError(fmt.Errorf(msg.Error))
}
delete(p.acquirings, msg.ContextID)

case *types.ReleaseResultMsg:
info, ok := p.releasing[msg.ContextID]
if !ok {
continue
}

info.Callback.SetVoid()
delete(p.releasing, msg.ContextID)
_, cerr := p.clster.MasterClient().ClusterApplyLog(context.Background(), &clirpc.ClusterApplyLog{
FSMID: p.core.FSM().ID(),
Data: data,
Timeout: 0,
})
if cerr != nil {
log.Errorf("apply release: %v", cerr)
}
}

p.lock.Unlock()
err := r.Callback.Wait(context.Background())
if err != nil {
log.Errorf("wait release: %v", err)
} else {
log.Tracef("unlock %v", reqID)
}
}

type releaseInfo struct {
type svcAcquiring struct {
RequestID types.RequestID
Request types.LockRequest
Callback *future.SetVoidFuture
}

type svcReleasing struct {
RequestID types.RequestID
Callback *future.SetVoidFuture
}
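
After the rewrite there is no master/slave split in PubLock any more: every node constructs the same service, registers its FSM with the cluster, and forwards commands to the Raft leader through ClusterApplyLog. A rough usage sketch based on the call sites in this commit; the LockRequest construction and the exact shape of types.AcquireOption are assumptions, since they are not shown in this diff:

// Wiring, mirroring serve.go/test.go/vfstest.go above. Assumes the project
// imports used there (cluster, publock, publock/types, config).
func setupLocking() (*publock.PubLock, error) {
	clster := cluster.New(config.Cfg().Cluster)
	pl := publock.New(config.Cfg().PubLock, clster)

	// Every FSM that should be replicated through Raft is handed to the cluster at start time.
	if _, err := clster.Start([]cluster.FSM{pl.Core().FSM()}); err != nil {
		return nil, fmt.Errorf("start cluster: %w", err)
	}

	pl.Start()
	return pl, nil
}

// req is a publock/types.LockRequest built by the caller (its definition is not
// part of this diff); the Timeout field of types.AcquireOption is inferred from
// opt.Timeout above and is an assumption.
func withLock(pl *publock.PubLock, req types.LockRequest) error {
	locked, cerr := pl.Acquire(req, types.AcquireOption{Timeout: 10 * time.Second})
	if cerr != nil {
		return fmt.Errorf("acquire: %v", cerr)
	}
	defer pl.Release(locked.ReqID)

	// ... critical section protected by the distributed lock ...
	return nil
}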

+4 -2  client/internal/rpc/cluster.go

@@ -21,10 +21,12 @@ func (s *Service) ClusterRaftInstallSnapshot(ctx context.Context, msg *clirpc.Cl
}

func (s *Service) ClusterApplyLog(ctx context.Context, msg *clirpc.ClusterApplyLog) (*clirpc.ClusterApplyLogResp, *rpc.CodeError) {
err := s.cluster.Apply(msg.Data, msg.Timeout)
ret, err := s.cluster.Apply(msg.FSMID, msg.Data, msg.Timeout)
if err != nil {
return nil, rpc.Failed(ecode.OperationFailed, "%v", err)
}

return &clirpc.ClusterApplyLogResp{}, nil
return &clirpc.ClusterApplyLogResp{
Result: ret,
}, nil
}

+3 -0  common/assets/confs/client.config.json

@@ -82,5 +82,8 @@
"clientCert": "",
"clientKey": "",
"storeBase": ""
},
"publock": {
"leaseExpiredSeconds": 5
}
}

+1 -0  common/ecode/ecode.go

@@ -12,4 +12,5 @@ const (
Unauthorized ErrorCode = "Unauthorized"
ChannelClosed ErrorCode = "ChannelClosed"
ClusterNoMaster ErrorCode = "ClusterNoMaster"
LockConflict ErrorCode = "LockConflict"
)

+5 -2  common/pkgs/rpc/client/cluster.go

@@ -84,10 +84,13 @@ func (s *Server) ClusterRaftInstallSnapshot(req Client_ClusterRaftInstallSnapsho
}

type ClusterApplyLog struct {
Data string
FSMID string
Data []byte
Timeout time.Duration
}
type ClusterApplyLogResp struct{}
type ClusterApplyLogResp struct {
Result []byte
}

func (c *Client) ClusterApplyLog(ctx context.Context, msg *ClusterApplyLog) (*ClusterApplyLogResp, *rpc.CodeError) {
if c.fusedErr != nil {

