Browse Source

[Refactor] Migrate StateMachineObject to client/config and unify config parsing with koanf (#785)

* optimize saga config reuse

* upd-test

* Modularize SagaConfig into independent configuration structure
pull/808/head
flypiggy GitHub 6 months ago
parent
commit
e70cabe8c7
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
7 changed files with 172 additions and 60 deletions
  1. +4
    -0
      pkg/client/config.go
  2. +34
    -0
      pkg/saga/config.go
  3. +29
    -26
      pkg/saga/statemachine/statelang/parser/statemachine_config_parser.go
  4. +29
    -28
      pkg/saga/statemachine/statelang/parser/statemachine_config_parser_test.go
  5. +2
    -3
      pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go
  6. +27
    -3
      pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go
  7. +47
    -0
      pkg/saga/statemachine/statemachine.go

+ 4
- 0
pkg/client/config.go View File

@@ -20,6 +20,7 @@ package client
import (
"flag"
"fmt"
"github.com/seata/seata-go/pkg/saga"
"io/ioutil"
"os"
"path/filepath"
@@ -84,6 +85,8 @@ type Config struct {
TransportConfig remoteConfig.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"`
ServiceConfig discovery.ServiceConfig `yaml:"service" json:"service" koanf:"service"`
RegistryConfig discovery.RegistryConfig `yaml:"registry" json:"registry" koanf:"registry"`

SagaConfig saga.Config `yaml:"saga" json:"saga" koanf:"saga"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
@@ -102,6 +105,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.TransportConfig.RegisterFlagsWithPrefix("transport", f)
c.RegistryConfig.RegisterFlagsWithPrefix("registry", f)
c.ServiceConfig.RegisterFlagsWithPrefix("service", f)
c.SagaConfig.RegisterFlagsWithPrefix("saga", f)
}

type loaderConf struct {


+ 34
- 0
pkg/saga/config.go View File

@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package saga

import (
"flag"

"github.com/seata/seata-go/pkg/saga/statemachine"
)

type Config struct {
StateMachine *statemachine.StateMachineObject `yaml:"state-machine" json:"state-machine" koanf:"state-machine"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
if cfg.StateMachine != nil {
cfg.StateMachine.RegisterFlagsWithPrefix(prefix+".state-machine", f)
}
}

+ 29
- 26
pkg/saga/statemachine/statelang/parser/statemachine_config_parser.go View File

@@ -19,16 +19,20 @@ package parser

import (
"bytes"
"encoding/json"
"fmt"
"gopkg.in/yaml.v3"
"github.com/seata/seata-go/pkg/saga/statemachine"
"io"
"os"

"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/json"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
)

// ConfigParser is a general configuration parser interface, used to agree on the implementation of different types of parsers
type ConfigParser interface {
Parse(configContent []byte) (*StateMachineObject, error)
Parse(configContent []byte) (*statemachine.StateMachineObject, error)
}

type JSONConfigParser struct{}
@@ -37,16 +41,21 @@ func NewJSONConfigParser() *JSONConfigParser {
return &JSONConfigParser{}
}

func (p *JSONConfigParser) Parse(configContent []byte) (*StateMachineObject, error) {
func (p *JSONConfigParser) Parse(configContent []byte) (*statemachine.StateMachineObject, error) {
if configContent == nil || len(configContent) == 0 {
return nil, fmt.Errorf("empty JSON config content")
}

var stateMachineObject StateMachineObject
if err := json.Unmarshal(configContent, &stateMachineObject); err != nil {
k := koanf.New(".")
if err := k.Load(rawbytes.Provider(configContent), json.Parser()); err != nil {
return nil, fmt.Errorf("failed to parse JSON config content: %w", err)
}

var stateMachineObject statemachine.StateMachineObject
if err := k.Unmarshal("", &stateMachineObject); err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON config to struct: %w", err)
}

return &stateMachineObject, nil
}

@@ -56,16 +65,21 @@ func NewYAMLConfigParser() *YAMLConfigParser {
return &YAMLConfigParser{}
}

func (p *YAMLConfigParser) Parse(configContent []byte) (*StateMachineObject, error) {
func (p *YAMLConfigParser) Parse(configContent []byte) (*statemachine.StateMachineObject, error) {
if configContent == nil || len(configContent) == 0 {
return nil, fmt.Errorf("empty YAML config content")
}

var stateMachineObject StateMachineObject
if err := yaml.Unmarshal(configContent, &stateMachineObject); err != nil {
k := koanf.New(".")
if err := k.Load(rawbytes.Provider(configContent), yaml.Parser()); err != nil {
return nil, fmt.Errorf("failed to parse YAML config content: %w", err)
}

var stateMachineObject statemachine.StateMachineObject
if err := k.Unmarshal("", &stateMachineObject); err != nil {
return nil, fmt.Errorf("failed to unmarshal YAML config to struct: %w", err)
}

return &stateMachineObject, nil
}

@@ -102,18 +116,20 @@ func (p *StateMachineConfigParser) ReadConfigFile(configFilePath string) ([]byte
}

func (p *StateMachineConfigParser) getParser(content []byte) (ConfigParser, error) {
var obj interface{}
if err := json.Unmarshal(content, &obj); err == nil {
k := koanf.New(".")
if err := k.Load(rawbytes.Provider(content), json.Parser()); err == nil {
return NewJSONConfigParser(), nil
}
if err := yaml.Unmarshal(content, &obj); err == nil {

k = koanf.New(".")
if err := k.Load(rawbytes.Provider(content), yaml.Parser()); err == nil {
return NewYAMLConfigParser(), nil
}

return nil, fmt.Errorf("unsupported config file format")
}

func (p *StateMachineConfigParser) Parse(content []byte) (*StateMachineObject, error) {
func (p *StateMachineConfigParser) Parse(content []byte) (*statemachine.StateMachineObject, error) {
parser, err := p.getParser(content)
if err != nil {
return nil, err
@@ -121,16 +137,3 @@ func (p *StateMachineConfigParser) Parse(content []byte) (*StateMachineObject, e

return parser.Parse(content)
}

type StateMachineObject struct {
Name string `json:"Name" yaml:"Name"`
Comment string `json:"Comment" yaml:"Comment"`
Version string `json:"Version" yaml:"Version"`
StartState string `json:"StartState" yaml:"StartState"`
RecoverStrategy string `json:"RecoverStrategy" yaml:"RecoverStrategy"`
Persist bool `json:"IsPersist" yaml:"IsPersist"`
RetryPersistModeUpdate bool `json:"IsRetryPersistModeUpdate" yaml:"IsRetryPersistModeUpdate"`
CompensatePersistModeUpdate bool `json:"IsCompensatePersistModeUpdate" yaml:"IsCompensatePersistModeUpdate"`
Type string `json:"Type" yaml:"Type"`
States map[string]interface{} `json:"States" yaml:"States"`
}

+ 29
- 28
pkg/saga/statemachine/statelang/parser/statemachine_config_parser_test.go View File

@@ -18,6 +18,7 @@
package parser

import (
"github.com/seata/seata-go/pkg/saga/statemachine"
"github.com/stretchr/testify/assert"
"testing"
)
@@ -26,39 +27,39 @@ func TestStateMachineConfigParser_Parse(t *testing.T) {
parser := NewStateMachineConfigParser()

tests := []struct {
name string
configFilePath string
expectedStateMachineObject *StateMachineObject
name string
configFilePath string
expectedObject *statemachine.StateMachineObject
}{
{
name: "JSON Simple 1",
configFilePath: "../../../../../testdata/saga/statelang/simple_statelang_with_choice.json",
expectedStateMachineObject: GetStateMachineObject1("json"),
name: "JSON Simple 1",
configFilePath: "../../../../../testdata/saga/statelang/simple_statelang_with_choice.json",
expectedObject: GetStateMachineObject1("json"),
},
{
name: "JSON Simple 2",
configFilePath: "../../../../../testdata/saga/statelang/simple_statemachine.json",
expectedStateMachineObject: GetStateMachineObject2("json"),
name: "JSON Simple 2",
configFilePath: "../../../../../testdata/saga/statelang/simple_statemachine.json",
expectedObject: GetStateMachineObject2("json"),
},
{
name: "JSON Simple 3",
configFilePath: "../../../../../testdata/saga/statelang/state_machine_new_designer.json",
expectedStateMachineObject: GetStateMachineObject3("json"),
name: "JSON Simple 3",
configFilePath: "../../../../../testdata/saga/statelang/state_machine_new_designer.json",
expectedObject: GetStateMachineObject3("json"),
},
{
name: "YAML Simple 1",
configFilePath: "../../../../../testdata/saga/statelang/simple_statelang_with_choice.yaml",
expectedStateMachineObject: GetStateMachineObject1("yaml"),
name: "YAML Simple 1",
configFilePath: "../../../../../testdata/saga/statelang/simple_statelang_with_choice.yaml",
expectedObject: GetStateMachineObject1("yaml"),
},
{
name: "YAML Simple 2",
configFilePath: "../../../../../testdata/saga/statelang/simple_statemachine.yaml",
expectedStateMachineObject: GetStateMachineObject2("yaml"),
name: "YAML Simple 2",
configFilePath: "../../../../../testdata/saga/statelang/simple_statemachine.yaml",
expectedObject: GetStateMachineObject2("yaml"),
},
{
name: "YAML Simple 3",
configFilePath: "../../../../../testdata/saga/statelang/state_machine_new_designer.yaml",
expectedStateMachineObject: GetStateMachineObject3("yaml"),
name: "YAML Simple 3",
configFilePath: "../../../../../testdata/saga/statelang/state_machine_new_designer.yaml",
expectedObject: GetStateMachineObject3("yaml"),
},
}

@@ -72,18 +73,18 @@ func TestStateMachineConfigParser_Parse(t *testing.T) {
if err != nil {
t.Error("parse fail: " + err.Error())
}
assert.Equal(t, tt.expectedStateMachineObject, object)
assert.Equal(t, tt.expectedObject, object)
})
}
}

func GetStateMachineObject1(format string) *StateMachineObject {
func GetStateMachineObject1(format string) *statemachine.StateMachineObject {
switch format {
case "json":
case "yaml":
}

return &StateMachineObject{
return &statemachine.StateMachineObject{
Name: "simpleChoiceTestStateMachine",
Comment: "带条件分支的测试状态机定义",
StartState: "FirstState",
@@ -123,7 +124,7 @@ func GetStateMachineObject1(format string) *StateMachineObject {
}
}

func GetStateMachineObject2(format string) *StateMachineObject {
func GetStateMachineObject2(format string) *statemachine.StateMachineObject {
var retryMap map[string]interface{}

switch format {
@@ -147,7 +148,7 @@ func GetStateMachineObject2(format string) *StateMachineObject {
}
}

return &StateMachineObject{
return &statemachine.StateMachineObject{
Name: "simpleTestStateMachine",
Comment: "测试状态机定义",
StartState: "FirstState",
@@ -282,7 +283,7 @@ func GetStateMachineObject2(format string) *StateMachineObject {
}
}

func GetStateMachineObject3(format string) *StateMachineObject {
func GetStateMachineObject3(format string) *statemachine.StateMachineObject {
var (
boundsMap1 map[string]interface{}
boundsMap2 map[string]interface{}
@@ -685,7 +686,7 @@ func GetStateMachineObject3(format string) *StateMachineObject {
}
}

return &StateMachineObject{
return &statemachine.StateMachineObject{
Name: "StateMachineNewDesigner",
Comment: "This state machine is modeled by designer tools.",
Version: "0.0.1",


+ 2
- 3
pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go View File

@@ -106,14 +106,13 @@ func (stateMachineParser JSONStateMachineParser) Parse(content string) (statelan
}

func (stateMachineParser JSONStateMachineParser) setForCompensation(stateValue statelang.State, stateMachine *statelang.StateMachineImpl) {
switch stateValue.Type() {
case stateValue.Type():
if stateValue.Type() == constant.StateTypeServiceTask {
serviceTaskStateImpl, ok := stateValue.(*state.ServiceTaskStateImpl)
if ok {
if serviceTaskStateImpl.CompensateState() != "" {
compState := stateMachine.States()[serviceTaskStateImpl.CompensateState()]
if stateMachineParser.isTaskState(compState.Type()) {
compStateImpl, ok := compState.(state.ServiceTaskStateImpl)
compStateImpl, ok := compState.(*state.ServiceTaskStateImpl)
if ok {
compStateImpl.SetForCompensation(true)
}


+ 27
- 3
pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go View File

@@ -18,9 +18,18 @@
package parser

import (
"os"
"testing"
)

func readFileContent(filePath string) (string, error) {
content, err := os.ReadFile(filePath)
if err != nil {
return "", err
}
return string(content), nil
}

func TestParseChoice(t *testing.T) {
parser := NewJSONStateMachineParser()

@@ -40,7 +49,12 @@ func TestParseChoice(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := parser.Parse(tt.configFilePath)
content, err := readFileContent(tt.configFilePath)
if err != nil {
t.Error("read file fail: " + err.Error())
return
}
_, err = parser.Parse(content)
if err != nil {
t.Error("parse fail: " + err.Error())
}
@@ -67,7 +81,12 @@ func TestParseServiceTaskForSimpleStateMachine(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := parser.Parse(tt.configFilePath)
content, err := readFileContent(tt.configFilePath)
if err != nil {
t.Error("read file fail: " + err.Error())
return
}
_, err = parser.Parse(content)
if err != nil {
t.Error("parse fail: " + err.Error())
}
@@ -94,7 +113,12 @@ func TestParseServiceTaskForNewDesigner(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := parser.Parse(tt.configFilePath)
content, err := readFileContent(tt.configFilePath)
if err != nil {
t.Error("read file fail: " + err.Error())
return
}
_, err = parser.Parse(content)
if err != nil {
t.Error("parse fail: " + err.Error())
}


+ 47
- 0
pkg/saga/statemachine/statemachine.go View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package statemachine

import (
"flag"
)

type StateMachineObject struct {
Name string `json:"Name" yaml:"Name"`
Comment string `json:"Comment" yaml:"Comment"`
Version string `json:"Version" yaml:"Version"`
StartState string `json:"StartState" yaml:"StartState"`
RecoverStrategy string `json:"RecoverStrategy" yaml:"RecoverStrategy"`
Persist bool `json:"IsPersist" yaml:"IsPersist"`
RetryPersistModeUpdate bool `json:"IsRetryPersistModeUpdate" yaml:"IsRetryPersistModeUpdate"`
CompensatePersistModeUpdate bool `json:"IsCompensatePersistModeUpdate" yaml:"IsCompensatePersistModeUpdate"`
Type string `json:"Type" yaml:"Type"`
States map[string]interface{} `json:"States" yaml:"States"`
}

func (smo *StateMachineObject) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&smo.Name, prefix+".name", "", "State machine name.")
f.StringVar(&smo.Comment, prefix+".comment", "", "State machine comment.")
f.StringVar(&smo.Version, prefix+".version", "1.0", "State machine version.")
f.StringVar(&smo.StartState, prefix+".start-state", "", "State machine start state.")
f.StringVar(&smo.RecoverStrategy, prefix+".recover-strategy", "", "State machine recovery strategy.")
f.BoolVar(&smo.Persist, prefix+".persist", false, "Whether to persist state machine.")
f.BoolVar(&smo.RetryPersistModeUpdate, prefix+".retry-persist-mode-update", false, "Whether to use update mode for retry persistence.")
f.BoolVar(&smo.CompensatePersistModeUpdate, prefix+".compensate-persist-mode-update", false, "Whether to use update mode for compensate persistence.")
f.StringVar(&smo.Type, prefix+".type", "", "State machine type.")
}

Loading…
Cancel
Save