Browse Source

feat: add getty config (#399)

add getty config
tags/v1.0.3
lxfeng1997 GitHub 2 years ago
parent
commit
dcb6ec9ece
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 187 additions and 17 deletions
  1. +4
    -2
      pkg/client/config.go
  2. +40
    -3
      pkg/client/config_test.go
  3. +38
    -0
      pkg/remoting/getty/config.go
  4. +54
    -0
      pkg/remoting/getty/session_config.go
  5. +17
    -12
      testdata/conf/seatago.yml
  6. +17
    -0
      testdata/context.go
  7. +17
    -0
      testdata/mock_tcc.go

+ 4
- 2
pkg/client/config.go View File

@@ -26,13 +26,13 @@ import (
"runtime"
"strings"

"github.com/seata/seata-go/pkg/tm"

"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/json"
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/util/flagext"
)
@@ -63,11 +63,13 @@ func (c *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
type Config struct {
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"`
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"`
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.TCCConfig.FenceConfig.RegisterFlagsWithPrefix("tcc", f)
c.ClientConfig.RegisterFlagsWithPrefix("client", f)
c.GettyConfig.RegisterFlagsWithPrefix("getty", f)
}

type loaderConf struct {


+ 40
- 3
pkg/client/config_test.go View File

@@ -29,9 +29,9 @@ import (
func TestLoadPath(t *testing.T) {
cfg := LoadPath("../../testdata/conf/seatago.yml")
assert.NotNil(t, cfg)

assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)

assert.Equal(t, "tcc_fence_log_test", cfg.TCCConfig.FenceConfig.LogTableName)
assert.Equal(t, time.Second*60, cfg.TCCConfig.FenceConfig.CleanPeriod)

@@ -45,17 +45,36 @@ func TestLoadPath(t *testing.T) {
assert.Equal(t, time.Second*10, cfg.ClientConfig.TmConfig.DegradeCheckAllowTimes)
assert.Equal(t, -2147482648, cfg.ClientConfig.TmConfig.InterceptorOrder)

assert.NotNil(t, cfg.GettyConfig)
assert.NotNil(t, cfg.GettyConfig.SessionConfig)
assert.Equal(t, 0, cfg.GettyConfig.ReconnectInterval)
assert.Equal(t, 16, cfg.GettyConfig.ConnectionNum)
assert.Equal(t, time.Second*15, cfg.GettyConfig.HeartbeatPeriod)
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.CompressEncoding)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.TCPNoDelay)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.TCPKeepAlive)
assert.Equal(t, time.Minute*2, cfg.GettyConfig.SessionConfig.KeepAlivePeriod)
assert.Equal(t, 262144, cfg.GettyConfig.SessionConfig.TCPRBufSize)
assert.Equal(t, 65536, cfg.GettyConfig.SessionConfig.TCPWBufSize)
assert.Equal(t, time.Second, cfg.GettyConfig.SessionConfig.TCPReadTimeout)
assert.Equal(t, time.Second*5, cfg.GettyConfig.SessionConfig.TCPWriteTimeout)
assert.Equal(t, time.Second, cfg.GettyConfig.SessionConfig.WaitTimeout)
assert.Equal(t, 16498688, cfg.GettyConfig.SessionConfig.MaxMsgLen)
assert.Equal(t, "client_test", cfg.GettyConfig.SessionConfig.SessionName)
assert.Equal(t, time.Second, cfg.GettyConfig.SessionConfig.CronPeriod)

// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}

func TestLoadJson(t *testing.T) {
confJson := `{"client":{"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}}}`
confJson := `{"client":{"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},
"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}}}`
cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)

assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)

assert.Equal(t, "tcc_fence_log_test2", cfg.TCCConfig.FenceConfig.LogTableName)
assert.Equal(t, time.Second*80, cfg.TCCConfig.FenceConfig.CleanPeriod)

@@ -69,6 +88,24 @@ func TestLoadJson(t *testing.T) {
assert.Equal(t, time.Second*10, cfg.ClientConfig.TmConfig.DegradeCheckAllowTimes)
assert.Equal(t, -2147482648, cfg.ClientConfig.TmConfig.InterceptorOrder)

assert.NotNil(t, cfg.GettyConfig)
assert.NotNil(t, cfg.GettyConfig.SessionConfig)
assert.Equal(t, 1, cfg.GettyConfig.ReconnectInterval)
assert.Equal(t, 10, cfg.GettyConfig.ConnectionNum)
assert.Equal(t, time.Second*10, cfg.GettyConfig.HeartbeatPeriod)
assert.Equal(t, true, cfg.GettyConfig.SessionConfig.CompressEncoding)
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.TCPNoDelay)
assert.Equal(t, false, cfg.GettyConfig.SessionConfig.TCPKeepAlive)
assert.Equal(t, time.Minute*2, cfg.GettyConfig.SessionConfig.KeepAlivePeriod)
assert.Equal(t, 261120, cfg.GettyConfig.SessionConfig.TCPRBufSize)
assert.Equal(t, 32768, cfg.GettyConfig.SessionConfig.TCPWBufSize)
assert.Equal(t, time.Second*2, cfg.GettyConfig.SessionConfig.TCPReadTimeout)
assert.Equal(t, time.Second*8, cfg.GettyConfig.SessionConfig.TCPWriteTimeout)
assert.Equal(t, time.Second*2, cfg.GettyConfig.SessionConfig.WaitTimeout)
assert.Equal(t, 261120, cfg.GettyConfig.SessionConfig.MaxMsgLen)
assert.Equal(t, "client_test", cfg.GettyConfig.SessionConfig.SessionName)
assert.Equal(t, time.Second*2, cfg.GettyConfig.SessionConfig.CronPeriod)

// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}

+ 38
- 0
pkg/remoting/getty/config.go View File

@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package getty

import (
"flag"
"time"
)

type Config struct {
ReconnectInterval int `yaml:"reconnect-interval" json:"reconnect-interval" koanf:"reconnect-interval"`
ConnectionNum int `yaml:"connection-num" json:"connection-num" koanf:"connection-num"`
HeartbeatPeriod time.Duration `yaml:"heartbeat-period" json:"heartbeat-period" koanf:"heartbeat-period"`
SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.")
f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 16, "The getty_session pool.")
f.DurationVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 15*time.Second, "The heartbeat period.")
cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f)
}

+ 54
- 0
pkg/remoting/getty/session_config.go View File

@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package getty

import (
"flag"
"time"
)

type SessionConfig struct {
CompressEncoding bool `yaml:"compress-encoding" json:"compress-encoding" koanf:"compress-encoding"`
TCPNoDelay bool `yaml:"tcp-no-delay" json:"tcp-no-delay" koanf:"tcp-no-delay"`
TCPKeepAlive bool `yaml:"tcp-keep-alive" json:"tcp-keep-alive" koanf:"tcp-keep-alive"`
KeepAlivePeriod time.Duration `yaml:"keep-alive-period" json:"keep-alive-period" koanf:"keep-alive-period"`
TCPRBufSize int `yaml:"tcp-r-buf-size" json:"tcp-r-buf-size" koanf:"tcp-r-buf-size"`
TCPWBufSize int `yaml:"tcp-w-buf-size" json:"tcp-w-buf-size" koanf:"tcp-w-buf-size"`
TCPReadTimeout time.Duration `yaml:"tcp-read-timeout" json:"tcp-read-timeout" koanf:"tcp-read-timeout"`
TCPWriteTimeout time.Duration `yaml:"tcp-write-timeout" json:"tcp-write-timeout" koanf:"tcp-write-timeout"`
WaitTimeout time.Duration `yaml:"wait-timeout" json:"wait-timeout" koanf:"wait-timeout"`
MaxMsgLen int `yaml:"max-msg-len" json:"max-msg-len" koanf:"max-msg-len"`
SessionName string `yaml:"session-name" json:"session-name" koanf:"session-name"`
CronPeriod time.Duration `yaml:"cron-period" json:"cron-period" koanf:"cron-period"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *SessionConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.CompressEncoding, prefix+".compress-encoding", false, "Enable eompress encoding")
f.BoolVar(&cfg.TCPNoDelay, prefix+".tcp-no-delay", true, "Disable the nagle algorithm.")
f.BoolVar(&cfg.TCPKeepAlive, prefix+".tcp-keep-alive", true, "Keep connection alive.")
f.DurationVar(&cfg.KeepAlivePeriod, prefix+".keep-alive-period", 3*time.Minute, "Period between keep-alives.")
f.IntVar(&cfg.TCPRBufSize, prefix+".tcp-r-buf-size", 262144, "The size of the socket receive buffer.")
f.IntVar(&cfg.TCPWBufSize, prefix+".tcp-w-buf-size", 65536, "The size of the socket send buffer.")
f.DurationVar(&cfg.TCPReadTimeout, prefix+".tcp-read-timeout", time.Second, "The read timeout of the channel.")
f.DurationVar(&cfg.TCPWriteTimeout, prefix+".tcp-write-timeout", 5*time.Second, "The write timeout of the channel.")
f.DurationVar(&cfg.WaitTimeout, prefix+".wait-timeout", time.Second, "Maximum wait time when session got error or got exit signal.")
f.IntVar(&cfg.MaxMsgLen, prefix+".max-msg-len", 102400, "maximum package length of every package in (EventListener)OnMessage(@pkgs).")
f.StringVar(&cfg.SessionName, prefix+".session-name", "client", "The session name.")
f.DurationVar(&cfg.CronPeriod, prefix+".cron-period", time.Second, "The session heartbeat period.")
}

+ 17
- 12
testdata/conf/seatago.yml View File

@@ -141,15 +141,20 @@ seata:
log-table-name: tcc_fence_log_test
clean-period: 60s
# getty configuration
getty-session-param:
compress-encoding: false
tcp-no-delay: true
tcp-keep-alive: true
keep-alive-period: 120s
tcp-r-buf-size: 262144
tcp-w-buf-size: 65536
tcp-read-timeout: 1s
tcp-write-timeout: 5s
wait-timeout: 1s
max-msg-len: 16498688
session-name: client
getty:
reconnect-interval: 0
connection-num: 16
heartbeat-period: 15s
session:
compress-encoding: false
tcp-no-delay: true
tcp-keep-alive: true
keep-alive-period: 120s
tcp-r-buf-size: 262144
tcp-w-buf-size: 65536
tcp-read-timeout: 1s
tcp-write-timeout: 5s
wait-timeout: 1s
max-msg-len: 16498688
session-name: client_test
cron-period: 1s

+ 17
- 0
testdata/context.go View File

@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package testdata

import (


+ 17
- 0
testdata/mock_tcc.go View File

@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package testdata

import (


Loading…
Cancel
Save