Browse Source

refactor: refactor saga scaffold to break import cycles (#647)

pull/650/head
Xiangkun Yin GitHub 1 year ago
parent
commit
34c5e527c0
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
8 changed files with 115 additions and 106 deletions
  1. +1
    -1
      pkg/saga/statemachine/constant/contant.go
  2. +3
    -3
      pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
  3. +2
    -3
      pkg/saga/statemachine/engine/process_ctrl/instruction.go
  4. +5
    -87
      pkg/saga/statemachine/engine/process_ctrl/process_context.go
  5. +2
    -3
      pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
  6. +9
    -9
      pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
  7. +7
    -0
      pkg/saga/statemachine/engine/statemachine_engine_test.go
  8. +86
    -0
      pkg/saga/statemachine/engine/utils.go

pkg/saga/statemachine/engine/contant.go → pkg/saga/statemachine/constant/contant.go View File

@@ -1,4 +1,4 @@
package engine
package constant

const (
VarNameProcessType string = "_ProcessType_"

+ 3
- 3
pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go View File

@@ -3,7 +3,7 @@ package process_ctrl
import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"sync"
)

@@ -76,9 +76,9 @@ func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) (Ro
}

func (d *DefaultBusinessProcessor) matchProcessType(processContext ProcessContext) ProcessType {
ok := processContext.HasVariable(engine.VarNameProcessType)
ok := processContext.HasVariable(constant.VarNameProcessType)
if ok {
return processContext.GetVariable(engine.VarNameProcessType).(ProcessType)
return processContext.GetVariable(constant.VarNameProcessType).(ProcessType)
}
return StateLang
}


pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go → pkg/saga/statemachine/engine/process_ctrl/instruction.go View File

@@ -1,7 +1,6 @@
package instruction
package process_ctrl

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

@@ -15,7 +14,7 @@ type StateInstruction struct {
End bool
}

func (s StateInstruction) GetState(context process_ctrl.ProcessContext) (statelang.State, error) {
func (s StateInstruction) GetState(context ProcessContext) (statelang.State, error) {
//TODO implement me
panic("implement me")
}

+ 5
- 87
pkg/saga/statemachine/engine/process_ctrl/process_context.go View File

@@ -1,9 +1,6 @@
package process_ctrl

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"sync"
)

@@ -20,9 +17,9 @@ type ProcessContext interface {

HasVariable(name string) bool

GetInstruction() instruction.Instruction
GetInstruction() Instruction

SetInstruction(instruction instruction.Instruction)
SetInstruction(instruction Instruction)
}

type HierarchicalProcessContext interface {
@@ -47,7 +44,7 @@ type ProcessContextImpl struct {
parent ProcessContext
mu sync.RWMutex
mp map[string]interface{}
instruction instruction.Instruction
instruction Instruction
}

func (p *ProcessContextImpl) GetVariable(name string) interface{} {
@@ -140,11 +137,11 @@ func (p *ProcessContextImpl) HasVariable(name string) bool {
return false
}

func (p *ProcessContextImpl) GetInstruction() instruction.Instruction {
func (p *ProcessContextImpl) GetInstruction() Instruction {
return p.instruction
}

func (p *ProcessContextImpl) SetInstruction(instruction instruction.Instruction) {
func (p *ProcessContextImpl) SetInstruction(instruction Instruction) {
p.instruction = instruction
}

@@ -203,82 +200,3 @@ func (p *ProcessContextImpl) ClearLocally() {

p.mp = map[string]interface{}{}
}

// ProcessContextBuilder process_ctrl builder
type ProcessContextBuilder struct {
processContext ProcessContext
}

func NewProcessContextBuilder() *ProcessContextBuilder {
processContextImpl := &ProcessContextImpl{}
return &ProcessContextBuilder{processContextImpl}
}

func (p *ProcessContextBuilder) WithProcessType(processType ProcessType) *ProcessContextBuilder {
p.processContext.SetVariable(engine.VarNameProcessType, processType)
return p
}

func (p *ProcessContextBuilder) WithOperationName(operationName string) *ProcessContextBuilder {
p.processContext.SetVariable(engine.VarNameOperationName, operationName)
return p
}

func (p *ProcessContextBuilder) WithAsyncCallback(callBack engine.CallBack) *ProcessContextBuilder {
if callBack != nil {
p.processContext.SetVariable(engine.VarNameAsyncCallback, callBack)
}

return p
}

func (p *ProcessContextBuilder) WithInstruction(instruction instruction.Instruction) *ProcessContextBuilder {
if instruction != nil {
p.processContext.SetInstruction(instruction)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineInstance(stateMachineInstance statelang.StateMachineInstance) *ProcessContextBuilder {
if stateMachineInstance != nil {
p.processContext.SetVariable(engine.VarNameStateMachineInst, stateMachineInstance)
p.processContext.SetVariable(engine.VarNameStateMachine, stateMachineInstance.StateMachine())
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineEngine(stateMachineEngine engine.StateMachineEngine) *ProcessContextBuilder {
if stateMachineEngine != nil {
p.processContext.SetVariable(engine.VarNameStateMachineEngine, stateMachineEngine)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineConfig(stateMachineConfig engine.StateMachineConfig) *ProcessContextBuilder {
if stateMachineConfig != nil {
p.processContext.SetVariable(engine.VarNameStateMachineConfig, stateMachineConfig)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineContextVariables(contextMap map[string]interface{}) *ProcessContextBuilder {
if contextMap != nil {
p.processContext.SetVariable(engine.VarNameStateMachineContext, contextMap)
}

return p
}

func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) *ProcessContextBuilder {
p.processContext.SetVariable(engine.VarNameIsAsyncExecution, async)

return p
}

func (p *ProcessContextBuilder) Build() ProcessContext {
return p.processContext
}

+ 2
- 3
pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go View File

@@ -3,7 +3,6 @@ package process_ctrl
import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
"sync"
)

@@ -40,7 +39,7 @@ func NewStateMachineProcessHandler() *StateMachineProcessHandler {
}

func (s *StateMachineProcessHandler) Process(ctx context.Context, processContext ProcessContext) error {
stateInstruction, _ := processContext.GetInstruction().(instruction.StateInstruction)
stateInstruction, _ := processContext.GetInstruction().(StateInstruction)

state, err := stateInstruction.GetState(processContext)
if err != nil {
@@ -107,7 +106,7 @@ type StateMachineRouterHandler struct {
}

func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext ProcessContext) error {
stateInstruction, _ := processContext.GetInstruction().(instruction.StateInstruction)
stateInstruction, _ := processContext.GetInstruction().(StateInstruction)

state, err := stateInstruction.GetState(processContext)
if err != nil {


+ 9
- 9
pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go View File

@@ -3,9 +3,9 @@ package engine
import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"time"
)
@@ -29,11 +29,11 @@ func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateM
}

// Build the process_ctrl context.
processContextBuilder := process_ctrl.NewProcessContextBuilder().
processContextBuilder := NewProcessContextBuilder().
WithProcessType(process_ctrl.StateLang).
WithOperationName(OperationNameStart).
WithOperationName(constant.OperationNameStart).
WithAsyncCallback(callback).
WithInstruction(instruction.NewStateInstruction(stateMachineName, tenantId)).
WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, tenantId)).
WithStateMachineInstance(stateMachineInstance).
WithStateMachineConfig(p.StateMachineConfig).
WithStateMachineEngine(p).
@@ -53,7 +53,7 @@ func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateM
}

if stateMachineInstance.ID() == "" {
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(SeqEntityStateMachineInst, ""))
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst, ""))
}

var eventPublisher events.EventPublisher
@@ -97,16 +97,16 @@ func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName st
stateMachineInstance.SetStartParams(startParams)
if startParams != nil {
if businessKey != "" {
startParams[VarNameBusinesskey] = businessKey
startParams[constant.VarNameBusinesskey] = businessKey
}

if startParams[VarNameParentId] != nil {
parentId, ok := startParams[VarNameParentId].(string)
if startParams[constant.VarNameParentId] != nil {
parentId, ok := startParams[constant.VarNameParentId].(string)
if !ok {

}
stateMachineInstance.SetParentID(parentId)
delete(startParams, VarNameParentId)
delete(startParams, constant.VarNameParentId)
}
}



+ 7
- 0
pkg/saga/statemachine/engine/statemachine_engine_test.go View File

@@ -0,0 +1,7 @@
package engine

import "testing"

func TestEngine(t *testing.T) {

}

+ 86
- 0
pkg/saga/statemachine/engine/utils.go View File

@@ -0,0 +1,86 @@
package engine

import (
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

// ProcessContextBuilder process_ctrl builder
type ProcessContextBuilder struct {
processContext process_ctrl.ProcessContext
}

func NewProcessContextBuilder() *ProcessContextBuilder {
processContextImpl := &process_ctrl.ProcessContextImpl{}
return &ProcessContextBuilder{processContextImpl}
}

func (p *ProcessContextBuilder) WithProcessType(processType process_ctrl.ProcessType) *ProcessContextBuilder {
p.processContext.SetVariable(constant.VarNameProcessType, processType)
return p
}

func (p *ProcessContextBuilder) WithOperationName(operationName string) *ProcessContextBuilder {
p.processContext.SetVariable(constant.VarNameOperationName, operationName)
return p
}

func (p *ProcessContextBuilder) WithAsyncCallback(callBack CallBack) *ProcessContextBuilder {
if callBack != nil {
p.processContext.SetVariable(constant.VarNameAsyncCallback, callBack)
}

return p
}

func (p *ProcessContextBuilder) WithInstruction(instruction process_ctrl.Instruction) *ProcessContextBuilder {
if instruction != nil {
p.processContext.SetInstruction(instruction)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineInstance(stateMachineInstance statelang.StateMachineInstance) *ProcessContextBuilder {
if stateMachineInstance != nil {
p.processContext.SetVariable(constant.VarNameStateMachineInst, stateMachineInstance)
p.processContext.SetVariable(constant.VarNameStateMachine, stateMachineInstance.StateMachine())
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineEngine(stateMachineEngine StateMachineEngine) *ProcessContextBuilder {
if stateMachineEngine != nil {
p.processContext.SetVariable(constant.VarNameStateMachineEngine, stateMachineEngine)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineConfig(stateMachineConfig StateMachineConfig) *ProcessContextBuilder {
if stateMachineConfig != nil {
p.processContext.SetVariable(constant.VarNameStateMachineConfig, stateMachineConfig)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineContextVariables(contextMap map[string]interface{}) *ProcessContextBuilder {
if contextMap != nil {
p.processContext.SetVariable(constant.VarNameStateMachineContext, contextMap)
}

return p
}

func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) *ProcessContextBuilder {
p.processContext.SetVariable(constant.VarNameIsAsyncExecution, async)

return p
}

func (p *ProcessContextBuilder) Build() process_ctrl.ProcessContext {
return p.processContext
}

Loading…
Cancel
Save