Browse Source

add tcc fence config logic (#383)

* add tcc fence config logic

* add string loader

* fix samples

* fix samples

* fix samples

* fix samples

* fix samples

* fix samples

* fix samples

* merge Load and LoadPath

* fix sample
tags/v1.0.3
Yuecai Liu GitHub 2 years ago
parent
commit
5e858b1f45
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 468 additions and 71 deletions
  1. +1
    -1
      go.mod
  2. +31
    -3
      pkg/client/client.go
  3. +205
    -0
      pkg/client/config.go
  4. +52
    -0
      pkg/client/config_test.go
  5. +10
    -20
      pkg/rm/tcc/fence/fence.go
  6. +14
    -3
      pkg/rm/tcc/tcc.go
  7. +0
    -44
      pkg/tm/init.go
  8. +155
    -0
      testdata/conf/seatago.yml

+ 1
- 1
go.mod View File

@@ -14,6 +14,7 @@ require (
github.com/goccy/go-json v0.9.7
github.com/golang/mock v1.6.0
github.com/google/uuid v1.3.0
github.com/knadh/koanf v1.4.3
github.com/mitchellh/copystructure v1.2.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/parnurzeal/gorequest v0.2.16
@@ -81,7 +82,6 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/k0kubun/pp v3.0.1+incompatible // indirect
github.com/knadh/koanf v1.4.3 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.6 // indirect


+ 31
- 3
pkg/client/client.go View File

@@ -18,13 +18,41 @@
package client

import (
"sync"

_ "github.com/seata/seata-go/pkg/integration"
"github.com/seata/seata-go/pkg/remoting/getty"
_ "github.com/seata/seata-go/pkg/remoting/processor/client"
_ "github.com/seata/seata-go/pkg/rm/tcc"
)

// Init init seata client
// Init seata client client
func Init() {
initRmClient()
initTmClient()
InitPath("")
}

// Init init client client with config path
func InitPath(configFilePath string) {
cfg := LoadPath(configFilePath)

initRmClient(cfg)
initTmClient(cfg)
}

var onceInitTmClient sync.Once

// InitTmClient init client tm client
func initTmClient(cfg *Config) {
onceInitTmClient.Do(func() {
initRemoting(cfg)
})
}

// initRemoting init rpc client
func initRemoting(cfg *Config) {
getty.InitRpcClient()
}

// InitRmClient init client rm client
func initRmClient(cfg *Config) {
}

+ 205
- 0
pkg/client/config.go View File

@@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package client

import (
"flag"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strings"

"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/json"
"github.com/knadh/koanf/parsers/toml"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/util/flagext"
)

const (
configFileEnvKey = "SEATA_GO_CONFIG_PATH"
configPrefix = "seata"
)

const (
jsonSuffix = "json"
tomlSuffix = "toml"
yamlSuffix = "yaml"
ymlSuffix = "yml"
)

type Config struct {
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.TCCConfig.FenceConfig.RegisterFlagsWithPrefix("tcc", f)
}

type loaderConf struct {
suffix string // loaderConf file extension default yaml
path string // loaderConf file path default ./conf/seatago.yaml
delim string // loaderConf file delim default .
bytes []byte // config bytes
name string // config file name
}

// Load parse config from user config path
func LoadPath(configFilePath string) *Config {
if configFilePath == "" {
configFilePath = os.Getenv(configFileEnvKey)
if configFilePath == "" {
panic("system variable SEATA_GO_CONFIG_PATH is empty")
}
}

var cfg Config
// This sets default values from flags to the config.
// It needs to be called before parsing the config file!
flagext.RegisterFlags(&cfg)

conf := newLoaderConf(configFilePath)
koan := getConfigResolver(conf)
if err := koan.UnmarshalWithConf(configPrefix, &cfg, koanf.UnmarshalConf{Tag: yamlSuffix}); err != nil {
panic(err)
}
return &cfg
}

// Load parse config from json bytes
func LoadJson(bytes []byte) *Config {
var cfg Config
// This sets default values from flags to the config.
// It needs to be called before parsing the config file!
flagext.RegisterFlags(&cfg)

koan := getJsonConfigResolver(bytes)
if err := koan.Unmarshal("", &cfg); err != nil {
panic(err)
}
return &cfg
}

// getJsonConfigResolver get json config resolver
func getJsonConfigResolver(bytes []byte) *koanf.Koanf {
k := koanf.New(".")
if err := k.Load(rawbytes.Provider(bytes), json.Parser()); err != nil {
panic(err)
}
return k
}

//resolverFilePath resolver file path
// eg: give a ./conf/seatago.yaml return seatago and yaml
func resolverFilePath(path string) (name, suffix string) {
paths := strings.Split(path, "/")
fileName := strings.Split(paths[len(paths)-1], ".")
if len(fileName) < 2 {
return fileName[0], yamlSuffix
}
return fileName[0], fileName[1]
}

// getConfigResolver get config resolver
func getConfigResolver(conf *loaderConf) *koanf.Koanf {
var (
k *koanf.Koanf
err error
)
if len(conf.suffix) <= 0 {
conf.suffix = yamlSuffix
}
if len(conf.delim) <= 0 {
conf.delim = "."
}
bytes := conf.bytes
if len(bytes) <= 0 {
panic(fmt.Errorf("bytes is nil,please set bytes or file path"))
}
k = koanf.New(conf.delim)

switch conf.suffix {
case yamlSuffix, ymlSuffix:
err = k.Load(rawbytes.Provider(bytes), yaml.Parser())
case jsonSuffix:
err = k.Load(rawbytes.Provider(bytes), json.Parser())
case tomlSuffix:
err = k.Load(rawbytes.Provider(bytes), toml.Parser())
default:
err = fmt.Errorf("no support %s file suffix", conf.suffix)
}

if err != nil {
panic(err)
}
return k
}

func newLoaderConf(configFilePath string) *loaderConf {
name, suffix := resolverFilePath(configFilePath)
conf := &loaderConf{
suffix: suffix,
path: absolutePath(configFilePath),
delim: ".",
name: name,
}

if len(conf.bytes) <= 0 {
if bytes, err := ioutil.ReadFile(conf.path); err != nil {
panic(err)
} else {
conf.bytes = bytes
}
}
return conf
}

// absolutePath get absolut path
func absolutePath(inPath string) string {

if inPath == "$HOME" || strings.HasPrefix(inPath, "$HOME"+string(os.PathSeparator)) {
inPath = userHomeDir() + inPath[5:]
}

if filepath.IsAbs(inPath) {
return filepath.Clean(inPath)
}

p, err := filepath.Abs(inPath)
if err == nil {
return filepath.Clean(p)
}

return ""
}

//userHomeDir get gopath
func userHomeDir() string {
if runtime.GOOS == "windows" {
home := os.Getenv("HOMEDRIVE") + os.Getenv("HOMEPATH")
if home == "" {
home = os.Getenv("USERPROFILE")
}
return home
}
return os.Getenv("HOME")
}

+ 52
- 0
pkg/client/config_test.go View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package client

import (
"flag"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestLoadPath(t *testing.T) {
cfg := LoadPath("../../testdata/conf/seatago.yml")
assert.NotNil(t, cfg)
assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)

assert.Equal(t, "tcc_fence_log_test", cfg.TCCConfig.FenceConfig.LogTableName)
assert.Equal(t, time.Second*60, cfg.TCCConfig.FenceConfig.CleanPeriod)
// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}

func TestLoadJson(t *testing.T) {
confJson := `{"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}}}`
cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)
assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)

assert.Equal(t, "tcc_fence_log_test2", cfg.TCCConfig.FenceConfig.LogTableName)
assert.Equal(t, time.Second*80, cfg.TCCConfig.FenceConfig.CleanPeriod)
// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}

pkg/client/tm_client.go → pkg/rm/tcc/fence/fence.go View File

@@ -15,30 +15,20 @@
* limitations under the License.
*/

package client
package fence

import (
"sync"

"github.com/seata/seata-go/pkg/remoting/getty"
"flag"
"time"
)

var onceInitTmClient sync.Once

// InitTmClient init seata tm client
func initTmClient() {
onceInitTmClient.Do(func() {
initConfig()
initRemoting()
})
}

// todo
// initConfig init config processor
func initConfig() {
type Config struct {
LogTableName string `yaml:"log-table-name" json:"log-table-name" koanf:"log-table-name"`
CleanPeriod time.Duration `yaml:"clean-period" json:"clean-period" koanf:"clean-period"`
}

// initRemoting init rpc client
func initRemoting() {
getty.InitRpcClient()
// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.LogTableName, prefix+".log-table-name", "tcc_fence_log", "Undo log table name.")
f.DurationVar(&cfg.CleanPeriod, prefix+".clean-period", 24*time.Hour, "Undo log retention time.")
}

pkg/client/rm_client.go → pkg/rm/tcc/tcc.go View File

@@ -15,8 +15,19 @@
* limitations under the License.
*/

package client
package tcc

// InitRmClient init seata rm client
func initRmClient() {
import (
"flag"

"github.com/seata/seata-go/pkg/rm/tcc/fence"
)

type Config struct {
FenceConfig fence.Config `yaml:"fence" json:"fence" koanf:"fence"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.FenceConfig.RegisterFlagsWithPrefix(prefix+".fence", f)
}

+ 0
- 44
pkg/tm/init.go View File

@@ -1,44 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package tm

import (
"sync"

"github.com/seata/seata-go/pkg/remoting/getty"
)

var onceInitTmClient sync.Once

// InitTmClient init seata tm client
func InitTmClient() {
onceInitTmClient.Do(func() {
initConfig()
initRemoting()
})
}

// initConfig init config processor
func initConfig() {
// todo implement
}

// initRemoting init rpc client
func initRemoting() {
getty.InitRpcClient()
}

+ 155
- 0
testdata/conf/seatago.yml View File

@@ -0,0 +1,155 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# time 时间单位对应的是 time.Duration(1)
seata:
enabled: true
# application id
application-id: applicationName
# service group
tx-service-group: default_tx_group
access-key: aliyunAccessKey
secret-key: aliyunSecretKey
enable-auto-data-source-proxy: true
data-source-proxy-mode: AT
client:
rm:
# Maximum cache length of asynchronous queue
async-commit-buffer-limit: 10000
# The maximum number of retries when report reports the status
report-retry-count: 5
# The interval for regularly checking the metadata of the db(AT)
table-meta-check-enable: false
# Whether to report the status if the transaction is successfully executed(AT)
report-success-enable: false
# Whether to allow regular check of db metadata(AT)
saga-branch-register-enable: false
saga-json-parser: fastjson
saga-retry-persist-mode-update: false
saga-compensate-persist-mode-update: false
#Ordered.HIGHEST_PRECEDENCE + 1000 #
tcc-action-interceptor-order: -2147482648
# Parse SQL parser selection
sql-parser-type: druid
lock:
retry-interval: 10
retry-times: 30s
retry-policy-branch-rollback-on-conflict: true
tm:
commit-retry-count: 5
rollback-retry-count: 5
default-global-transaction-timeout: 10s
degrade-check: false
degrade-check-period: 2000
degrade-check-allow-times: 10s
interceptor-order: -2147482648
undo:
# Judge whether the before image and after image are the same,If it is the same, undo will not be recorded
data-validation: true
# Serialization method
log-serialization: jackson
# undo log table name
log-table: undo_log
# Only store modified fields
only-care-update-columns: true
compress:
# Whether compression is required
enable: true
# Compression type
type: zip
# Compression threshold Unit: k
threshold: 64
load-balance:
type: RandomLoadBalance
virtual-nodes: 10
service:
vgroup-mapping:
# Prefix for Print Log
default_tx_group: default
grouplist:
default: 127.0.0.1:8091
enable-degrade: false
# close the transaction
disable-global-transaction: false
transport:
shutdown:
wait: 3s
# Netty related configurations
# type
type: TCP
server: NIO
heartbeat: true
# Encoding and decoding mode
serialization: seata
# Message compression mode
compressor: none
# Allow batch sending of requests (TM)
enable-tm-client-batch-send-request: false
# Allow batch sending of requests (RM)
enable-rm-client-batch-send-request: true
# RM send request timeout
rpc-rm-request-timeout: 3s
# TM send request timeout
rpc-tm-request-timeout: 3s
# Configuration Center
config:
type: file
file:
name: config.conf
nacos:
namespace: ""
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
username: ""
password: ""
##if use MSE Nacos with auth, mutex with username/password attribute
#access-key: ""
#secret-key: ""
data-id: seata.properties
# Registration Center
registry:
type: file
file:
name: registry.conf
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: "SEATA_GROUP"
namespace: ""
username: ""
password: ""
##if use MSE Nacos with auth, mutex with username/password attribute #
#access-key: "" #
#secret-key: "" #
log:
exception-rate: 100
tcc:
fence:
# Anti suspension table name
log-table-name: tcc_fence_log_test
clean-period: 60s
# getty configuration
getty-session-param:
compress-encoding: false
tcp-no-delay: true
tcp-keep-alive: true
keep-alive-period: 120s
tcp-r-buf-size: 262144
tcp-w-buf-size: 65536
tcp-read-timeout: 1s
tcp-write-timeout: 5s
wait-timeout: 1s
max-msg-len: 16498688
session-name: client

Loading…
Cancel
Save