@@ -26,6 +26,8 @@ import ( | |||
"runtime" | |||
"strings" | |||
"github.com/seata/seata-go/pkg/datasource/sql/undo" | |||
"github.com/knadh/koanf" | |||
"github.com/knadh/koanf/parsers/json" | |||
"github.com/knadh/koanf/parsers/toml" | |||
@@ -51,15 +53,15 @@ const ( | |||
) | |||
type ClientConfig struct { | |||
TmConfig tm.TmConfig `yaml:"tm" json:"tm,omitempty" koanf:"tm"` | |||
RmConfig rm.Config `yaml:"rm" json:"rm,omitempty" koanf:"rm"` | |||
TmConfig tm.TmConfig `yaml:"tm" json:"tm,omitempty" koanf:"tm"` | |||
RmConfig rm.Config `yaml:"rm" json:"rm,omitempty" koanf:"rm"` | |||
UndoConfig undo.Config `yaml:"undo" json:"undo,omitempty" koanf:"undo"` | |||
} | |||
func (c *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
// TODO: Undo RegisterFlagsWithPrefix | |||
// TODO: LoadBalance RegisterFlagsWithPrefix | |||
c.TmConfig.RegisterFlagsWithPrefix(prefix+".tm", f) | |||
c.RmConfig.RegisterFlagsWithPrefix(prefix+".rm", f) | |||
c.UndoConfig.RegisterFlagsWithPrefix(prefix+".undo", f) | |||
} | |||
type Config struct { | |||
@@ -87,7 +89,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { | |||
f.BoolVar(&c.EnableAutoDataSourceProxy, "enable-auto-data-source-proxy", true, "Whether enable auto proxying of datasource bean.") | |||
f.StringVar(&c.DataSourceProxyMode, "data-source-proxy-mode", "AT", "Data source proxy mode.") | |||
c.TCCConfig.FenceConfig.RegisterFlagsWithPrefix("tcc", f) | |||
c.TCCConfig.RegisterFlagsWithPrefix("tcc", f) | |||
c.ClientConfig.RegisterFlagsWithPrefix("client", f) | |||
c.GettyConfig.RegisterFlagsWithPrefix("getty", f) | |||
c.TransportConfig.RegisterFlagsWithPrefix("transport", f) | |||
@@ -66,6 +66,16 @@ func TestLoadPath(t *testing.T) { | |||
assert.Equal(t, time.Second*30, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes) | |||
assert.Equal(t, true, cfg.ClientConfig.RmConfig.LockConfig.RetryPolicyBranchRollbackOnConflict) | |||
assert.NotNil(t, cfg.ClientConfig.UndoConfig) | |||
assert.Equal(t, true, cfg.ClientConfig.UndoConfig.DataValidation) | |||
assert.Equal(t, "jackson", cfg.ClientConfig.UndoConfig.LogSerialization) | |||
assert.Equal(t, "undo_log", cfg.ClientConfig.UndoConfig.LogTable) | |||
assert.Equal(t, true, cfg.ClientConfig.UndoConfig.OnlyCareUpdateColumns) | |||
assert.NotNil(t, cfg.ClientConfig.UndoConfig.CompressConfig) | |||
assert.Equal(t, true, cfg.ClientConfig.UndoConfig.CompressConfig.Enable) | |||
assert.Equal(t, "zip", cfg.ClientConfig.UndoConfig.CompressConfig.Type) | |||
assert.Equal(t, "64k", cfg.ClientConfig.UndoConfig.CompressConfig.Threshold) | |||
assert.NotNil(t, cfg.GettyConfig) | |||
assert.NotNil(t, cfg.GettyConfig.SessionConfig) | |||
assert.Equal(t, 0, cfg.GettyConfig.ReconnectInterval) | |||
@@ -108,12 +118,7 @@ func TestLoadPath(t *testing.T) { | |||
} | |||
func TestLoadJson(t *testing.T) { | |||
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT", | |||
"client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}}, | |||
"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}}, | |||
"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}}, | |||
"transport":{"shutdown":{"wait":"3s"}, "type":"TCP", "server":"NIO", "heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"}, | |||
"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}` | |||
confJson := `{"enabled":false,"application-id":"application_test","tx-service-group":"default_tx_group","access-key":"test","secret-key":"test","enable-auto-data-source-proxy":false,"data-source-proxy-mode":"AT","client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648},"undo":{"data-validation":false,"log-serialization":"jackson222","only-care-update-columns":false,"log-table":"undo_log333","compress":{"enable":false,"type":"zip111","threshold":"128k"}}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"},"service":{"enable-degrade":true,"disable-global-transaction":true,"vgroup-mapping":{"default_tx_group":"default_test"},"grouplist":{"default":"127.0.0.1:8092"}}}` | |||
cfg := LoadJson([]byte(confJson)) | |||
assert.NotNil(t, cfg) | |||
assert.Equal(t, false, cfg.Enabled) | |||
@@ -138,6 +143,16 @@ func TestLoadJson(t *testing.T) { | |||
assert.Equal(t, time.Second*30, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes) | |||
assert.Equal(t, true, cfg.ClientConfig.RmConfig.LockConfig.RetryPolicyBranchRollbackOnConflict) | |||
assert.NotNil(t, cfg.ClientConfig.UndoConfig) | |||
assert.Equal(t, false, cfg.ClientConfig.UndoConfig.DataValidation) | |||
assert.Equal(t, "jackson222", cfg.ClientConfig.UndoConfig.LogSerialization) | |||
assert.Equal(t, "undo_log333", cfg.ClientConfig.UndoConfig.LogTable) | |||
assert.Equal(t, false, cfg.ClientConfig.UndoConfig.OnlyCareUpdateColumns) | |||
assert.NotNil(t, cfg.ClientConfig.UndoConfig.CompressConfig) | |||
assert.Equal(t, false, cfg.ClientConfig.UndoConfig.CompressConfig.Enable) | |||
assert.Equal(t, "zip111", cfg.ClientConfig.UndoConfig.CompressConfig.Type) | |||
assert.Equal(t, "128k", cfg.ClientConfig.UndoConfig.CompressConfig.Threshold) | |||
assert.NotNil(t, cfg.TCCConfig) | |||
assert.NotNil(t, cfg.TCCConfig.FenceConfig) | |||
assert.Equal(t, "tcc_fence_log_test2", cfg.TCCConfig.FenceConfig.LogTableName) | |||
@@ -20,15 +20,15 @@ package at | |||
import ( | |||
"context" | |||
"database/sql/driver" | |||
"github.com/agiledragon/gomonkey" | |||
"github.com/seata/seata-go/pkg/datasource/sql/datasource" | |||
"github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql" | |||
"reflect" | |||
"testing" | |||
"github.com/agiledragon/gomonkey" | |||
"github.com/arana-db/parser/ast" | |||
"github.com/arana-db/parser/model" | |||
"github.com/arana-db/parser/test_driver" | |||
"github.com/seata/seata-go/pkg/datasource/sql/datasource" | |||
"github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql" | |||
"github.com/seata/seata-go/pkg/datasource/sql/exec" | |||
"github.com/seata/seata-go/pkg/datasource/sql/parser" | |||
"github.com/seata/seata-go/pkg/datasource/sql/types" | |||
@@ -0,0 +1,50 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package undo | |||
import ( | |||
"flag" | |||
) | |||
type CompressConfig struct { | |||
Enable bool `yaml:"enable" json:"enable,omitempty" koanf:"enable"` | |||
Type string `yaml:"type" json:"type,omitempty" koanf:"type"` | |||
Threshold string `yaml:"threshold" json:"threshold,omitempty" koanf:"threshold"` | |||
} | |||
type Config struct { | |||
DataValidation bool `yaml:"data-validation" json:"data-validation,omitempty" koanf:"data-validation"` | |||
LogSerialization string `yaml:"log-serialization" json:"log-serialization,omitempty" koanf:"log-serialization"` | |||
LogTable string `yaml:"log-table" json:"log-table,omitempty" koanf:"log-table"` | |||
OnlyCareUpdateColumns bool `yaml:"only-care-update-columns" json:"only-care-update-columns,omitempty" koanf:"only-care-update-columns"` | |||
CompressConfig CompressConfig `yaml:"compress" json:"compress,omitempty" koanf:"compress"` | |||
} | |||
func (u *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
f.BoolVar(&u.DataValidation, prefix+".data-validation", true, "Judge whether the before image and after image are the same,If it is the same, undo will not be recorded") | |||
f.StringVar(&u.LogSerialization, prefix+".log-serialization", "jackson", "Serialization method.") | |||
f.StringVar(&u.LogTable, prefix+".log-table", "undo_log", "undo log table name.") | |||
f.BoolVar(&u.OnlyCareUpdateColumns, prefix+".only-care-update-columns", true, "The switch for degrade check.") | |||
u.CompressConfig.RegisterFlagsWithPrefix(prefix+".compress", f) | |||
} | |||
func (c *CompressConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
f.BoolVar(&c.Enable, prefix+".enable", true, "Whether compression is required.") | |||
f.StringVar(&c.Type, prefix+".type", "zip", "Compression type") | |||
f.StringVar(&c.Threshold, prefix+".threshold", "64k", "Compression threshold") | |||
} |
@@ -1,51 +0,0 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package undo | |||
import ( | |||
"flag" | |||
) | |||
type CompressConfig struct { | |||
Enable bool `yaml:"enable" json:"enable,omitempty" ` | |||
Type string `yaml:"type" json:"type,omitempty" ` | |||
Threshold int `yaml:"threshold" json:"threshold,omitempty" ` | |||
} | |||
type UndoConfig struct { | |||
DataValidation bool `yaml:"data-validation" json:"data-validation,omitempty" ` | |||
LogSerialization string `yaml:"log-serialization" json:"log-serialization,omitempty" ` | |||
LogTable string `yaml:"log-table" json:"log-table,omitempty" ` | |||
OnlyCareUpdateColumns bool `yaml:"only-care-update-columns" json:"only-care-update-columns,omitempty" ` | |||
Compress CompressConfig `yaml:"compress" json:"compress,omitempty" ` | |||
} | |||
func (ufg *UndoConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
f.BoolVar(&ufg.DataValidation, prefix+".data-validation", true, "Judge whether the before image and after image are the same,If it is the same, undo will not be recorded") | |||
f.StringVar(&ufg.LogSerialization, prefix+".log-serialization", "jackson", "Serialization method.") | |||
f.StringVar(&ufg.LogTable, prefix+".log-table", "undo_log", "undo log table name.") | |||
f.BoolVar(&ufg.OnlyCareUpdateColumns, prefix+".only-care-update-columns", true, "The switch for degrade check.") | |||
} | |||
// RegisterFlagsWithPrefix for Compress. | |||
func (cfg *CompressConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { | |||
f.BoolVar(&cfg.Enable, prefix+".log-table-name", true, "Whether compression is required.") | |||
f.StringVar(&cfg.Type, prefix+".clean-period", "zip", "Compression type") | |||
f.IntVar(&cfg.Threshold, prefix+".clean-period", 64, "Compression threshold Unit: k") | |||
} |
@@ -1,54 +0,0 @@ | |||
package undo | |||
import ( | |||
"flag" | |||
"testing" | |||
) | |||
func TestCompressConfig_RegisterFlagsWithPrefix(t *testing.T) { | |||
type fields struct { | |||
Enable bool | |||
Type string | |||
Threshold int | |||
} | |||
type args struct { | |||
prefix string | |||
f *flag.FlagSet | |||
} | |||
tests := []struct { | |||
name string | |||
fields fields | |||
args args | |||
}{ | |||
// TODO: Add test cases. | |||
} | |||
for _, tt := range tests { | |||
t.Run(tt.name, func(t *testing.T) { | |||
}) | |||
} | |||
} | |||
func TestUndoConfig_RegisterFlagsWithPrefix(t *testing.T) { | |||
type fields struct { | |||
DataValidation bool | |||
LogSerialization string | |||
LogTable string | |||
OnlyCareUpdateColumns bool | |||
Compress CompressConfig | |||
} | |||
type args struct { | |||
prefix string | |||
f *flag.FlagSet | |||
} | |||
tests := []struct { | |||
name string | |||
fields fields | |||
args args | |||
}{ | |||
// TODO: Add test cases. | |||
} | |||
for _, tt := range tests { | |||
t.Run(tt.name, func(t *testing.T) { | |||
}) | |||
} | |||
} |
@@ -19,8 +19,9 @@ package tm | |||
import ( | |||
"flag" | |||
"github.com/seata/seata-go/pkg/util/flagext" | |||
"time" | |||
"github.com/seata/seata-go/pkg/util/flagext" | |||
) | |||
type TmConfig struct { | |||
@@ -70,7 +70,7 @@ seata: | |||
# Compression type | |||
type: zip | |||
# Compression threshold Unit: k | |||
threshold: 64 | |||
threshold: 64k | |||
load-balance: | |||
type: RandomLoadBalance | |||
virtual-nodes: 10 | |||