From 5e858b1f4575feaa6c1543d4f743952f45409be0 Mon Sep 17 00:00:00 2001 From: Yuecai Liu <38887641+luky116@users.noreply.github.com> Date: Wed, 7 Dec 2022 22:42:32 +0800 Subject: [PATCH] add tcc fence config logic (#383) * add tcc fence config logic * add string loader * fix samples * fix samples * fix samples * fix samples * fix samples * fix samples * fix samples * merge Load and LoadPath * fix sample --- go.mod | 2 +- pkg/client/client.go | 34 ++- pkg/client/config.go | 205 ++++++++++++++++++ pkg/client/config_test.go | 52 +++++ .../tm_client.go => rm/tcc/fence/fence.go} | 30 +-- pkg/{client/rm_client.go => rm/tcc/tcc.go} | 17 +- pkg/tm/init.go | 44 ---- testdata/conf/seatago.yml | 155 +++++++++++++ 8 files changed, 468 insertions(+), 71 deletions(-) create mode 100644 pkg/client/config.go create mode 100644 pkg/client/config_test.go rename pkg/{client/tm_client.go => rm/tcc/fence/fence.go} (59%) rename pkg/{client/rm_client.go => rm/tcc/tcc.go} (69%) delete mode 100644 pkg/tm/init.go create mode 100644 testdata/conf/seatago.yml diff --git a/go.mod b/go.mod index afbff7d1..93020fe1 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/goccy/go-json v0.9.7 github.com/golang/mock v1.6.0 github.com/google/uuid v1.3.0 + github.com/knadh/koanf v1.4.3 github.com/mitchellh/copystructure v1.2.0 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/parnurzeal/gorequest v0.2.16 @@ -81,7 +82,6 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/k0kubun/pp v3.0.1+incompatible // indirect - github.com/knadh/koanf v1.4.3 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.6 // indirect diff --git a/pkg/client/client.go b/pkg/client/client.go index 0637e8c7..2a37247a 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -18,13 +18,41 @@ package client import ( + "sync" + _ "github.com/seata/seata-go/pkg/integration" + "github.com/seata/seata-go/pkg/remoting/getty" _ "github.com/seata/seata-go/pkg/remoting/processor/client" _ "github.com/seata/seata-go/pkg/rm/tcc" ) -// Init init seata client +// Init seata client client func Init() { - initRmClient() - initTmClient() + InitPath("") +} + +// Init init client client with config path +func InitPath(configFilePath string) { + cfg := LoadPath(configFilePath) + + initRmClient(cfg) + initTmClient(cfg) +} + +var onceInitTmClient sync.Once + +// InitTmClient init client tm client +func initTmClient(cfg *Config) { + onceInitTmClient.Do(func() { + initRemoting(cfg) + }) +} + +// initRemoting init rpc client +func initRemoting(cfg *Config) { + getty.InitRpcClient() +} + +// InitRmClient init client rm client +func initRmClient(cfg *Config) { } diff --git a/pkg/client/config.go b/pkg/client/config.go new file mode 100644 index 00000000..584423fd --- /dev/null +++ b/pkg/client/config.go @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "flag" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strings" + + "github.com/knadh/koanf" + "github.com/knadh/koanf/parsers/json" + "github.com/knadh/koanf/parsers/toml" + "github.com/knadh/koanf/parsers/yaml" + "github.com/knadh/koanf/providers/rawbytes" + "github.com/seata/seata-go/pkg/rm/tcc" + "github.com/seata/seata-go/pkg/util/flagext" +) + +const ( + configFileEnvKey = "SEATA_GO_CONFIG_PATH" + configPrefix = "seata" +) + +const ( + jsonSuffix = "json" + tomlSuffix = "toml" + yamlSuffix = "yaml" + ymlSuffix = "yml" +) + +type Config struct { + TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"` +} + +func (c *Config) RegisterFlags(f *flag.FlagSet) { + c.TCCConfig.FenceConfig.RegisterFlagsWithPrefix("tcc", f) +} + +type loaderConf struct { + suffix string // loaderConf file extension default yaml + path string // loaderConf file path default ./conf/seatago.yaml + delim string // loaderConf file delim default . + bytes []byte // config bytes + name string // config file name +} + +// Load parse config from user config path +func LoadPath(configFilePath string) *Config { + if configFilePath == "" { + configFilePath = os.Getenv(configFileEnvKey) + if configFilePath == "" { + panic("system variable SEATA_GO_CONFIG_PATH is empty") + } + } + + var cfg Config + // This sets default values from flags to the config. + // It needs to be called before parsing the config file! + flagext.RegisterFlags(&cfg) + + conf := newLoaderConf(configFilePath) + koan := getConfigResolver(conf) + if err := koan.UnmarshalWithConf(configPrefix, &cfg, koanf.UnmarshalConf{Tag: yamlSuffix}); err != nil { + panic(err) + } + return &cfg +} + +// Load parse config from json bytes +func LoadJson(bytes []byte) *Config { + var cfg Config + // This sets default values from flags to the config. + // It needs to be called before parsing the config file! + flagext.RegisterFlags(&cfg) + + koan := getJsonConfigResolver(bytes) + if err := koan.Unmarshal("", &cfg); err != nil { + panic(err) + } + return &cfg +} + +// getJsonConfigResolver get json config resolver +func getJsonConfigResolver(bytes []byte) *koanf.Koanf { + k := koanf.New(".") + if err := k.Load(rawbytes.Provider(bytes), json.Parser()); err != nil { + panic(err) + } + return k +} + +//resolverFilePath resolver file path +// eg: give a ./conf/seatago.yaml return seatago and yaml +func resolverFilePath(path string) (name, suffix string) { + paths := strings.Split(path, "/") + fileName := strings.Split(paths[len(paths)-1], ".") + if len(fileName) < 2 { + return fileName[0], yamlSuffix + } + return fileName[0], fileName[1] +} + +// getConfigResolver get config resolver +func getConfigResolver(conf *loaderConf) *koanf.Koanf { + var ( + k *koanf.Koanf + err error + ) + if len(conf.suffix) <= 0 { + conf.suffix = yamlSuffix + } + if len(conf.delim) <= 0 { + conf.delim = "." + } + bytes := conf.bytes + if len(bytes) <= 0 { + panic(fmt.Errorf("bytes is nil,please set bytes or file path")) + } + k = koanf.New(conf.delim) + + switch conf.suffix { + case yamlSuffix, ymlSuffix: + err = k.Load(rawbytes.Provider(bytes), yaml.Parser()) + case jsonSuffix: + err = k.Load(rawbytes.Provider(bytes), json.Parser()) + case tomlSuffix: + err = k.Load(rawbytes.Provider(bytes), toml.Parser()) + default: + err = fmt.Errorf("no support %s file suffix", conf.suffix) + } + + if err != nil { + panic(err) + } + return k +} + +func newLoaderConf(configFilePath string) *loaderConf { + name, suffix := resolverFilePath(configFilePath) + conf := &loaderConf{ + suffix: suffix, + path: absolutePath(configFilePath), + delim: ".", + name: name, + } + + if len(conf.bytes) <= 0 { + if bytes, err := ioutil.ReadFile(conf.path); err != nil { + panic(err) + } else { + conf.bytes = bytes + } + } + return conf +} + +// absolutePath get absolut path +func absolutePath(inPath string) string { + + if inPath == "$HOME" || strings.HasPrefix(inPath, "$HOME"+string(os.PathSeparator)) { + inPath = userHomeDir() + inPath[5:] + } + + if filepath.IsAbs(inPath) { + return filepath.Clean(inPath) + } + + p, err := filepath.Abs(inPath) + if err == nil { + return filepath.Clean(p) + } + + return "" +} + +//userHomeDir get gopath +func userHomeDir() string { + if runtime.GOOS == "windows" { + home := os.Getenv("HOMEDRIVE") + os.Getenv("HOMEPATH") + if home == "" { + home = os.Getenv("USERPROFILE") + } + return home + } + return os.Getenv("HOME") +} diff --git a/pkg/client/config_test.go b/pkg/client/config_test.go new file mode 100644 index 00000000..9e53a6f8 --- /dev/null +++ b/pkg/client/config_test.go @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "flag" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLoadPath(t *testing.T) { + cfg := LoadPath("../../testdata/conf/seatago.yml") + assert.NotNil(t, cfg) + assert.NotNil(t, cfg.TCCConfig) + assert.NotNil(t, cfg.TCCConfig.FenceConfig) + + assert.Equal(t, "tcc_fence_log_test", cfg.TCCConfig.FenceConfig.LogTableName) + assert.Equal(t, time.Second*60, cfg.TCCConfig.FenceConfig.CleanPeriod) + // reset flag.CommandLine + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) +} + +func TestLoadJson(t *testing.T) { + confJson := `{"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}}}` + cfg := LoadJson([]byte(confJson)) + assert.NotNil(t, cfg) + assert.NotNil(t, cfg.TCCConfig) + assert.NotNil(t, cfg.TCCConfig.FenceConfig) + + assert.Equal(t, "tcc_fence_log_test2", cfg.TCCConfig.FenceConfig.LogTableName) + assert.Equal(t, time.Second*80, cfg.TCCConfig.FenceConfig.CleanPeriod) + // reset flag.CommandLine + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) +} diff --git a/pkg/client/tm_client.go b/pkg/rm/tcc/fence/fence.go similarity index 59% rename from pkg/client/tm_client.go rename to pkg/rm/tcc/fence/fence.go index b58f4230..6cd15fb9 100644 --- a/pkg/client/tm_client.go +++ b/pkg/rm/tcc/fence/fence.go @@ -15,30 +15,20 @@ * limitations under the License. */ -package client +package fence import ( - "sync" - - "github.com/seata/seata-go/pkg/remoting/getty" + "flag" + "time" ) -var onceInitTmClient sync.Once - -// InitTmClient init seata tm client -func initTmClient() { - onceInitTmClient.Do(func() { - initConfig() - initRemoting() - }) -} - -// todo -// initConfig init config processor -func initConfig() { +type Config struct { + LogTableName string `yaml:"log-table-name" json:"log-table-name" koanf:"log-table-name"` + CleanPeriod time.Duration `yaml:"clean-period" json:"clean-period" koanf:"clean-period"` } -// initRemoting init rpc client -func initRemoting() { - getty.InitRpcClient() +// RegisterFlagsWithPrefix for Config. +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.LogTableName, prefix+".log-table-name", "tcc_fence_log", "Undo log table name.") + f.DurationVar(&cfg.CleanPeriod, prefix+".clean-period", 24*time.Hour, "Undo log retention time.") } diff --git a/pkg/client/rm_client.go b/pkg/rm/tcc/tcc.go similarity index 69% rename from pkg/client/rm_client.go rename to pkg/rm/tcc/tcc.go index c48e2e01..95b05be9 100644 --- a/pkg/client/rm_client.go +++ b/pkg/rm/tcc/tcc.go @@ -15,8 +15,19 @@ * limitations under the License. */ -package client +package tcc -// InitRmClient init seata rm client -func initRmClient() { +import ( + "flag" + + "github.com/seata/seata-go/pkg/rm/tcc/fence" +) + +type Config struct { + FenceConfig fence.Config `yaml:"fence" json:"fence" koanf:"fence"` +} + +// RegisterFlagsWithPrefix for Config. +func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.FenceConfig.RegisterFlagsWithPrefix(prefix+".fence", f) } diff --git a/pkg/tm/init.go b/pkg/tm/init.go deleted file mode 100644 index 302eb8ef..00000000 --- a/pkg/tm/init.go +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package tm - -import ( - "sync" - - "github.com/seata/seata-go/pkg/remoting/getty" -) - -var onceInitTmClient sync.Once - -// InitTmClient init seata tm client -func InitTmClient() { - onceInitTmClient.Do(func() { - initConfig() - initRemoting() - }) -} - -// initConfig init config processor -func initConfig() { - // todo implement -} - -// initRemoting init rpc client -func initRemoting() { - getty.InitRpcClient() -} diff --git a/testdata/conf/seatago.yml b/testdata/conf/seatago.yml new file mode 100644 index 00000000..1b29f28a --- /dev/null +++ b/testdata/conf/seatago.yml @@ -0,0 +1,155 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# time 时间单位对应的是 time.Duration(1) +seata: + enabled: true + # application id + application-id: applicationName + # service group + tx-service-group: default_tx_group + access-key: aliyunAccessKey + secret-key: aliyunSecretKey + enable-auto-data-source-proxy: true + data-source-proxy-mode: AT + client: + rm: + # Maximum cache length of asynchronous queue + async-commit-buffer-limit: 10000 + # The maximum number of retries when report reports the status + report-retry-count: 5 + # The interval for regularly checking the metadata of the db(AT) + table-meta-check-enable: false + # Whether to report the status if the transaction is successfully executed(AT) + report-success-enable: false + # Whether to allow regular check of db metadata(AT) + saga-branch-register-enable: false + saga-json-parser: fastjson + saga-retry-persist-mode-update: false + saga-compensate-persist-mode-update: false + #Ordered.HIGHEST_PRECEDENCE + 1000 # + tcc-action-interceptor-order: -2147482648 + # Parse SQL parser selection + sql-parser-type: druid + lock: + retry-interval: 10 + retry-times: 30s + retry-policy-branch-rollback-on-conflict: true + tm: + commit-retry-count: 5 + rollback-retry-count: 5 + default-global-transaction-timeout: 10s + degrade-check: false + degrade-check-period: 2000 + degrade-check-allow-times: 10s + interceptor-order: -2147482648 + undo: + # Judge whether the before image and after image are the same,If it is the same, undo will not be recorded + data-validation: true + # Serialization method + log-serialization: jackson + # undo log table name + log-table: undo_log + # Only store modified fields + only-care-update-columns: true + compress: + # Whether compression is required + enable: true + # Compression type + type: zip + # Compression threshold Unit: k + threshold: 64 + load-balance: + type: RandomLoadBalance + virtual-nodes: 10 + service: + vgroup-mapping: + # Prefix for Print Log + default_tx_group: default + grouplist: + default: 127.0.0.1:8091 + enable-degrade: false + # close the transaction + disable-global-transaction: false + transport: + shutdown: + wait: 3s + # Netty related configurations + # type + type: TCP + server: NIO + heartbeat: true + # Encoding and decoding mode + serialization: seata + # Message compression mode + compressor: none + # Allow batch sending of requests (TM) + enable-tm-client-batch-send-request: false + # Allow batch sending of requests (RM) + enable-rm-client-batch-send-request: true + # RM send request timeout + rpc-rm-request-timeout: 3s + # TM send request timeout + rpc-tm-request-timeout: 3s + # Configuration Center + config: + type: file + file: + name: config.conf + nacos: + namespace: "" + server-addr: 127.0.0.1:8848 + group: SEATA_GROUP + username: "" + password: "" + ##if use MSE Nacos with auth, mutex with username/password attribute + #access-key: "" + #secret-key: "" + data-id: seata.properties + # Registration Center + registry: + type: file + file: + name: registry.conf + nacos: + application: seata-server + server-addr: 127.0.0.1:8848 + group: "SEATA_GROUP" + namespace: "" + username: "" + password: "" + ##if use MSE Nacos with auth, mutex with username/password attribute # + #access-key: "" # + #secret-key: "" # + log: + exception-rate: 100 + tcc: + fence: + # Anti suspension table name + log-table-name: tcc_fence_log_test + clean-period: 60s + # getty configuration + getty-session-param: + compress-encoding: false + tcp-no-delay: true + tcp-keep-alive: true + keep-alive-period: 120s + tcp-r-buf-size: 262144 + tcp-w-buf-size: 65536 + tcp-read-timeout: 1s + tcp-write-timeout: 5s + wait-timeout: 1s + max-msg-len: 16498688 + session-name: client