package cluster import ( "context" "io" "sync" "time" "github.com/hashicorp/raft" "gitlink.org.cn/cloudream/common/utils/serder" "gitlink.org.cn/cloudream/jcs-pub/common/ecode" "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" ) const ( CmdAppendEntries = "AppendEntries" CmdRequestVote = "RequestVote" CmdRequestPreVote = "RequestPreVote" CmdTimeoutNow = "TimeoutNow" ) type Transport struct { localAddr string shutdownCh chan any shutdownLock sync.Mutex shutdown bool consumeCh chan raft.RPC cliPoolCfg clirpc.PoolConfig cliPools map[raft.ServerAddress]*clirpc.Pool poolLock sync.Mutex heartbeatFn func(raft.RPC) heartbeatFnLock sync.Mutex } func NewTransport(localAddr string, cliPoolCfg clirpc.PoolConfig) *Transport { return &Transport{ localAddr: localAddr, shutdownCh: make(chan any), consumeCh: make(chan raft.RPC), cliPoolCfg: cliPoolCfg, cliPools: make(map[raft.ServerAddress]*clirpc.Pool), heartbeatFn: nil, } } func (t *Transport) Close() { t.shutdownLock.Lock() defer t.shutdownLock.Unlock() if !t.shutdown { close(t.shutdownCh) t.shutdown = true } } func (n *Transport) IsClosed() bool { select { case <-n.shutdownCh: return true default: return false } } func (t *Transport) Consumer() <-chan raft.RPC { return t.consumeCh } func (t *Transport) LocalAddr() raft.ServerAddress { return raft.ServerAddress(t.localAddr) } func (t *Transport) AppendEntriesPipeline(id raft.ServerID, target raft.ServerAddress) (raft.AppendPipeline, error) { t.poolLock.Lock() defer t.poolLock.Unlock() pool := t.cliPools[target] if pool == nil { cfg := t.cliPoolCfg cfg.Address = string(target) pool = clirpc.NewPool(cfg) t.cliPools[target] = pool } cli := pool.Get() return &AppendPipeline{ cli: cli, consumerCh: make(chan raft.AppendFuture), done: make(chan any), }, nil } func (t *Transport) AppendEntries(id raft.ServerID, target raft.ServerAddress, args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) error { cli := t.getCli(target) reqData, err := serder.ObjectToJSON(args) if err != nil { return err } r, cerr := cli.ClusterRaftRPC(context.Background(), &clirpc.ClusterRaftRPC{ Type: CmdAppendEntries, Data: string(reqData), }) if cerr != nil { return cerr.ToError() } err = serder.JSONToObject([]byte(r.Data), resp) if err != nil { return err } return nil } func (t *Transport) RequestVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestVoteRequest, resp *raft.RequestVoteResponse) error { cli := t.getCli(target) reqData, err := serder.ObjectToJSON(args) if err != nil { return err } r, cerr := cli.ClusterRaftRPC(context.Background(), &clirpc.ClusterRaftRPC{ Type: CmdRequestVote, Data: string(reqData), }) if cerr != nil { return cerr.ToError() } err = serder.JSONToObject([]byte(r.Data), resp) if err != nil { return err } return nil } func (t *Transport) RequestPreVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestPreVoteRequest, resp *raft.RequestPreVoteResponse) error { cli := t.getCli(target) reqData, err := serder.ObjectToJSON(args) if err != nil { return err } r, cerr := cli.ClusterRaftRPC(context.Background(), &clirpc.ClusterRaftRPC{ Type: CmdRequestPreVote, Data: string(reqData), }) if cerr != nil { return cerr.ToError() } err = serder.JSONToObject([]byte(r.Data), resp) if err != nil { return err } return nil } func (t *Transport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, resp *raft.InstallSnapshotResponse, data io.Reader) error { cli := t.getCli(target) r, cerr := cli.ClusterRaftInstallSnapshot(context.Background(), &clirpc.ClusterRaftInstallSnapshot{ Args: args, Data: data, }) if cerr != nil { return cerr.ToError() } *resp = *r.Resp return nil } func (t *Transport) EncodePeer(id raft.ServerID, addr raft.ServerAddress) []byte { return []byte(addr) } func (t *Transport) DecodePeer(data []byte) raft.ServerAddress { return raft.ServerAddress(string(data)) } func (t *Transport) SetHeartbeatHandler(cb func(rpc raft.RPC)) { t.heartbeatFnLock.Lock() defer t.heartbeatFnLock.Unlock() t.heartbeatFn = cb } func (t *Transport) TimeoutNow(id raft.ServerID, target raft.ServerAddress, args *raft.TimeoutNowRequest, resp *raft.TimeoutNowResponse) error { cli := t.getCli(target) reqData, err := serder.ObjectToJSON(args) if err != nil { return err } r, cerr := cli.ClusterRaftRPC(context.Background(), &clirpc.ClusterRaftRPC{ Type: CmdTimeoutNow, Data: string(reqData), }) if cerr != nil { return cerr.ToError() } err = serder.JSONToObject([]byte(r.Data), resp) if err != nil { return err } return nil } func (t *Transport) OnRPC(req *clirpc.ClusterRaftRPC) (*clirpc.ClusterRaftRPCResp, *rpc.CodeError) { respChan := make(chan raft.RPCResponse, 1) rp := raft.RPC{ RespChan: respChan, } isHeartbeat := false switch req.Type { case CmdAppendEntries: var r raft.AppendEntriesRequest err := serder.JSONToObject([]byte(req.Data), &r) if err != nil { return nil, rpc.Failed(ecode.OperationFailed, err.Error()) } rp.Command = &r leaderAddr := r.RPCHeader.Addr // Check if this is a heartbeat if r.Term != 0 && leaderAddr != nil && r.PrevLogEntry == 0 && r.PrevLogTerm == 0 && len(r.Entries) == 0 && r.LeaderCommitIndex == 0 { isHeartbeat = true } case CmdRequestVote: var r raft.RequestVoteRequest err := serder.JSONToObject([]byte(req.Data), &r) if err != nil { return nil, rpc.Failed(ecode.OperationFailed, err.Error()) } rp.Command = &r case CmdRequestPreVote: var r raft.RequestPreVoteRequest err := serder.JSONToObject([]byte(req.Data), &r) if err != nil { return nil, rpc.Failed(ecode.OperationFailed, err.Error()) } rp.Command = &r case CmdTimeoutNow: var r raft.TimeoutNowRequest err := serder.JSONToObject([]byte(req.Data), &r) if err != nil { return nil, rpc.Failed(ecode.OperationFailed, err.Error()) } rp.Command = &r } if isHeartbeat { t.heartbeatFnLock.Lock() fn := t.heartbeatFn t.heartbeatFnLock.Unlock() if fn != nil { fn(rp) goto RESP } } // Dispatch the RPC select { case t.consumeCh <- rp: case <-t.shutdownCh: return nil, rpc.Failed(ecode.OperationFailed, raft.ErrTransportShutdown.Error()) } // Wait for response RESP: select { case resp := <-respChan: if resp.Error != nil { return nil, rpc.Failed(ecode.OperationFailed, resp.Error.Error()) } data, err := serder.ObjectToJSON(resp.Response) if err != nil { return nil, rpc.Failed(ecode.OperationFailed, err.Error()) } return &clirpc.ClusterRaftRPCResp{ Data: string(data), }, nil case <-t.shutdownCh: return nil, rpc.Failed(ecode.OperationFailed, raft.ErrTransportShutdown.Error()) } } func (t *Transport) OnInstallSnapshot(req *clirpc.ClusterRaftInstallSnapshot) (*clirpc.ClusterRaftInstallSnapshotResp, *rpc.CodeError) { respChan := make(chan raft.RPCResponse, 1) rp := raft.RPC{ Command: req.Args, Reader: req.Data, RespChan: respChan, } select { case t.consumeCh <- rp: case <-t.shutdownCh: return nil, rpc.Failed(ecode.OperationFailed, raft.ErrTransportShutdown.Error()) } select { case resp := <-respChan: if resp.Error != nil { return nil, rpc.Failed(ecode.OperationFailed, resp.Error.Error()) } return &clirpc.ClusterRaftInstallSnapshotResp{ Resp: resp.Response.(*raft.InstallSnapshotResponse), }, nil case <-t.shutdownCh: return nil, rpc.Failed(ecode.OperationFailed, raft.ErrTransportShutdown.Error()) } } func (t *Transport) getCli(target raft.ServerAddress) *clirpc.PoolClient { t.poolLock.Lock() defer t.poolLock.Unlock() pool := t.cliPools[target] if pool == nil { cfg := t.cliPoolCfg cfg.Address = string(target) pool = clirpc.NewPool(cfg) t.cliPools[target] = pool } return pool.Get() } var _ raft.Transport = (*Transport)(nil) var _ raft.WithPreVote = (*Transport)(nil) type AppendPipeline struct { cli *clirpc.PoolClient consumerCh chan raft.AppendFuture done chan any closeLock sync.Mutex } func (c *AppendPipeline) AppendEntries(args *raft.AppendEntriesRequest, resp *raft.AppendEntriesResponse) (raft.AppendFuture, error) { reqData, err := serder.ObjectToJSON(args) if err != nil { return nil, err } startTime := time.Now() r, cerr := c.cli.ClusterRaftRPC(context.Background(), &clirpc.ClusterRaftRPC{ Type: CmdAppendEntries, Data: string(reqData), }) if cerr != nil { return nil, cerr.ToError() } err = serder.JSONToObject([]byte(r.Data), resp) if err != nil { return nil, err } f := &appendFuture{ startTime: startTime, req: args, resp: resp, } select { case c.consumerCh <- f: return f, nil case <-c.done: return nil, raft.ErrPipelineShutdown } } func (c *AppendPipeline) Consumer() <-chan raft.AppendFuture { return c.consumerCh } func (c *AppendPipeline) Close() error { c.closeLock.Lock() defer c.closeLock.Unlock() if c.done == nil { return nil } close(c.done) c.done = nil c.cli.Release() return nil } var _ raft.AppendPipeline = (*AppendPipeline)(nil) type appendFuture struct { startTime time.Time req *raft.AppendEntriesRequest resp *raft.AppendEntriesResponse } func (f *appendFuture) Error() error { return nil } func (f *appendFuture) Start() time.Time { return f.startTime } func (f *appendFuture) Request() *raft.AppendEntriesRequest { return f.req } func (f *appendFuture) Response() *raft.AppendEntriesResponse { return f.resp }