@@ -26,7 +26,7 @@ seata: | |||
heartbeat: true | |||
# the client batch send request enable | |||
enableClientBatchSendRequest: true | |||
compressor: nome | |||
compressor: none | |||
service: | |||
@@ -100,9 +100,9 @@ seata: | |||
# Allow batch sending of requests (RM) | |||
enable-rm-client-batch-send-request: true | |||
# RM send request timeout | |||
rpc-rm-request-timeout: 3s | |||
rpc-rm-request-timeout: 30s | |||
# TM send request timeout | |||
rpc-tm-request-timeout: 3s | |||
rpc-tm-request-timeout: 30s | |||
# Configuration Center | |||
config: | |||
type: file | |||
@@ -61,15 +61,17 @@ func (c *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
} | |||
type Config struct { | |||
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"` | |||
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"` | |||
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"` | |||
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"` | |||
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"` | |||
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"` | |||
TransportConfig getty.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"` | |||
} | |||
func (c *Config) RegisterFlags(f *flag.FlagSet) { | |||
c.TCCConfig.FenceConfig.RegisterFlagsWithPrefix("tcc", f) | |||
c.ClientConfig.RegisterFlagsWithPrefix("client", f) | |||
c.GettyConfig.RegisterFlagsWithPrefix("getty", f) | |||
c.TransportConfig.RegisterFlagsWithPrefix("transport", f) | |||
} | |||
type loaderConf struct { | |||
@@ -63,13 +63,28 @@ func TestLoadPath(t *testing.T) { | |||
assert.Equal(t, "client_test", cfg.GettyConfig.SessionConfig.SessionName) | |||
assert.Equal(t, time.Second, cfg.GettyConfig.SessionConfig.CronPeriod) | |||
assert.NotNil(t, cfg.TransportConfig) | |||
assert.NotNil(t, cfg.TransportConfig.ShutdownConfig) | |||
assert.Equal(t, time.Second*3, cfg.TransportConfig.ShutdownConfig.Wait) | |||
assert.Equal(t, "TCP", cfg.TransportConfig.Type) | |||
assert.Equal(t, "NIO", cfg.TransportConfig.Server) | |||
assert.Equal(t, true, cfg.TransportConfig.Heartbeat) | |||
assert.Equal(t, "seata", cfg.TransportConfig.Serialization) | |||
assert.Equal(t, "none", cfg.TransportConfig.Compressor) | |||
assert.Equal(t, false, cfg.TransportConfig.EnableTmClientBatchSendRequest) | |||
assert.Equal(t, true, cfg.TransportConfig.EnableRmClientBatchSendRequest) | |||
assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCRmRequestTimeout) | |||
assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCTmRequestTimeout) | |||
// reset flag.CommandLine | |||
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) | |||
} | |||
func TestLoadJson(t *testing.T) { | |||
confJson := `{"client":{"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}}, | |||
"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}}}` | |||
"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}}, | |||
"transport":{"shutdown":{"wait":"3s"}, "type":"TCP", "server":"NIO", "heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"}}` | |||
cfg := LoadJson([]byte(confJson)) | |||
assert.NotNil(t, cfg) | |||
@@ -106,6 +121,19 @@ func TestLoadJson(t *testing.T) { | |||
assert.Equal(t, "client_test", cfg.GettyConfig.SessionConfig.SessionName) | |||
assert.Equal(t, time.Second*2, cfg.GettyConfig.SessionConfig.CronPeriod) | |||
assert.NotNil(t, cfg.TransportConfig) | |||
assert.NotNil(t, cfg.TransportConfig.ShutdownConfig) | |||
assert.Equal(t, time.Second*3, cfg.TransportConfig.ShutdownConfig.Wait) | |||
assert.Equal(t, "TCP", cfg.TransportConfig.Type) | |||
assert.Equal(t, "NIO", cfg.TransportConfig.Server) | |||
assert.Equal(t, true, cfg.TransportConfig.Heartbeat) | |||
assert.Equal(t, "seata", cfg.TransportConfig.Serialization) | |||
assert.Equal(t, "none", cfg.TransportConfig.Compressor) | |||
assert.Equal(t, false, cfg.TransportConfig.EnableTmClientBatchSendRequest) | |||
assert.Equal(t, true, cfg.TransportConfig.EnableRmClientBatchSendRequest) | |||
assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCRmRequestTimeout) | |||
assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCTmRequestTimeout) | |||
// reset flag.CommandLine | |||
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) | |||
} |
@@ -84,8 +84,8 @@ func TestInit(t *testing.T) { | |||
assert.Equal(t, DefaultSeataConf.Seata.Transport.Compressor, "none") | |||
assert.Equal(t, DefaultSeataConf.Seata.Transport.EnableTmClientBatchSendRequest, false) | |||
assert.Equal(t, DefaultSeataConf.Seata.Transport.EnableRmClientBatchSendRequest, true) | |||
assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCRmRequestTimeout, time.Duration(3_000_000_000)) | |||
assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCTmRequestTimeout, time.Duration(3_000_000_000)) | |||
assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCRmRequestTimeout, time.Duration(30_000_000_000)) | |||
assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCTmRequestTimeout, time.Duration(30_000_000_000)) | |||
// config | |||
assert.Equal(t, DefaultSeataConf.Seata.Config.Type, "file") | |||
@@ -40,12 +40,12 @@ type deleteExecutor struct { | |||
execContent *types.ExecContext | |||
} | |||
//NewDeleteExecutor get delete executor | |||
// NewDeleteExecutor get delete executor | |||
func NewDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor { | |||
return &deleteExecutor{parserCtx: parserCtx, execContent: execContent, baseExecutor: baseExecutor{hooks: hooks}} | |||
} | |||
//ExecContext exec SQL, and generate before image and after image | |||
// ExecContext exec SQL, and generate before image and after image | |||
func (d deleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) { | |||
d.beforeHooks(ctx, d.execContent) | |||
defer func() { | |||
@@ -72,7 +72,7 @@ func (d deleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithName | |||
return res, nil | |||
} | |||
//beforeImage build before image | |||
// beforeImage build before image | |||
func (d *deleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) { | |||
selectSQL, selectArgs, err := d.buildBeforeImageSQL(d.execContent.Query, d.execContent.NamedValues) | |||
if err != nil { | |||
@@ -148,7 +148,7 @@ func (d *deleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedVa | |||
return sql, d.buildSelectArgs(&selStmt, args), nil | |||
} | |||
//afterImage build after image | |||
// afterImage build after image | |||
func (d *deleteExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) { | |||
tableName, _ := d.parserCtx.GteTableName() | |||
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContent.DBName, tableName) | |||
@@ -45,7 +45,7 @@ type insertExecutor struct { | |||
businesSQLResult types.ExecResult | |||
} | |||
//NewInsertExecutor get insert executor | |||
// NewInsertExecutor get insert executor | |||
func NewInsertExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor { | |||
return &insertExecutor{parserCtx: parserCtx, execContent: execContent, baseExecutor: baseExecutor{hooks: hooks}} | |||
} | |||
@@ -76,7 +76,7 @@ func (i *insertExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNam | |||
return res, nil | |||
} | |||
//beforeImage build before image | |||
// beforeImage build before image | |||
func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) { | |||
tableName, _ := i.parserCtx.GteTableName() | |||
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContent.DBName, tableName) | |||
@@ -86,7 +86,7 @@ func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, e | |||
return types.NewEmptyRecordImage(metaData, types.SQLTypeInsert), nil | |||
} | |||
//afterImage build after image | |||
// afterImage build after image | |||
func (i *insertExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) { | |||
if !i.isAstStmtValid() { | |||
return nil, nil | |||
@@ -29,6 +29,23 @@ type Config struct { | |||
SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"` | |||
} | |||
type ShutdownConfig struct { | |||
Wait time.Duration `yaml:"wait" json:"wait" konaf:"wait"` | |||
} | |||
type TransportConfig struct { | |||
ShutdownConfig ShutdownConfig `yaml:"shutdown" json:"shutdown" koanf:"shutdown"` | |||
Type string `yaml:"type" json:"type" koanf:"type"` | |||
Server string `yaml:"server" json:"server" koanf:"server"` | |||
Heartbeat bool `yaml:"heartbeat" json:"heartbeat" koanf:"heartbeat"` | |||
Serialization string `yaml:"serialization" json:"serialization" koanf:"serialization"` | |||
Compressor string `yaml:"compressor" json:"compressor" koanf:"compressor"` | |||
EnableTmClientBatchSendRequest bool `yaml:"enable-tm-client-batch-send-request" json:"enable-tm-client-batch-send-request" koanf:"enable-tm-client-batch-send-request"` | |||
EnableRmClientBatchSendRequest bool `yaml:"enable-rm-client-batch-send-request" json:"enable-rm-client-batch-send-request" koanf:"enable-rm-client-batch-send-request"` | |||
RPCRmRequestTimeout time.Duration `yaml:"rpc-rm-request-timeout" json:"rpc-rm-request-timeout" koanf:"rpc-rm-request-timeout"` | |||
RPCTmRequestTimeout time.Duration `yaml:"rpc-tm-request-timeout" json:"rpc-tm-request-timeout" koanf:"rpc-tm-request-timeout"` | |||
} | |||
// RegisterFlagsWithPrefix for Config. | |||
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.") | |||
@@ -36,3 +53,20 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
f.DurationVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 15*time.Second, "The heartbeat period.") | |||
cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f) | |||
} | |||
func (cfg *ShutdownConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
f.DurationVar(&cfg.Wait, prefix+".wait", 3*time.Second, "Shutdown wait time.") | |||
} | |||
func (cfg *TransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
cfg.ShutdownConfig.RegisterFlagsWithPrefix(prefix+".shutdown", f) | |||
f.StringVar(&cfg.Type, prefix+".type", "TCP", "Transport protocol type.") | |||
f.StringVar(&cfg.Server, prefix+".server", "NIO", "Server type.") | |||
f.BoolVar(&cfg.Heartbeat, prefix+".heartbeat", true, "Heartbeat.") | |||
f.StringVar(&cfg.Serialization, prefix+".serialization", "seata", "Encoding and decoding mode.") | |||
f.StringVar(&cfg.Compressor, prefix+".compressor", "none", "Message compression mode.") | |||
f.BoolVar(&cfg.EnableTmClientBatchSendRequest, prefix+".enable-tm-client-batch-send-request", false, "Allow batch sending of requests (TM).") | |||
f.BoolVar(&cfg.EnableRmClientBatchSendRequest, prefix+".enable-rm-client-batch-send-request", true, "Allow batch sending of requests (RM).") | |||
f.DurationVar(&cfg.RPCRmRequestTimeout, prefix+".rpc-rm-request-timeout", 30*time.Second, "RM send request timeout.") | |||
f.DurationVar(&cfg.RPCTmRequestTimeout, prefix+".rpc-tm-request-timeout", 30*time.Second, "TM send request timeout.") | |||
} |
@@ -100,9 +100,9 @@ seata: | |||
# Allow batch sending of requests (RM) | |||
enable-rm-client-batch-send-request: true | |||
# RM send request timeout | |||
rpc-rm-request-timeout: 3s | |||
rpc-rm-request-timeout: 30s | |||
# TM send request timeout | |||
rpc-tm-request-timeout: 3s | |||
rpc-tm-request-timeout: 30s | |||
# Configuration Center | |||
config: | |||
type: file | |||