You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number; they can include dashes ('-') and can be up to 35 characters long.

server.go 3.1 kB

5 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package server
  2. import (
  3. "fmt"
  4. "net"
  5. "os"
  6. "os/signal"
  7. "github.com/dk-lockdown/seata-golang/logging"
  8. "github.com/dk-lockdown/seata-golang/tc/config"
  9. "syscall"
  10. "time"
  11. )
  12. import (
  13. "github.com/dubbogo/getty"
  14. "github.com/dubbogo/gost/sync"
  15. )
  16. var (
  17. srvGrpool *gxsync.TaskPool
  18. )
  19. func SetServerGrpool() {
  20. srvConf := config.GetServerConfig()
  21. if srvConf.GettyConfig.GrPoolSize > 1 {
  22. srvGrpool = gxsync.NewTaskPool(gxsync.WithTaskPoolTaskPoolSize(srvConf.GettyConfig.GrPoolSize),
  23. gxsync.WithTaskPoolTaskQueueLength(srvConf.GettyConfig.QueueLen),
  24. gxsync.WithTaskPoolTaskQueueNumber(srvConf.GettyConfig.QueueNumber))
  25. }
  26. }
  27. type Server struct {
  28. conf config.ServerConfig
  29. tcpServer getty.Server
  30. rpcHandler *DefaultCoordinator
  31. }
  32. func NewServer() *Server {
  33. s := &Server{
  34. conf: config.GetServerConfig(),
  35. }
  36. coordinator := NewDefaultCoordinator(s.conf)
  37. s.rpcHandler = coordinator
  38. return s
  39. }
  40. func (s *Server) newSession(session getty.Session) error {
  41. var (
  42. ok bool
  43. tcpConn *net.TCPConn
  44. )
  45. conf := s.conf
  46. if conf.GettyConfig.GettySessionParam.CompressEncoding {
  47. session.SetCompressType(getty.CompressZip)
  48. }
  49. if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
  50. panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
  51. }
  52. tcpConn.SetNoDelay(conf.GettyConfig.GettySessionParam.TcpNoDelay)
  53. tcpConn.SetKeepAlive(conf.GettyConfig.GettySessionParam.TcpKeepAlive)
  54. if conf.GettyConfig.GettySessionParam.TcpKeepAlive {
  55. tcpConn.SetKeepAlivePeriod(conf.GettyConfig.GettySessionParam.KeepAlivePeriod)
  56. }
  57. tcpConn.SetReadBuffer(conf.GettyConfig.GettySessionParam.TcpRBufSize)
  58. tcpConn.SetWriteBuffer(conf.GettyConfig.GettySessionParam.TcpWBufSize)
  59. session.SetName(conf.GettyConfig.GettySessionParam.SessionName)
  60. session.SetMaxMsgLen(conf.GettyConfig.GettySessionParam.MaxMsgLen)
  61. session.SetPkgHandler(RpcServerPkgHandler)
  62. session.SetEventListener(s.rpcHandler)
  63. session.SetWQLen(conf.GettyConfig.GettySessionParam.PkgWQSize)
  64. session.SetReadTimeout(conf.GettyConfig.GettySessionParam.TcpReadTimeout)
  65. session.SetWriteTimeout(conf.GettyConfig.GettySessionParam.TcpWriteTimeout)
  66. session.SetCronPeriod((int)(conf.GettyConfig.SessionTimeout.Nanoseconds() / 1e6))
  67. session.SetWaitTime(conf.GettyConfig.GettySessionParam.WaitTimeout)
  68. logging.Logger.Debugf("app accepts new session:%s\n", session.Stat())
  69. session.SetTaskPool(srvGrpool)
  70. return nil
  71. }
  72. func (s *Server) Start(addr string) {
  73. var (
  74. tcpServer getty.Server
  75. )
  76. tcpServer = getty.NewTCPServer(
  77. getty.WithLocalAddress(addr),
  78. )
  79. tcpServer.RunEventLoop(s.newSession)
  80. logging.Logger.Debugf("s bind addr{%s} ok!", addr)
  81. s.tcpServer = tcpServer
  82. c := make(chan os.Signal, 1)
  83. signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  84. for {
  85. sig := <-c
  86. logging.Logger.Info("get a signal %s", sig.String())
  87. switch sig {
  88. case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  89. s.Stop()
  90. time.Sleep(time.Second)
  91. return
  92. case syscall.SIGHUP:
  93. default:
  94. return
  95. }
  96. }
  97. }
  98. func (s *Server) Stop() {
  99. s.tcpServer.Close()
  100. s.rpcHandler.Stop()
  101. }

Go Implementation For Seata