You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

rpc_client.go 2.7 kB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package getty
  2. import (
  3. "fmt"
  4. "github.com/seata/seata-go/pkg/common/log"
  5. "net"
  6. "sync"
  7. )
  8. import (
  9. getty "github.com/apache/dubbo-getty"
  10. gxsync "github.com/dubbogo/gost/sync"
  11. )
  12. import (
  13. "github.com/seata/seata-go/pkg/config"
  14. )
// RpcClient bundles the client configuration with the getty TCP clients
// created for the seata servers.
type RpcClient struct {
	// conf is the client configuration that the getty clients and sessions are built from.
	conf *config.ClientConfig
	// gettyClients is meant to hold the created getty clients.
	// NOTE(review): nothing in the visible code appends to it — confirm whether it is populated elsewhere.
	gettyClients []getty.Client
	// futures is a concurrent map; presumably it tracks in-flight requests —
	// TODO confirm, it is not used in the visible code.
	futures *sync.Map
}
  20. func init() {
  21. newRpcClient()
  22. }
  23. func newRpcClient() *RpcClient {
  24. rpcClient := &RpcClient{
  25. conf: config.GetClientConfig(),
  26. gettyClients: make([]getty.Client, 0),
  27. }
  28. rpcClient.init()
  29. return rpcClient
  30. }
  31. func (c *RpcClient) init() {
  32. addressList := getAvailServerList()
  33. if len(addressList) == 0 {
  34. log.Warn("no have valid seata server list")
  35. }
  36. for _, address := range addressList {
  37. gettyClient := getty.NewTCPClient(
  38. getty.WithServerAddress(address),
  39. getty.WithConnectionNumber((int)(c.conf.GettyConfig.ConnectionNum)),
  40. getty.WithReconnectInterval(c.conf.GettyConfig.ReconnectInterval),
  41. getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
  42. )
  43. go gettyClient.RunEventLoop(c.newSession)
  44. // c.gettyClients = append(c.gettyClients, gettyClient)
  45. }
  46. }
  47. // todo mock
  48. func getAvailServerList() []string {
  49. return []string{"127.0.0.1:8091"}
  50. }
  51. func (c *RpcClient) newSession(session getty.Session) error {
  52. var (
  53. ok bool
  54. tcpConn *net.TCPConn
  55. )
  56. if c.conf.GettyConfig.GettySessionParam.CompressEncoding {
  57. session.SetCompressType(getty.CompressZip)
  58. }
  59. if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
  60. panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
  61. }
  62. tcpConn.SetNoDelay(c.conf.GettyConfig.GettySessionParam.TCPNoDelay)
  63. tcpConn.SetKeepAlive(c.conf.GettyConfig.GettySessionParam.TCPKeepAlive)
  64. if c.conf.GettyConfig.GettySessionParam.TCPKeepAlive {
  65. tcpConn.SetKeepAlivePeriod(c.conf.GettyConfig.GettySessionParam.KeepAlivePeriod)
  66. }
  67. tcpConn.SetReadBuffer(c.conf.GettyConfig.GettySessionParam.TCPRBufSize)
  68. tcpConn.SetWriteBuffer(c.conf.GettyConfig.GettySessionParam.TCPWBufSize)
  69. session.SetName(c.conf.GettyConfig.GettySessionParam.SessionName)
  70. session.SetMaxMsgLen(c.conf.GettyConfig.GettySessionParam.MaxMsgLen)
  71. session.SetPkgHandler(rpcPkgHandler)
  72. session.SetEventListener(GetGettyClientHandlerInstance())
  73. session.SetReadTimeout(c.conf.GettyConfig.GettySessionParam.TCPReadTimeout)
  74. session.SetWriteTimeout(c.conf.GettyConfig.GettySessionParam.TCPWriteTimeout)
  75. session.SetCronPeriod((int)(c.conf.GettyConfig.HeartbeatPeriod.Nanoseconds() / 1e6))
  76. session.SetWaitTime(c.conf.GettyConfig.GettySessionParam.WaitTimeout)
  77. log.Debugf("rpc_client new session:%s\n", session.Stat())
  78. return nil
  79. }

Go Implementation For Seata