|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- package server
-
- import (
- "fmt"
- "net"
- "os"
- "os/signal"
- "github.com/dk-lockdown/seata-golang/logging"
- "github.com/dk-lockdown/seata-golang/tc/config"
- "syscall"
- "time"
- )
-
- import (
- "github.com/dubbogo/getty"
- "github.com/dubbogo/gost/sync"
- )
-
- var (
- srvGrpool *gxsync.TaskPool
- )
-
-
- func SetServerGrpool() {
- srvConf := config.GetServerConfig()
- if srvConf.GettyConfig.GrPoolSize > 1 {
- srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GettyConfig.GrPoolSize),
- gxsync.WithTaskPoolTaskQueueLength(srvConf.GettyConfig.QueueLen),
- gxsync.WithTaskPoolTaskQueueNumber(srvConf.GettyConfig.QueueNumber))
- }
- }
-
- type Server struct {
- conf config.ServerConfig
- tcpServer getty.Server
- rpcHandler *DefaultCoordinator
- }
-
- func NewServer() *Server {
-
- s := &Server{
- conf: config.GetServerConfig(),
- }
- coordinator := NewDefaultCoordinator(s.conf)
- s.rpcHandler = coordinator
-
- return s
- }
-
- func (s *Server) newSession(session getty.Session) error {
- var (
- ok bool
- tcpConn *net.TCPConn
- )
- conf := s.conf
-
- if conf.GettyConfig.GettySessionParam.CompressEncoding {
- session.SetCompressType(getty.CompressZip)
- }
-
- if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
- panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
- }
-
- tcpConn.SetNoDelay(conf.GettyConfig.GettySessionParam.TcpNoDelay)
- tcpConn.SetKeepAlive(conf.GettyConfig.GettySessionParam.TcpKeepAlive)
- if conf.GettyConfig.GettySessionParam.TcpKeepAlive {
- tcpConn.SetKeepAlivePeriod(conf.GettyConfig.GettySessionParam.KeepAlivePeriod)
- }
- tcpConn.SetReadBuffer(conf.GettyConfig.GettySessionParam.TcpRBufSize)
- tcpConn.SetWriteBuffer(conf.GettyConfig.GettySessionParam.TcpWBufSize)
-
- session.SetName(conf.GettyConfig.GettySessionParam.SessionName)
- session.SetMaxMsgLen(conf.GettyConfig.GettySessionParam.MaxMsgLen)
- session.SetPkgHandler(RpcServerPkgHandler)
- session.SetEventListener(s.rpcHandler)
- session.SetWQLen(conf.GettyConfig.GettySessionParam.PkgWQSize)
- session.SetReadTimeout(conf.GettyConfig.GettySessionParam.TcpReadTimeout)
- session.SetWriteTimeout(conf.GettyConfig.GettySessionParam.TcpWriteTimeout)
- session.SetCronPeriod((int)(conf.GettyConfig.SessionTimeout.Nanoseconds() / 1e6))
- session.SetWaitTime(conf.GettyConfig.GettySessionParam.WaitTimeout)
- logging.Logger.Debugf("app accepts new session:%s\n", session.Stat())
-
- session.SetTaskPool(srvGrpool)
-
- return nil
- }
-
- func (s *Server) Start(addr string) {
- var (
- tcpServer getty.Server
- )
-
- tcpServer = getty.NewTCPServer(
- getty.WithLocalAddress(addr),
- )
- tcpServer.RunEventLoop(s.newSession)
- logging.Logger.Debugf("s bind addr{%s} ok!", addr)
- s.tcpServer = tcpServer
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
- for {
- sig := <-c
- logging.Logger.Info("get a signal %s", sig.String())
- switch sig {
- case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
- s.Stop()
- time.Sleep(time.Second)
- return
- case syscall.SIGHUP:
- default:
- return
- }
- }
- }
-
- func (s *Server) Stop() {
- s.tcpServer.Close()
- s.rpcHandler.Stop()
- }
|