Browse Source

feature: sequential execution of state machine in Saga (#681)

pull/769/head
A Cabbage GitHub 1 year ago
parent
commit
8d68ae4494
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
54 changed files with 2533 additions and 503 deletions
  1. +1
    -0
      go.mod
  2. +2
    -0
      go.sum
  3. +68
    -25
      pkg/saga/statemachine/constant/constant.go
  4. +10
    -17
      pkg/saga/statemachine/engine/core/bussiness_processor.go
  5. +63
    -0
      pkg/saga/statemachine/engine/core/compensation_holder.go
  6. +199
    -0
      pkg/saga/statemachine/engine/core/default_statemachine_config.go
  7. +149
    -0
      pkg/saga/statemachine/engine/core/engine_utils.go
  8. +1
    -1
      pkg/saga/statemachine/engine/core/event.go
  9. +113
    -0
      pkg/saga/statemachine/engine/core/event_bus.go
  10. +10
    -5
      pkg/saga/statemachine/engine/core/event_consumer.go
  11. +1
    -1
      pkg/saga/statemachine/engine/core/event_publisher.go
  12. +96
    -0
      pkg/saga/statemachine/engine/core/instruction.go
  13. +92
    -0
      pkg/saga/statemachine/engine/core/loop_context_holder.go
  14. +40
    -0
      pkg/saga/statemachine/engine/core/loop_task_utils.go
  15. +132
    -0
      pkg/saga/statemachine/engine/core/parameter_utils.go
  16. +1
    -1
      pkg/saga/statemachine/engine/core/process_context.go
  17. +31
    -0
      pkg/saga/statemachine/engine/core/process_controller.go
  18. +424
    -0
      pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
  19. +194
    -0
      pkg/saga/statemachine/engine/core/process_router.go
  20. +7
    -44
      pkg/saga/statemachine/engine/core/process_state.go
  21. +151
    -0
      pkg/saga/statemachine/engine/core/state_router.go
  22. +11
    -11
      pkg/saga/statemachine/engine/core/statemachine_config.go
  23. +16
    -0
      pkg/saga/statemachine/engine/core/statemachine_engine.go
  24. +15
    -0
      pkg/saga/statemachine/engine/core/statemachine_engine_test.go
  25. +8
    -7
      pkg/saga/statemachine/engine/core/statemachine_store.go
  26. +188
    -0
      pkg/saga/statemachine/engine/core/status_decision.go
  27. +7
    -7
      pkg/saga/statemachine/engine/core/utils.go
  28. +0
    -123
      pkg/saga/statemachine/engine/default_statemachine_config.go
  29. +0
    -49
      pkg/saga/statemachine/engine/events/event_bus.go
  30. +54
    -0
      pkg/saga/statemachine/engine/exception/exception.go
  31. +83
    -0
      pkg/saga/statemachine/engine/expr/expression.go
  32. +0
    -24
      pkg/saga/statemachine/engine/process_ctrl/instruction.go
  33. +0
    -125
      pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
  34. +0
    -16
      pkg/saga/statemachine/engine/statemachine_engine.go
  35. +0
    -7
      pkg/saga/statemachine/engine/statemachine_engine_test.go
  36. +0
    -4
      pkg/saga/statemachine/engine/status_decision/status_decision.go
  37. +174
    -0
      pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go
  38. +1
    -1
      pkg/saga/statemachine/process_ctrl/process/process_type.go
  39. +1
    -1
      pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go
  40. +22
    -0
      pkg/saga/statemachine/statelang/state/loop_start_state.go
  41. +1
    -1
      pkg/saga/statemachine/statelang/state/sub_state_machine.go
  42. +19
    -1
      pkg/saga/statemachine/statelang/state/task_state.go
  43. +13
    -7
      pkg/saga/statemachine/statelang/statemachine_instance.go
  44. +0
    -0
      pkg/saga/statemachine/store/db/db.go
  45. +0
    -0
      pkg/saga/statemachine/store/db/db_test.go
  46. +0
    -0
      pkg/saga/statemachine/store/db/statelang.go
  47. +0
    -0
      pkg/saga/statemachine/store/db/statelang_test.go
  48. +11
    -11
      pkg/saga/statemachine/store/db/statelog.go
  49. +12
    -12
      pkg/saga/statemachine/store/db/statelog_test.go
  50. +34
    -0
      pkg/saga/statemachine/store/repository/state_machine_repository.go
  51. +1
    -1
      pkg/tm/transaction_executor_test.go
  52. +43
    -1
      pkg/util/collection/collection.go
  53. +23
    -0
      pkg/util/collection/collection_test.go
  54. +11
    -0
      pkg/util/errors/code.go

+ 1
- 0
go.mod View File

@@ -34,6 +34,7 @@ require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/agiledragon/gomonkey/v2 v2.9.0
github.com/mattn/go-sqlite3 v1.14.19
golang.org/x/sync v0.6.0
google.golang.org/protobuf v1.30.0
)



+ 2
- 0
go.sum View File

@@ -942,6 +942,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=


+ 68
- 25
pkg/saga/statemachine/constant/constant.go View File

@@ -1,29 +1,72 @@
package constant

const (
VarNameProcessType string = "_ProcessType_"
VarNameOperationName string = "_operation_name_"
OperationNameStart string = "start"
VarNameAsyncCallback string = "_async_callback_"
VarNameStateMachineInst string = "_current_statemachine_instance_"
VarNameStateMachine string = "_current_statemachine_"
VarNameStateMachineEngine string = "_current_statemachine_engine_"
VarNameStateMachineConfig string = "_statemachine_config_"
VarNameStateMachineContext string = "context"
VarNameIsAsyncExecution string = "_is_async_execution_"
VarNameStateInst string = "_current_state_instance_"
SeqEntityStateMachineInst string = "STATE_MACHINE_INST"
SeqEntityStateInst string = "STATE_INST"
VarNameBusinesskey string = "_business_key_"
VarNameParentId string = "_parent_id_"
StateTypeServiceTask string = "ServiceTask"
StateTypeChoice string = "Choice"
StateTypeSubStateMachine string = "SubStateMachine"
CompensateSubMachine string = "CompensateSubMachine"
StateTypeSucceed string = "Succeed"
StateTypeFail string = "Fail"
StateTypeCompensationTrigger string = "CompensationTrigger"
StateTypeScriptTask string = "ScriptTask"
CompensateSubMachineStateNamePrefix string = "_compensate_sub_machine_state_"
DefaultScriptType string = "groovy"
// region State Types
StateTypeServiceTask string = "ServiceTask"
StateTypeChoice string = "Choice"
StateTypeFail string = "Fail"
StateTypeSucceed string = "Succeed"
StateTypeCompensationTrigger string = "CompensationTrigger"
StateTypeSubStateMachine string = "SubStateMachine"
StateTypeCompensateSubMachine string = "CompensateSubMachine"
StateTypeScriptTask string = "ScriptTask"
StateTypeLoopStart string = "LoopStart"
// end region

// region Service Types
ServiceTypeGRPC string = "GRPC"
// end region

// region System Variables
VarNameOutputParams string = "outputParams"
VarNameProcessType string = "_ProcessType_"
VarNameOperationName string = "_operation_name_"
OperationNameStart string = "start"
OperationNameCompensate string = "compensate"
VarNameAsyncCallback string = "_async_callback_"
VarNameCurrentExceptionRoute string = "_current_exception_route_"
VarNameIsExceptionNotCatch string = "_is_exception_not_catch_"
VarNameSubMachineParentId string = "_sub_machine_parent_id_"
VarNameCurrentChoice string = "_current_choice_"
VarNameStateMachineInst string = "_current_statemachine_instance_"
VarNameStateMachine string = "_current_statemachine_"
VarNameStateMachineEngine string = "_current_statemachine_engine_"
VarNameStateMachineConfig string = "_statemachine_config_"
VarNameStateMachineContext string = "context"
VarNameIsAsyncExecution string = "_is_async_execution_"
VarNameStateInst string = "_current_state_instance_"
VarNameBusinesskey string = "_business_key_"
VarNameParentId string = "_parent_id_"
VarNameCurrentException string = "currentException"
CompensateSubMachineStateNamePrefix string = "_compensate_sub_machine_state_"
DefaultScriptType string = "groovy"
VarNameSyncExeStack string = "_sync_execution_stack_"
VarNameInputParams string = "inputParams"
VarNameIsLoopState string = "_is_loop_state_"
VarNameCurrentCompensateTriggerState string = "_is_compensating_"
VarNameCurrentCompensationHolder string = "_current_compensation_holder_"
VarNameFirstCompensationStateStarted string = "_first_compensation_state_started"
VarNameCurrentLoopContextHolder string = "_current_loop_context_holder_"
// TODO: this lock in process context only has one, try to add more to add concurrent
VarNameProcessContextMutexLock string = "_current_context_mutex_lock"
VarNameFailEndStateFlag string = "_fail_end_state_flag_"
// end region

// region of loop
LoopCounter string = "loopCounter"
LoopSemaphore string = "loopSemaphore"
LoopResult string = "loopResult"
NumberOfInstances string = "nrOfInstances"
NumberOfActiveInstances string = "nrOfActiveInstances"
NumberOfCompletedInstances string = "nrOfCompletedInstances"
// end region

// region others
SeqEntityStateMachineInst string = "STATE_MACHINE_INST"
SeqEntityStateInst string = "STATE_INST"
OperationNameForward string = "forward"
LoopStateNamePattern string = "-loop-"
// end region

SeperatorParentId string = ":"
)

pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go → pkg/saga/statemachine/engine/core/bussiness_processor.go View File

@@ -1,9 +1,10 @@
package process_ctrl
package core

import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
"sync"
)

@@ -19,14 +20,14 @@ type DefaultBusinessProcessor struct {
mu sync.RWMutex
}

func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType ProcessType, processHandler ProcessHandler) {
func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType process.ProcessType, processHandler ProcessHandler) {
d.mu.Lock()
defer d.mu.Unlock()

d.processHandlers[string(processType)] = processHandler
}

func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType ProcessType, routerHandler RouterHandler) {
func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType process.ProcessType, routerHandler RouterHandler) {
d.mu.Lock()
defer d.mu.Unlock()

@@ -55,17 +56,17 @@ func (d *DefaultBusinessProcessor) Route(ctx context.Context, processContext Pro
return routerHandler.Route(ctx, processContext)
}

func (d *DefaultBusinessProcessor) getProcessHandler(processType ProcessType) (ProcessHandler, error) {
func (d *DefaultBusinessProcessor) getProcessHandler(processType process.ProcessType) (ProcessHandler, error) {
d.mu.RLock()
defer d.mu.RUnlock()
processHandler, ok := d.processHandlers[string(processType)]
if !ok {
return nil, errors.New("Cannot find process handler by type " + string(processType))
return nil, errors.New("Cannot find Process handler by type " + string(processType))
}
return processHandler, nil
}

func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) (RouterHandler, error) {
func (d *DefaultBusinessProcessor) getRouterHandler(processType process.ProcessType) (RouterHandler, error) {
d.mu.RLock()
defer d.mu.RUnlock()
routerHandler, ok := d.routerHandlers[string(processType)]
@@ -75,18 +76,10 @@ func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) (Ro
return routerHandler, nil
}

func (d *DefaultBusinessProcessor) matchProcessType(processContext ProcessContext) ProcessType {
func (d *DefaultBusinessProcessor) matchProcessType(processContext ProcessContext) process.ProcessType {
ok := processContext.HasVariable(constant.VarNameProcessType)
if ok {
return processContext.GetVariable(constant.VarNameProcessType).(ProcessType)
return processContext.GetVariable(constant.VarNameProcessType).(process.ProcessType)
}
return StateLang
}

type ProcessHandler interface {
Process(ctx context.Context, processContext ProcessContext) error
}

type RouterHandler interface {
Route(ctx context.Context, processContext ProcessContext) error
return process.StateLang
}

+ 63
- 0
pkg/saga/statemachine/engine/core/compensation_holder.go View File

@@ -0,0 +1,63 @@
package core

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/util/collection"
"sync"
)

type CompensationHolder struct {
statesNeedCompensation *sync.Map
statesForCompensation *sync.Map
stateStackNeedCompensation *collection.Stack
}

func (c *CompensationHolder) StatesNeedCompensation() *sync.Map {
return c.statesNeedCompensation
}

func (c *CompensationHolder) SetStatesNeedCompensation(statesNeedCompensation *sync.Map) {
c.statesNeedCompensation = statesNeedCompensation
}

func (c *CompensationHolder) StatesForCompensation() *sync.Map {
return c.statesForCompensation
}

func (c *CompensationHolder) SetStatesForCompensation(statesForCompensation *sync.Map) {
c.statesForCompensation = statesForCompensation
}

func (c *CompensationHolder) StateStackNeedCompensation() *collection.Stack {
return c.stateStackNeedCompensation
}

func (c *CompensationHolder) SetStateStackNeedCompensation(stateStackNeedCompensation *collection.Stack) {
c.stateStackNeedCompensation = stateStackNeedCompensation
}

func (c *CompensationHolder) AddToBeCompensatedState(stateName string, toBeCompensatedState statelang.StateInstance) {
c.statesNeedCompensation.Store(stateName, toBeCompensatedState)
}

func NewCompensationHolder() *CompensationHolder {
return &CompensationHolder{
statesNeedCompensation: &sync.Map{},
statesForCompensation: &sync.Map{},
stateStackNeedCompensation: collection.NewStack(),
}
}

func GetCurrentCompensationHolder(ctx context.Context, processContext ProcessContext, forceCreate bool) *CompensationHolder {
compensationholder := processContext.GetVariable(constant.VarNameCurrentCompensationHolder).(*CompensationHolder)
lock := processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
lock.Lock()
defer lock.Unlock()
if compensationholder == nil && forceCreate {
compensationholder = NewCompensationHolder()
processContext.SetVariable(constant.VarNameCurrentCompensationHolder, compensationholder)
}
return compensationholder
}

+ 199
- 0
pkg/saga/statemachine/engine/core/default_statemachine_config.go View File

@@ -0,0 +1,199 @@
package core

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"sync"
)

const (
DefaultTransOperTimeout = 60000 * 30
DefaultServiceInvokeTimeout = 60000 * 5
)

type DefaultStateMachineConfig struct {
// Configuration
transOperationTimeout int
serviceInvokeTimeout int
charset string
defaultTenantId string

// Components

// Event publisher
syncProcessCtrlEventPublisher EventPublisher
asyncProcessCtrlEventPublisher EventPublisher

// Store related components
stateLogRepository StateLogRepository
stateLogStore StateLogStore
stateLangStore StateLangStore
stateMachineRepository StateMachineRepository

// Expression related components
expressionFactoryManager expr.ExpressionFactoryManager
expressionResolver expr.ExpressionResolver

// Invoker related components
serviceInvokerManager invoker.ServiceInvokerManager
scriptInvokerManager invoker.ScriptInvokerManager

// Other components
statusDecisionStrategy StatusDecisionStrategy
seqGenerator sequence.SeqGenerator
componentLock *sync.Mutex
}

func (c *DefaultStateMachineConfig) ComponentLock() *sync.Mutex {
return c.componentLock
}

func (c *DefaultStateMachineConfig) SetComponentLock(componentLock *sync.Mutex) {
c.componentLock = componentLock
}

func (c *DefaultStateMachineConfig) SetTransOperationTimeout(transOperationTimeout int) {
c.transOperationTimeout = transOperationTimeout
}

func (c *DefaultStateMachineConfig) SetServiceInvokeTimeout(serviceInvokeTimeout int) {
c.serviceInvokeTimeout = serviceInvokeTimeout
}

func (c *DefaultStateMachineConfig) SetCharset(charset string) {
c.charset = charset
}

func (c *DefaultStateMachineConfig) SetDefaultTenantId(defaultTenantId string) {
c.defaultTenantId = defaultTenantId
}

func (c *DefaultStateMachineConfig) SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher EventPublisher) {
c.syncProcessCtrlEventPublisher = syncProcessCtrlEventPublisher
}

func (c *DefaultStateMachineConfig) SetAsyncProcessCtrlEventPublisher(asyncProcessCtrlEventPublisher EventPublisher) {
c.asyncProcessCtrlEventPublisher = asyncProcessCtrlEventPublisher
}

func (c *DefaultStateMachineConfig) SetStateLogRepository(stateLogRepository StateLogRepository) {
c.stateLogRepository = stateLogRepository
}

func (c *DefaultStateMachineConfig) SetStateLogStore(stateLogStore StateLogStore) {
c.stateLogStore = stateLogStore
}

func (c *DefaultStateMachineConfig) SetStateLangStore(stateLangStore StateLangStore) {
c.stateLangStore = stateLangStore
}

func (c *DefaultStateMachineConfig) SetStateMachineRepository(stateMachineRepository StateMachineRepository) {
c.stateMachineRepository = stateMachineRepository
}

func (c *DefaultStateMachineConfig) SetExpressionFactoryManager(expressionFactoryManager expr.ExpressionFactoryManager) {
c.expressionFactoryManager = expressionFactoryManager
}

func (c *DefaultStateMachineConfig) SetExpressionResolver(expressionResolver expr.ExpressionResolver) {
c.expressionResolver = expressionResolver
}

func (c *DefaultStateMachineConfig) SetServiceInvokerManager(serviceInvokerManager invoker.ServiceInvokerManager) {
c.serviceInvokerManager = serviceInvokerManager
}

func (c *DefaultStateMachineConfig) SetScriptInvokerManager(scriptInvokerManager invoker.ScriptInvokerManager) {
c.scriptInvokerManager = scriptInvokerManager
}

func (c *DefaultStateMachineConfig) SetStatusDecisionStrategy(statusDecisionStrategy StatusDecisionStrategy) {
c.statusDecisionStrategy = statusDecisionStrategy
}

func (c *DefaultStateMachineConfig) SetSeqGenerator(seqGenerator sequence.SeqGenerator) {
c.seqGenerator = seqGenerator
}

func (c *DefaultStateMachineConfig) StateLogRepository() StateLogRepository {
return c.stateLogRepository
}

func (c *DefaultStateMachineConfig) StateMachineRepository() StateMachineRepository {
return c.stateMachineRepository
}

func (c *DefaultStateMachineConfig) StateLogStore() StateLogStore {
return c.stateLogStore
}

func (c *DefaultStateMachineConfig) StateLangStore() StateLangStore {
return c.stateLangStore
}

func (c *DefaultStateMachineConfig) ExpressionFactoryManager() expr.ExpressionFactoryManager {
return c.expressionFactoryManager
}

func (c *DefaultStateMachineConfig) ExpressionResolver() expr.ExpressionResolver {
return c.expressionResolver
}

func (c *DefaultStateMachineConfig) SeqGenerator() sequence.SeqGenerator {
return c.seqGenerator
}

func (c *DefaultStateMachineConfig) StatusDecisionStrategy() StatusDecisionStrategy {
return c.statusDecisionStrategy
}

func (c *DefaultStateMachineConfig) EventPublisher() EventPublisher {
return c.syncProcessCtrlEventPublisher
}

func (c *DefaultStateMachineConfig) AsyncEventPublisher() EventPublisher {
return c.asyncProcessCtrlEventPublisher
}

func (c *DefaultStateMachineConfig) ServiceInvokerManager() invoker.ServiceInvokerManager {
return c.serviceInvokerManager
}

func (c *DefaultStateMachineConfig) ScriptInvokerManager() invoker.ScriptInvokerManager {
return c.scriptInvokerManager
}

func (c *DefaultStateMachineConfig) CharSet() string {
return c.charset
}

func (c *DefaultStateMachineConfig) SetCharSet(charset string) {
c.charset = charset
}

func (c *DefaultStateMachineConfig) DefaultTenantId() string {
return c.defaultTenantId
}

func (c *DefaultStateMachineConfig) TransOperationTimeout() int {
return c.transOperationTimeout
}

func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int {
return c.serviceInvokeTimeout
}

func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
c := &DefaultStateMachineConfig{
transOperationTimeout: DefaultTransOperTimeout,
serviceInvokeTimeout: DefaultServiceInvokeTimeout,
charset: "UTF-8",
defaultTenantId: "000001",
componentLock: &sync.Mutex{},
}

// TODO: init config
return c
}

+ 149
- 0
pkg/saga/statemachine/engine/core/engine_utils.go View File

@@ -0,0 +1,149 @@
package core

import (
"context"
"errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
"github.com/seata/seata-go/pkg/util/log"
"golang.org/x/sync/semaphore"
"reflect"
"strings"
"sync"
"time"
)

func EndStateMachine(ctx context.Context, processContext ProcessContext) error {
if processContext.HasVariable(constant.VarNameIsLoopState) {
if processContext.HasVariable(constant.LoopSemaphore) {
weighted, ok := processContext.GetVariable(constant.LoopSemaphore).(semaphore.Weighted)
if !ok {
return errors.New("semaphore type is not weighted")
}
weighted.Release(1)
}
}

stateMachineInstance, ok := processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance)
if !ok {
return errors.New("state machine instance type is not statelang.StateMachineInstance")
}

stateMachineInstance.SetEndTime(time.Now())

exp, ok := processContext.GetVariable(constant.VarNameCurrentException).(error)
if !ok {
return errors.New("exception type is not error")
}

if exp != nil {
stateMachineInstance.SetException(exp)
log.Debugf("Exception Occurred: %s", exp)
}

stateMachineConfig, ok := processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)

if err := stateMachineConfig.StatusDecisionStrategy().DecideOnEndState(ctx, processContext, stateMachineInstance, exp); err != nil {
return err
}

contextParams, ok := processContext.GetVariable(constant.VarNameStateMachineContext).(map[string]interface{})
if !ok {
return errors.New("state machine context type is not map[string]interface{}")
}
endParams := stateMachineInstance.EndParams()
for k, v := range contextParams {
endParams[k] = v
}
stateMachineInstance.SetEndParams(endParams)

stateInstruction, ok := processContext.GetInstruction().(StateInstruction)
if !ok {
return errors.New("state instruction type is not process_ctrl.StateInstruction")
}
stateInstruction.SetEnd(true)

stateMachineInstance.SetRunning(false)
stateMachineInstance.SetEndTime(time.Now())

if stateMachineInstance.StateMachine().IsPersist() && stateMachineConfig.StateLangStore() != nil {
err := stateMachineConfig.StateLogStore().RecordStateMachineFinished(ctx, stateMachineInstance, processContext)
if err != nil {
return err
}
}

callBack, ok := processContext.GetVariable(constant.VarNameAsyncCallback).(CallBack)
if ok {
if exp != nil {
callBack.OnError(ctx, processContext, stateMachineInstance, exp)
} else {
callBack.OnFinished(ctx, processContext, stateMachineInstance)
}
}

return nil
}

func HandleException(processContext ProcessContext, abstractTaskState *state.AbstractTaskState, err error) {
catches := abstractTaskState.Catches()
if catches != nil && len(catches) != 0 {
for _, exceptionMatch := range catches {
exceptions := exceptionMatch.Exceptions()
exceptionTypes := exceptionMatch.ExceptionTypes()
if exceptions != nil && len(exceptions) != 0 {
if exceptionTypes == nil {
lock := processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
lock.Lock()
defer lock.Unlock()
error := errors.New("")
for i := 0; i < len(exceptions); i++ {
exceptionTypes = append(exceptionTypes, reflect.TypeOf(error))
}
}

exceptionMatch.SetExceptionTypes(exceptionTypes)
}

for i, _ := range exceptionTypes {
if reflect.TypeOf(err) == exceptionTypes[i] {
// HACK: we can not get error type in config file during runtime, so we use exception str
if strings.Contains(err.Error(), exceptions[i]) {
hierarchicalProcessContext := processContext.(HierarchicalProcessContext)
hierarchicalProcessContext.SetVariable(constant.VarNameCurrentExceptionRoute, exceptionMatch.Next())
return
}
}
}
}
}

log.Error("Task execution failed and no catches configured")
hierarchicalProcessContext := processContext.(HierarchicalProcessContext)
hierarchicalProcessContext.SetVariable(constant.VarNameIsExceptionNotCatch, true)
}

// GetOriginStateName get origin state name without suffix like fork
func GetOriginStateName(stateInstance statelang.StateInstance) string {
stateName := stateInstance.Name()
if stateName != "" {
end := strings.LastIndex(stateName, constant.LoopStateNamePattern)
if end > -1 {
return stateName[:end+1]
}
}
return stateName
}

// IsTimeout test if is timeout
func IsTimeout(gmtUpdated time.Time, timeoutMillis int) bool {
if timeoutMillis < 0 {
return false
}
return time.Now().Unix()-gmtUpdated.Unix() > int64(timeoutMillis)
}

func GenerateParentId(stateInstance statelang.StateInstance) string {
return stateInstance.MachineInstanceID() + constant.SeperatorParentId + stateInstance.ID()
}

pkg/saga/statemachine/engine/events/event.go → pkg/saga/statemachine/engine/core/event.go View File

@@ -1,4 +1,4 @@
package events
package core

type Event interface {
}

+ 113
- 0
pkg/saga/statemachine/engine/core/event_bus.go View File

@@ -0,0 +1,113 @@
package core

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/util/collection"
"github.com/seata/seata-go/pkg/util/log"
)

type EventBus interface {
Offer(ctx context.Context, event Event) (bool, error)

EventConsumerList(event Event) []EventConsumer

RegisterEventConsumer(consumer EventConsumer)
}

type BaseEventBus struct {
eventConsumerList []EventConsumer
}

func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
if b.eventConsumerList == nil {
b.eventConsumerList = make([]EventConsumer, 0)
}
b.eventConsumerList = append(b.eventConsumerList, consumer)
}

func (b *BaseEventBus) EventConsumerList(event Event) []EventConsumer {
var acceptedConsumerList = make([]EventConsumer, 0)
for i := range b.eventConsumerList {
eventConsumer := b.eventConsumerList[i]
if eventConsumer.Accept(event) {
acceptedConsumerList = append(acceptedConsumerList, eventConsumer)
}
}
return acceptedConsumerList
}

type DirectEventBus struct {
BaseEventBus
}

func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
eventConsumerList := d.EventConsumerList(event)
if len(eventConsumerList) == 0 {
log.Debugf("cannot find event handler by type: %T", event)
return false, nil
}

isFirstEvent := true
processContext, ok := event.(ProcessContext)
if !ok {
log.Errorf("event %T is illegal, required process_ctrl.ProcessContext", event)
return false, nil
}

stack := processContext.GetVariable(constant.VarNameSyncExeStack).(*collection.Stack)
if stack == nil {
stack = collection.NewStack()
processContext.SetVariable(constant.VarNameSyncExeStack, stack)
isFirstEvent = true
}

stack.Push(processContext)
if isFirstEvent {
for stack.Len() > 0 {
currentContext := stack.Pop().(ProcessContext)
for _, eventConsumer := range eventConsumerList {
err := eventConsumer.Process(ctx, currentContext)
if err != nil {
log.Errorf("process event %T error: %s", event, err.Error())
return false, err
}
}
}
}

return true, nil
}

type AsyncEventBus struct {
BaseEventBus
}

func (a AsyncEventBus) Offer(ctx context.Context, event Event) (bool, error) {
eventConsumerList := a.EventConsumerList(event)
if len(eventConsumerList) == 0 {
errStr := fmt.Sprintf("cannot find event handler by type: %T", event)
log.Errorf(errStr)
return false, errors.New(errStr)
}

processContext, ok := event.(ProcessContext)
if !ok {
errStr := fmt.Sprintf("event %T is illegal, required process_ctrl.ProcessContext", event)
log.Errorf(errStr)
return false, errors.New(errStr)
}

for _, eventConsumer := range eventConsumerList {
go func() {
err := eventConsumer.Process(ctx, processContext)
if err != nil {
log.Errorf("process event %T error: %s", event, err.Error())
}
}()
}

return true, nil
}

pkg/saga/statemachine/engine/events/event_consumer.go → pkg/saga/statemachine/engine/core/event_consumer.go View File

@@ -1,8 +1,9 @@
package events
package core

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"fmt"
"github.com/pkg/errors"
)

type EventConsumer interface {
@@ -12,6 +13,7 @@ type EventConsumer interface {
}

type ProcessCtrlEventConsumer struct {
processController ProcessController
}

func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
@@ -19,11 +21,14 @@ func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
return false
}

_, ok := event.(process_ctrl.ProcessContext)
_, ok := event.(ProcessContext)
return ok
}

func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event) error {
//TODO implement me
panic("implement me")
processContext, ok := event.(ProcessContext)
if !ok {
return errors.New(fmt.Sprint("event %T is illegal, required process_ctrl.ProcessContext", event))
}
return p.processController.Process(ctx, processContext)
}

pkg/saga/statemachine/engine/events/event_publisher.go → pkg/saga/statemachine/engine/core/event_publisher.go View File

@@ -1,4 +1,4 @@
package events
package core

import "context"


+ 96
- 0
pkg/saga/statemachine/engine/core/instruction.go View File

@@ -0,0 +1,96 @@
package core

import (
"errors"
"fmt"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type Instruction interface {
}

type StateInstruction struct {
stateName string
stateMachineName string
tenantId string
end bool
temporaryState statelang.State
}

func NewStateInstruction(stateMachineName string, tenantId string) *StateInstruction {
return &StateInstruction{stateMachineName: stateMachineName, tenantId: tenantId}
}

func (s *StateInstruction) StateName() string {
return s.stateName
}

func (s *StateInstruction) SetStateName(stateName string) {
s.stateName = stateName
}

func (s *StateInstruction) StateMachineName() string {
return s.stateMachineName
}

func (s *StateInstruction) SetStateMachineName(stateMachineName string) {
s.stateMachineName = stateMachineName
}

func (s *StateInstruction) TenantId() string {
return s.tenantId
}

func (s *StateInstruction) SetTenantId(tenantId string) {
s.tenantId = tenantId
}

func (s *StateInstruction) End() bool {
return s.end
}

func (s *StateInstruction) SetEnd(end bool) {
s.end = end
}

func (s *StateInstruction) TemporaryState() statelang.State {
return s.temporaryState
}

func (s *StateInstruction) SetTemporaryState(temporaryState statelang.State) {
s.temporaryState = temporaryState
}

func (s *StateInstruction) GetState(context ProcessContext) (statelang.State, error) {
if s.temporaryState != nil {
return s.temporaryState, nil
}

if s.stateMachineName == "" {
return nil, errors.New("stateMachineName is required")
}

stateMachineConfig, ok := context.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
if !ok {
return nil, errors.New("stateMachineConfig is required in context")
}
stateMachine, err := stateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(s.stateMachineName, s.tenantId)
if err != nil {
return nil, errors.New("get stateMachine in state machine repository error")
}
if stateMachine == nil {
return nil, errors.New(fmt.Sprintf("stateMachine [%s] is not exist", s.stateMachineName))
}

if s.stateName == "" {
s.stateName = stateMachine.StartState()
}

state := stateMachine.States()[s.stateName]
if state == nil {
return nil, errors.New(fmt.Sprintf("state [%s] is not exist", s.stateName))
}

return state, nil
}

+ 92
- 0
pkg/saga/statemachine/engine/core/loop_context_holder.go View File

@@ -0,0 +1,92 @@
package core

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"sync"
)

type LoopContextHolder struct {
nrOfInstances int32
nrOfActiveInstances int32
nrOfCompletedInstances int32
failEnd bool
completionConditionSatisfied bool
loopCounterStack []int
forwardCounterStack []int
collection interface{}
}

func NewLoopContextHolder() *LoopContextHolder {
return &LoopContextHolder{
nrOfInstances: 0,
nrOfActiveInstances: 0,
nrOfCompletedInstances: 0,
failEnd: false,
completionConditionSatisfied: false,
loopCounterStack: make([]int, 0),
forwardCounterStack: make([]int, 0),
collection: nil,
}
}

func GetCurrentLoopContextHolder(ctx context.Context, processContext ProcessContext, forceCreate bool) *LoopContextHolder {
mutex := processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
mutex.Lock()
defer mutex.Unlock()

loopContextHolder := processContext.GetVariable(constant.VarNameCurrentLoopContextHolder).(*LoopContextHolder)
if loopContextHolder == nil && forceCreate {
loopContextHolder = &LoopContextHolder{}
processContext.SetVariable(constant.VarNameCurrentLoopContextHolder, loopContextHolder)
}
return loopContextHolder
}

func ClearCurrent(ctx context.Context, processContext ProcessContext) {
processContext.RemoveVariable(constant.VarNameCurrentLoopContextHolder)
}

func (l *LoopContextHolder) NrOfInstances() int32 {
return l.nrOfInstances
}

func (l *LoopContextHolder) NrOfActiveInstances() int32 {
return l.nrOfActiveInstances
}

func (l *LoopContextHolder) NrOfCompletedInstances() int32 {
return l.nrOfCompletedInstances
}

func (l *LoopContextHolder) FailEnd() bool {
return l.failEnd
}

func (l *LoopContextHolder) SetFailEnd(failEnd bool) {
l.failEnd = failEnd
}

func (l *LoopContextHolder) CompletionConditionSatisfied() bool {
return l.completionConditionSatisfied
}

func (l *LoopContextHolder) SetCompletionConditionSatisfied(completionConditionSatisfied bool) {
l.completionConditionSatisfied = completionConditionSatisfied
}

func (l *LoopContextHolder) LoopCounterStack() []int {
return l.loopCounterStack
}

func (l *LoopContextHolder) ForwardCounterStack() []int {
return l.forwardCounterStack
}

func (l *LoopContextHolder) Collection() interface{} {
return l.collection
}

func (l *LoopContextHolder) SetCollection(collection interface{}) {
l.collection = collection
}

+ 40
- 0
pkg/saga/statemachine/engine/core/loop_task_utils.go View File

@@ -0,0 +1,40 @@
package core

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
"github.com/seata/seata-go/pkg/util/log"
)

func GetLoopConfig(ctx context.Context, processContext ProcessContext, currentState statelang.State) state.Loop {
if matchLoop(currentState) {
taskState := currentState.(state.AbstractTaskState)
stateMachineInstance := processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance)
stateMachineConfig := processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)

if taskState.Loop() != nil {
loop := taskState.Loop()
collectionName := loop.Collection()
if collectionName != "" {
expression := CreateValueExpression(stateMachineConfig.ExpressionResolver(), collectionName)
collection := GetValue(expression, stateMachineInstance.Context(), nil)
collectionList := collection.([]any)
if len(collectionList) > 0 {
current := GetCurrentLoopContextHolder(ctx, processContext, true)
current.SetCollection(collection)
return loop
}
}
log.Warn("State [{}] loop collection param [{}] invalid", currentState.Name(), collectionName)
}

}
return nil
}

func matchLoop(currentState statelang.State) bool {
return currentState != nil && (constant.StateTypeServiceTask == currentState.Type() ||
constant.StateTypeScriptTask == currentState.Type() || constant.StateTypeSubStateMachine == currentState.Type())
}

+ 132
- 0
pkg/saga/statemachine/engine/core/parameter_utils.go View File

@@ -0,0 +1,132 @@
package core

import (
"fmt"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
"strings"
"sync"
)

func CreateInputParams(processContext ProcessContext, expressionResolver expr.ExpressionResolver,
stateInstance *statelang.StateInstanceImpl, serviceTaskState *state.AbstractTaskState, variablesFrom any) []any {
inputAssignments := serviceTaskState.Input()
if inputAssignments == nil || len(inputAssignments) == 0 {
return inputAssignments
}

inputExpressions := serviceTaskState.InputExpressions()
if inputExpressions == nil || len(inputExpressions) == 0 {
lock := processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex)
lock.Lock()
defer lock.Unlock()
inputExpressions = serviceTaskState.InputExpressions()
if inputExpressions == nil || len(inputExpressions) == 0 {
inputExpressions = make([]any, 0, len(inputAssignments))

for _, assignment := range inputAssignments {
inputExpressions = append(inputExpressions, CreateValueExpression(expressionResolver, assignment))
}
}
serviceTaskState.SetInputExpressions(inputExpressions)
}
inputValues := make([]any, 0, len(inputExpressions))
for _, valueExpression := range inputExpressions {
value := GetValue(valueExpression, variablesFrom, stateInstance)
inputValues = append(inputValues, value)
}

return inputValues
}

func CreateOutputParams(config StateMachineConfig, expressionResolver expr.ExpressionResolver,
serviceTaskState *state.AbstractTaskState, variablesFrom any) (map[string]any, error) {
outputAssignments := serviceTaskState.Output()
if outputAssignments == nil || len(outputAssignments) == 0 {
return make(map[string]any, 0), nil
}

outputExpressions := serviceTaskState.OutputExpressions()
if outputExpressions == nil {
config.ComponentLock().Lock()
defer config.ComponentLock().Unlock()
outputExpressions = serviceTaskState.OutputExpressions()
if outputExpressions == nil {
outputExpressions = make(map[string]any, len(outputAssignments))
for key, value := range outputAssignments {
outputExpressions[key] = CreateValueExpression(expressionResolver, value)
}
}
serviceTaskState.SetOutputExpressions(outputExpressions)
}
outputValues := make(map[string]any, len(outputExpressions))
for paramName, _ := range outputExpressions {
outputValues[paramName] = GetValue(outputExpressions[paramName], variablesFrom, nil)
}
return outputValues, nil
}

func CreateValueExpression(expressionResolver expr.ExpressionResolver, paramAssignment any) any {
var valueExpression any

switch paramAssignment.(type) {
case expr.Expression:
valueExpression = paramAssignment
case map[string]any:
paramMapAssignment := paramAssignment.(map[string]any)
paramMap := make(map[string]any, len(paramMapAssignment))
for key, value := range paramMapAssignment {
paramMap[key] = CreateValueExpression(expressionResolver, value)
}
valueExpression = paramMap
case []any:
paramListAssignment := paramAssignment.([]any)
paramList := make([]any, 0, len(paramListAssignment))
for _, value := range paramListAssignment {
paramList = append(paramList, CreateValueExpression(expressionResolver, value))
}
valueExpression = paramList
case string:
value := paramAssignment.(string)
if !strings.HasPrefix(value, "$") {
valueExpression = paramAssignment
}
valueExpression = expressionResolver.Expression(value)
default:
valueExpression = paramAssignment
}
return valueExpression
}

func GetValue(valueExpression any, variablesFrom any, stateInstance statelang.StateInstance) any {
switch valueExpression.(type) {
case expr.Expression:
expression := valueExpression.(expr.Expression)
value := expression.Value(variablesFrom)
if _, ok := valueExpression.(expr.SequenceExpression); value != nil && stateInstance != nil && stateInstance.BusinessKey() == "" && ok {
stateInstance.SetBusinessKey(fmt.Sprintf("%v", value))
}
return value
case map[string]any:
mapValueExpression := valueExpression.(map[string]any)
mapValue := make(map[string]any, len(mapValueExpression))
for key, value := range mapValueExpression {
value = GetValue(value, variablesFrom, stateInstance)
if value != nil {
mapValue[key] = value
}
}
return mapValue
case []any:
valueExpressionList := valueExpression.([]any)
listValue := make([]any, 0, len(valueExpression.([]any)))
for i, _ := range valueExpressionList {
listValue = append(listValue, GetValue(valueExpressionList[i], variablesFrom, stateInstance))
}
return listValue
default:
return valueExpression
}
}

pkg/saga/statemachine/engine/process_ctrl/process_context.go → pkg/saga/statemachine/engine/core/process_context.go View File

@@ -1,4 +1,4 @@
package process_ctrl
package core

import (
"sync"

+ 31
- 0
pkg/saga/statemachine/engine/core/process_controller.go View File

@@ -0,0 +1,31 @@
package core

import (
"context"
)

type ProcessController interface {
Process(ctx context.Context, context ProcessContext) error
}

type ProcessControllerImpl struct {
businessProcessor BusinessProcessor
}

func (p *ProcessControllerImpl) Process(ctx context.Context, context ProcessContext) error {
if err := p.businessProcessor.Process(ctx, context); err != nil {
return err
}
if err := p.businessProcessor.Route(ctx, context); err != nil {
return err
}
return nil
}

func (p *ProcessControllerImpl) BusinessProcessor() BusinessProcessor {
return p.businessProcessor
}

func (p *ProcessControllerImpl) SetBusinessProcessor(businessProcessor BusinessProcessor) {
p.businessProcessor = businessProcessor
}

+ 424
- 0
pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go View File

@@ -0,0 +1,424 @@
package core

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
seataErrors "github.com/seata/seata-go/pkg/util/errors"
"github.com/seata/seata-go/pkg/util/log"
"time"
)

type ProcessCtrlStateMachineEngine struct {
StateMachineConfig StateMachineConfig
}

func NewProcessCtrlStateMachineEngine() *ProcessCtrlStateMachineEngine {
return &ProcessCtrlStateMachineEngine{
StateMachineConfig: NewDefaultStateMachineConfig(),
}
}

func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, false, nil)
}

func (p ProcessCtrlStateMachineEngine) Compensate(ctx context.Context, stateMachineInstId string,
replaceParams map[string]any) (statelang.StateMachineInstance, error) {
return p.compensateInternal(ctx, stateMachineInstId, replaceParams, false, nil)
}

func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateMachineName string, tenantId string,
businessKey string, startParams map[string]interface{}, async bool, callback CallBack) (statelang.StateMachineInstance, error) {
if tenantId == "" {
tenantId = p.StateMachineConfig.DefaultTenantId()
}

stateMachineInstance, err := p.createMachineInstance(stateMachineName, tenantId, businessKey, startParams)
if err != nil {
return nil, err
}

// Build the process_ctrl context.
processContextBuilder := NewProcessContextBuilder().
WithProcessType(process.StateLang).
WithOperationName(constant.OperationNameStart).
WithAsyncCallback(callback).
WithInstruction(NewStateInstruction(stateMachineName, tenantId)).
WithStateMachineInstance(stateMachineInstance).
WithStateMachineConfig(p.StateMachineConfig).
WithStateMachineEngine(p).
WithIsAsyncExecution(async)

contextMap := p.copyMap(startParams)

stateMachineInstance.SetContext(contextMap)

processContext := processContextBuilder.WithStateMachineContextVariables(contextMap).Build()

if stateMachineInstance.StateMachine().IsPersist() && p.StateMachineConfig.StateLogStore() != nil {
err := p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, stateMachineInstance, processContext)
if err != nil {
return nil, err
}
}

if stateMachineInstance.ID() == "" {
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst, ""))
}

var eventPublisher EventPublisher
if async {
eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
} else {
eventPublisher = p.StateMachineConfig.EventPublisher()
}

_, err = eventPublisher.PushEvent(ctx, processContext)
if err != nil {
return nil, err
}

return stateMachineInstance, nil
}

// copyMap not deep copy, so best practice: Don’t pass by reference
func (p ProcessCtrlStateMachineEngine) copyMap(startParams map[string]interface{}) map[string]interface{} {
copyMap := make(map[string]interface{}, len(startParams))
for k, v := range startParams {
copyMap[k] = v
}
return copyMap
}

func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
stateMachine, err := p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName, tenantId)
if err != nil {
return nil, err
}

if stateMachine == nil {
return nil, errors.New("StateMachine [" + stateMachineName + "] is not exists")
}

stateMachineInstance := statelang.NewStateMachineInstanceImpl()
stateMachineInstance.SetStateMachine(stateMachine)
stateMachineInstance.SetTenantID(tenantId)
stateMachineInstance.SetBusinessKey(businessKey)
stateMachineInstance.SetStartParams(startParams)
if startParams != nil {
if businessKey != "" {
startParams[constant.VarNameBusinesskey] = businessKey
}

if startParams[constant.VarNameParentId] != nil {
parentId, ok := startParams[constant.VarNameParentId].(string)
if !ok {

}
stateMachineInstance.SetParentID(parentId)
delete(startParams, constant.VarNameParentId)
}
}

stateMachineInstance.SetStatus(statelang.RU)
stateMachineInstance.SetRunning(true)

now := time.Now()
stateMachineInstance.SetStartedTime(now)
stateMachineInstance.SetUpdatedTime(now)
return stateMachineInstance, nil
}

func (p ProcessCtrlStateMachineEngine) compensateInternal(ctx context.Context, stateMachineInstId string, replaceParams map[string]any,
async bool, callback CallBack) (statelang.StateMachineInstance, error) {
stateMachineInstance, err := p.reloadStateMachineInstance(ctx, stateMachineInstId)
if err != nil {
return nil, err
}

if stateMachineInstance == nil {
return nil, exception.NewEngineExecutionException(seataErrors.StateMachineInstanceNotExists,
"StateMachineInstance is not exits", nil)
}

if statelang.SU == stateMachineInstance.CompensationStatus() {
return stateMachineInstance, nil
}

if stateMachineInstance.CompensationStatus() != "" {
denyStatus := make([]statelang.ExecutionStatus, 0)
denyStatus = append(denyStatus, statelang.SU)
p.checkStatus(ctx, stateMachineInstance, nil, denyStatus, "", stateMachineInstance.CompensationStatus(),
"compensate")
}

if replaceParams != nil {
for key, value := range replaceParams {
stateMachineInstance.EndParams()[key] = value
}
}

contextBuilder := NewProcessContextBuilder().WithProcessType(process.StateLang).
WithOperationName(constant.OperationNameCompensate).WithAsyncCallback(callback).
WithStateMachineInstance(stateMachineInstance).
WithStateMachineConfig(p.StateMachineConfig).WithStateMachineEngine(p).WithIsAsyncExecution(async)

context := contextBuilder.Build()

contextVariables, err := p.getStateMachineContextVariables(ctx, stateMachineInstance)

if replaceParams != nil {
for key, value := range replaceParams {
contextVariables[key] = value
}
}

p.putBusinesskeyToContextariables(stateMachineInstance, contextVariables)

// TODO: Here is not use sync.map, make sure whether to use it
concurrentContextVariables := make(map[string]any)
p.nullSafeCopy(contextVariables, concurrentContextVariables)

context.SetVariable(constant.VarNameStateMachineContext, concurrentContextVariables)
stateMachineInstance.SetContext(concurrentContextVariables)

tempCompensationTriggerState := state.NewCompensationTriggerStateImpl()
tempCompensationTriggerState.SetStateMachine(stateMachineInstance.StateMachine())

stateMachineInstance.SetRunning(true)

log.Info("Operation [compensate] start. stateMachineInstance[id:" + stateMachineInstance.ID() + "]")

if stateMachineInstance.StateMachine().IsPersist() {
err := p.StateMachineConfig.StateLogStore().RecordStateMachineRestarted(ctx, stateMachineInstance, context)
if err != nil {
return nil, err
}
}

inst := NewStateInstruction(stateMachineInstance.TenantID(), stateMachineInstance.StateMachine().Name())
inst.SetTemporaryState(tempCompensationTriggerState)
context.SetInstruction(inst)

if async {
_, err := p.StateMachineConfig.AsyncEventPublisher().PushEvent(ctx, context)
if err != nil {
return nil, err
}
} else {
_, err := p.StateMachineConfig.EventPublisher().PushEvent(ctx, context)
if err != nil {
return nil, err
}
}

return stateMachineInstance, nil
}

func (p ProcessCtrlStateMachineEngine) reloadStateMachineInstance(ctx context.Context, instId string) (statelang.StateMachineInstance, error) {
instance, err := p.StateMachineConfig.StateLogStore().GetStateMachineInstance(instId)
if err != nil {
return nil, err
}
if instance != nil {
stateMachine := instance.StateMachine()
if stateMachine == nil {
stateMachine, err = p.StateMachineConfig.StateMachineRepository().GetStateMachineById(instance.MachineID())
if err != nil {
return nil, err
}
instance.SetStateMachine(stateMachine)
}
if stateMachine == nil {
return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
"StateMachine[id:"+instance.MachineID()+"] not exist.", nil)
}

stateList := instance.StateList()
if stateList == nil || len(stateList) == 0 {
stateList, err = p.StateMachineConfig.StateLogStore().GetStateInstanceListByMachineInstanceId(instId)
if err != nil {
return nil, err
}
if stateList != nil && len(stateList) > 0 {
for _, tmpStateInstance := range stateList {
instance.PutState(tmpStateInstance.ID(), tmpStateInstance)
}
}
}

if instance.EndParams() == nil || len(instance.EndParams()) == 0 {
variables, err := p.replayContextVariables(ctx, instance)
if err != nil {
return nil, err
}
instance.SetEndParams(variables)
}
}
return instance, nil
}

func (p ProcessCtrlStateMachineEngine) replayContextVariables(ctx context.Context, stateMachineInstance statelang.StateMachineInstance) (map[string]any, error) {
contextVariables := make(map[string]any)
if stateMachineInstance.StartParams() != nil {
for key, value := range stateMachineInstance.StartParams() {
contextVariables[key] = value
}
}

stateInstanceList := stateMachineInstance.StateList()
if stateInstanceList == nil || len(stateInstanceList) == 0 {
return contextVariables, nil
}

for _, stateInstance := range stateInstanceList {
serviceOutputParams := stateInstance.OutputParams()
if serviceOutputParams != nil {
serviceTaskStateImpl, ok := stateMachineInstance.StateMachine().State(GetOriginStateName(stateInstance)).(*state.ServiceTaskStateImpl)
if !ok {
return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
"Cannot find State by state name ["+stateInstance.Name()+"], may be this is a bug", nil)
}

if serviceTaskStateImpl.Output() != nil && len(serviceTaskStateImpl.Output()) != 0 {
outputVariablesToContext, err := CreateOutputParams(p.StateMachineConfig,
p.StateMachineConfig.ExpressionResolver(), serviceTaskStateImpl.AbstractTaskState, serviceOutputParams)
if err != nil {
return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
"Context variable replay failed", err)
}
if outputVariablesToContext != nil && len(outputVariablesToContext) != 0 {
for key, value := range outputVariablesToContext {
contextVariables[key] = value
}
}
if len(stateInstance.BusinessKey()) > 0 {
contextVariables[serviceTaskStateImpl.Name()+constant.VarNameBusinesskey] = stateInstance.BusinessKey()
}
}
}
}

return contextVariables, nil
}

func (p ProcessCtrlStateMachineEngine) checkStatus(ctx context.Context, stateMachineInstance statelang.StateMachineInstance,
acceptStatus []statelang.ExecutionStatus, denyStatus []statelang.ExecutionStatus, status statelang.ExecutionStatus,
compenStatus statelang.ExecutionStatus, operation string) (bool, error) {
if status != "" && compenStatus != "" {
return false, exception.NewEngineExecutionException(seataErrors.InvalidParameter,
"status and compensationStatus are not supported at the same time", nil)
}
if status == "" && compenStatus == "" {
return false, exception.NewEngineExecutionException(seataErrors.InvalidParameter,
"status and compensationStatus must input at least one", nil)
}
if statelang.SU == compenStatus {
message := p.buildExceptionMessage(stateMachineInstance, nil, nil, "", statelang.SU, operation)
return false, exception.NewEngineExecutionException(seataErrors.OperationDenied,
message, nil)
}

if stateMachineInstance.IsRunning() &&
!IsTimeout(stateMachineInstance.UpdatedTime(), p.StateMachineConfig.TransOperationTimeout()) {
return false, exception.NewEngineExecutionException(seataErrors.OperationDenied,
"StateMachineInstance [id:"+stateMachineInstance.ID()+"] is running, operation["+operation+
"] denied", nil)
}

if (denyStatus == nil || len(denyStatus) == 0) && (acceptStatus == nil || len(acceptStatus) == 0) {
return false, exception.NewEngineExecutionException(seataErrors.InvalidParameter,
"StateMachineInstance[id:"+stateMachineInstance.ID()+
"], acceptable status and deny status must input at least one", nil)
}

currentStatus := compenStatus
if status != "" {
currentStatus = status
}

if denyStatus != nil && len(denyStatus) == 0 {
for _, tempDenyStatus := range denyStatus {
if tempDenyStatus == currentStatus {
message := p.buildExceptionMessage(stateMachineInstance, acceptStatus, denyStatus, status,
compenStatus, operation)
return false, exception.NewEngineExecutionException(seataErrors.OperationDenied,
message, nil)
}
}
}

if acceptStatus == nil || len(acceptStatus) == 0 {
return true, nil
} else {
for _, tempStatus := range acceptStatus {
if tempStatus == currentStatus {
return true, nil
}
}
}

message := p.buildExceptionMessage(stateMachineInstance, acceptStatus, denyStatus, status, compenStatus,
operation)
return false, exception.NewEngineExecutionException(seataErrors.OperationDenied,
message, nil)
}

func (p ProcessCtrlStateMachineEngine) getStateMachineContextVariables(ctx context.Context,
stateMachineInstance statelang.StateMachineInstance) (map[string]any, error) {
contextVariables := stateMachineInstance.EndParams()
if contextVariables == nil || len(contextVariables) == 0 {
return p.replayContextVariables(ctx, stateMachineInstance)
}
return contextVariables, nil
}

func (p ProcessCtrlStateMachineEngine) buildExceptionMessage(instance statelang.StateMachineInstance,
acceptStatus []statelang.ExecutionStatus, denyStatus []statelang.ExecutionStatus, status statelang.ExecutionStatus,
compenStatus statelang.ExecutionStatus, operation string) string {
message := fmt.Sprintf("StateMachineInstance[id:%s]", instance.ID())
if len(acceptStatus) > 0 {
message += ",acceptable status :"
for _, tempStatus := range acceptStatus {
message += string(tempStatus) + " "
}
}

if len(denyStatus) > 0 {
message += ",deny status:"
for _, tempStatus := range denyStatus {
message += string(tempStatus) + " "
}
}

if status != "" {
message += ",current status:" + string(status)
}

if compenStatus != "" {
message += ",current compensation status:" + string(compenStatus)
}

message += fmt.Sprintf(",so operation [%s] denied", operation)
return message
}

func (p ProcessCtrlStateMachineEngine) putBusinesskeyToContextariables(instance statelang.StateMachineInstance, variables map[string]any) {
if instance.BusinessKey() != "" && variables[constant.VarNameBusinesskey] == "" {
variables[constant.VarNameBusinesskey] = instance.BusinessKey()
}
}

func (p ProcessCtrlStateMachineEngine) nullSafeCopy(srcMap map[string]any, destMap map[string]any) {
for key, value := range srcMap {
if value == nil {
destMap[key] = value
}
}
}

+ 194
- 0
pkg/saga/statemachine/engine/core/process_router.go View File

@@ -0,0 +1,194 @@
package core

import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/util/log"
)

type RouterHandler interface {
Route(ctx context.Context, processContext ProcessContext) error
}

type ProcessRouter interface {
Route(ctx context.Context, processContext ProcessContext) error
}

type InterceptAbleStateRouter interface {
StateRouter
StateRouterInterceptor() []StateRouterInterceptor
RegistryStateRouterInterceptor(stateRouterInterceptor StateRouterInterceptor)
}

type StateRouter interface {
Route(ctx context.Context, processContext ProcessContext, state statelang.State) (Instruction, error)
}

type StateRouterInterceptor interface {
PreRoute(ctx context.Context, processContext ProcessContext, state statelang.State) error
PostRoute(ctx context.Context, processContext ProcessContext, instruction Instruction, err error) error
Match(stateType string) bool
}

type DefaultRouterHandler struct {
eventPublisher EventPublisher
processRouters map[string]ProcessRouter
}

func (d *DefaultRouterHandler) Route(ctx context.Context, processContext ProcessContext) error {
processType := d.matchProcessType(ctx, processContext)
if processType == "" {
log.Warnf("Process type not found, context= %s", processContext)
return errors.New("Process type not found")
}

processRouter := d.processRouters[string(processType)]
if processRouter == nil {
log.Errorf("Cannot find process router by type %s, context = %s", processType, processContext)
return errors.New("Process router not found")
}

instruction := processRouter.Route(ctx, processContext)
if instruction == nil {
log.Info("route instruction is null, process end")
} else {
processContext.SetInstruction(instruction)
_, err := d.eventPublisher.PushEvent(ctx, processContext)
if err != nil {
return err
}
}

return nil
}

func (d *DefaultRouterHandler) matchProcessType(ctx context.Context, processContext ProcessContext) process.ProcessType {
processType, ok := processContext.GetVariable(constant.VarNameProcessType).(process.ProcessType)
if !ok || processType == "" {
processType = process.StateLang
}
return processType
}

func (d *DefaultRouterHandler) EventPublisher() EventPublisher {
return d.eventPublisher
}

func (d *DefaultRouterHandler) SetEventPublisher(eventPublisher EventPublisher) {
d.eventPublisher = eventPublisher
}

func (d *DefaultRouterHandler) ProcessRouters() map[string]ProcessRouter {
return d.processRouters
}

func (d *DefaultRouterHandler) SetProcessRouters(processRouters map[string]ProcessRouter) {
d.processRouters = processRouters
}

type StateMachineProcessRouter struct {
stateRouters map[string]StateRouter
}

func (s *StateMachineProcessRouter) Route(ctx context.Context, processContext ProcessContext) (Instruction, error) {
stateInstruction, ok := processContext.GetInstruction().(StateInstruction)
if !ok {
return nil, errors.New("instruction is not a state instruction")
}

var state statelang.State
if stateInstruction.TemporaryState() != nil {
state = stateInstruction.TemporaryState()
stateInstruction.SetTemporaryState(nil)
} else {
stateMachineConfig, ok := processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig)
if !ok {
return nil, errors.New("state machine config not found")
}

stateMachine, err := stateMachineConfig.StateMachineRepository().GetStateMachineByNameAndTenantId(stateInstruction.StateMachineName(),
stateInstruction.TenantId())
if err != nil {
return nil, err
}

state = stateMachine.States()[stateInstruction.StateName()]
}

stateType := state.Type()
router := s.stateRouters[stateType]

var interceptors []StateRouterInterceptor
if interceptAbleStateRouter, ok := router.(InterceptAbleStateRouter); ok {
interceptors = interceptAbleStateRouter.StateRouterInterceptor()
}

var executedInterceptors []StateRouterInterceptor
var exception error
instruction, exception := func() (Instruction, error) {
if interceptors == nil || len(executedInterceptors) == 0 {
executedInterceptors = make([]StateRouterInterceptor, 0, len(interceptors))
for _, interceptor := range interceptors {
executedInterceptors = append(executedInterceptors, interceptor)
err := interceptor.PreRoute(ctx, processContext, state)
if err != nil {
return nil, err
}
}
}

instruction, err := router.Route(ctx, processContext, state)
if err != nil {
return nil, err
}
return instruction, nil
}()

if interceptors == nil || len(executedInterceptors) == 0 {
for i := len(executedInterceptors) - 1; i >= 0; i-- {
err := executedInterceptors[i].PostRoute(ctx, processContext, instruction, exception)
if err != nil {
return nil, err
}
}

// if 'Succeed' or 'Fail' State did not configured, we must end the state machine
if instruction == nil && !stateInstruction.End() {
err := EndStateMachine(ctx, processContext)
if err != nil {
return nil, err
}
}
}

return instruction, nil
}

func (s *StateMachineProcessRouter) InitDefaultStateRouters() {
if s.stateRouters == nil || len(s.stateRouters) == 0 {
s.stateRouters = make(map[string]StateRouter)
taskStateRouter := &TaskStateRouter{}
s.stateRouters[constant.StateTypeServiceTask] = taskStateRouter
s.stateRouters[constant.StateTypeScriptTask] = taskStateRouter
s.stateRouters[constant.StateTypeChoice] = taskStateRouter
s.stateRouters[constant.StateTypeCompensationTrigger] = taskStateRouter
s.stateRouters[constant.StateTypeSubStateMachine] = taskStateRouter
s.stateRouters[constant.StateTypeCompensateSubMachine] = taskStateRouter
s.stateRouters[constant.StateTypeLoopStart] = taskStateRouter

endStateRouter := &EndStateRouter{}
s.stateRouters[constant.StateTypeSucceed] = endStateRouter
s.stateRouters[constant.StateTypeFail] = endStateRouter
}
}

func (s *StateMachineProcessRouter) StateRouters() map[string]StateRouter {
return s.stateRouters
}

func (s *StateMachineProcessRouter) SetStateRouters(stateRouters map[string]StateRouter) {
s.stateRouters = stateRouters
}

pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go → pkg/saga/statemachine/engine/core/process_state.go View File

@@ -1,8 +1,8 @@
package process_ctrl
package core

import (
"context"
"github.com/pkg/errors"
"errors"
"sync"
)

@@ -11,20 +11,20 @@ type StateHandler interface {
ProcessHandler
}

type StateRouter interface {
State() string
RouterHandler
}

type InterceptAbleStateHandler interface {
StateHandler
StateHandlerInterceptorList() []StateHandlerInterceptor
RegistryStateHandlerInterceptor(stateHandlerInterceptor StateHandlerInterceptor)
}

type ProcessHandler interface {
Process(ctx context.Context, processContext ProcessContext) error
}

type StateHandlerInterceptor interface {
PreProcess(ctx context.Context, processContext ProcessContext) error
PostProcess(ctx context.Context, processContext ProcessContext) error
Match(stateType string) bool
}

type StateMachineProcessHandler struct {
@@ -99,40 +99,3 @@ func (s *StateMachineProcessHandler) RegistryStateHandler(stateType string, stat
}
s.mp[stateType] = stateHandler
}

type StateMachineRouterHandler struct {
mu sync.RWMutex
mp map[string]StateRouter
}

func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext ProcessContext) error {
stateInstruction, _ := processContext.GetInstruction().(StateInstruction)

state, err := stateInstruction.GetState(processContext)
if err != nil {
return err
}

stateType := state.Type()
stateRouter := s.GetStateRouter(stateType)
if stateRouter == nil {
return errors.New("Not support [" + stateType + "] state router")
}

return stateRouter.Route(ctx, processContext)
}

func (s *StateMachineRouterHandler) GetStateRouter(stateType string) StateRouter {
s.mu.RLock()
defer s.mu.RUnlock()
return s.mp[stateType]
}

func (s *StateMachineRouterHandler) RegistryStateRouter(stateType string, stateRouter StateRouter) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mp == nil {
s.mp = make(map[string]StateRouter)
}
s.mp[stateType] = stateRouter
}

+ 151
- 0
pkg/saga/statemachine/engine/core/state_router.go View File

@@ -0,0 +1,151 @@
package core

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
sagaState "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
seataErrors "github.com/seata/seata-go/pkg/util/errors"
"github.com/seata/seata-go/pkg/util/log"
)

type EndStateRouter struct {
}

func (e EndStateRouter) Route(ctx context.Context, processContext ProcessContext, state statelang.State) (Instruction, error) {
return nil, nil
}

type TaskStateRouter struct {
}

func (t TaskStateRouter) Route(ctx context.Context, processContext ProcessContext, state statelang.State) (Instruction, error) {
stateInstruction, _ := processContext.GetInstruction().(StateInstruction)
if stateInstruction.End() {
log.Infof("StateInstruction is ended, Stop the StateMachine executing. StateMachine[%s] Current State[%s]",
stateInstruction.StateMachineName(), stateInstruction.StateName())
}

// check if in loop async condition
isLoop, ok := processContext.GetVariable(constant.VarNameIsLoopState).(bool)
if ok && isLoop {
log.Infof("StateMachine[%s] Current State[%s] is in loop async condition, skip route processing.",
stateInstruction.StateMachineName(), stateInstruction.StateName())
return nil, nil
}

// The current CompensationTriggerState can mark the compensation process is started and perform compensation
// route processing.
compensationTriggerState, ok := processContext.GetVariable(constant.VarNameCurrentCompensateTriggerState).(statelang.State)
if ok {
return t.compensateRoute(ctx, processContext, compensationTriggerState)
}

// There is an exception route, indicating that an exception is thrown, and the exception route is prioritized.
next := processContext.GetVariable(constant.VarNameCurrentExceptionRoute).(string)

if next != "" {
processContext.RemoveVariable(constant.VarNameCurrentExceptionRoute)
} else {
next = state.Next()
}

// If next is empty, the state selected by the Choice state was taken.
if next == "" && processContext.HasVariable(constant.VarNameCurrentChoice) {
next = processContext.GetVariable(constant.VarNameCurrentChoice).(string)
processContext.RemoveVariable(constant.VarNameCurrentChoice)
}

if next == "" {
return nil, nil
}

stateMachine := state.StateMachine()
nextState := stateMachine.State(next)
if nextState == nil {
return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
"Next state["+next+"] is not exits", nil)
}

stateInstruction.SetStateName(next)

if nil != GetLoopConfig(ctx, processContext, nextState) {
stateInstruction.SetTemporaryState(sagaState.NewLoopStartStateImpl())
}

return stateInstruction, nil
}

func (t *TaskStateRouter) compensateRoute(ctx context.Context, processContext ProcessContext,
compensationTriggerState statelang.State) (Instruction, error) {
//If there is already a compensation state that has been executed,
// it is judged whether it is wrong or unsuccessful,
// and the compensation process is interrupted.
isFirstCompensationStateStart := processContext.GetVariable(constant.VarNameFirstCompensationStateStarted).(bool)
if isFirstCompensationStateStart {
exception := processContext.GetVariable(constant.VarNameCurrentException).(error)
if exception != nil {
return nil, EndStateMachine(ctx, processContext)
}

stateInstance := processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance)
if stateInstance != nil && statelang.SU != stateInstance.Status() {
return nil, EndStateMachine(ctx, processContext)
}
}

stateStackToBeCompensated := GetCurrentCompensationHolder(ctx, processContext, true).StateStackNeedCompensation()
if stateStackToBeCompensated != nil {
stateToBeCompensated := stateStackToBeCompensated.Pop().(statelang.StateInstance)

stateMachine := processContext.GetVariable(constant.VarNameStateMachine).(statelang.StateMachine)
state := stateMachine.State(GetOriginStateName(stateToBeCompensated))
if taskState, ok := state.(sagaState.AbstractTaskState); ok {
instruction := processContext.GetInstruction().(StateInstruction)

var compensateState statelang.State
compensateStateName := taskState.CompensateState()
if len(compensateStateName) != 0 {
compensateState = stateMachine.State(compensateStateName)
}

if subStateMachine, ok := state.(sagaState.SubStateMachine); compensateState == nil && ok {
compensateState = subStateMachine.CompensateStateImpl()
instruction.SetTemporaryState(compensateState)
}

if compensateState == nil {
return nil, EndStateMachine(ctx, processContext)
}

instruction.SetStateName(compensateState.Name())

GetCurrentCompensationHolder(ctx, processContext, true).AddToBeCompensatedState(compensateState.Name(),
stateToBeCompensated)

hierarchicalProcessContext := processContext.(HierarchicalProcessContext)
hierarchicalProcessContext.SetVariableLocally(constant.VarNameFirstCompensationStateStarted, true)

if _, ok := compensateState.(sagaState.CompensateSubStateMachineState); ok {
hierarchicalProcessContext = processContext.(HierarchicalProcessContext)
hierarchicalProcessContext.SetVariableLocally(
compensateState.Name()+constant.VarNameSubMachineParentId,
GenerateParentId(stateToBeCompensated))
}

return instruction, nil
}
}

processContext.RemoveVariable(constant.VarNameCurrentCompensateTriggerState)

compensationTriggerStateNext := compensationTriggerState.Next()
if compensationTriggerStateNext == "" {
return nil, EndStateMachine(ctx, processContext)
}

instruction := processContext.GetInstruction().(StateInstruction)
instruction.SetStateName(compensationTriggerStateNext)
return instruction, nil
}

pkg/saga/statemachine/engine/statemachine_config.go → pkg/saga/statemachine/engine/core/statemachine_config.go View File

@@ -1,22 +1,20 @@
package engine
package core

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
"sync"
)

type StateMachineConfig interface {
StateLogRepository() store.StateLogRepository
StateLogRepository() StateLogRepository

StateMachineRepository() store.StateMachineRepository
StateMachineRepository() StateMachineRepository

StateLogStore() store.StateLogStore
StateLogStore() StateLogStore

StateLangStore() store.StateLangStore
StateLangStore() StateLangStore

ExpressionFactoryManager() expr.ExpressionFactoryManager

@@ -24,11 +22,11 @@ type StateMachineConfig interface {

SeqGenerator() sequence.SeqGenerator

StatusDecisionStrategy() status_decision.StatusDecisionStrategy
StatusDecisionStrategy() StatusDecisionStrategy

EventPublisher() events.EventPublisher
EventPublisher() EventPublisher

AsyncEventPublisher() events.EventPublisher
AsyncEventPublisher() EventPublisher

ServiceInvokerManager() invoker.ServiceInvokerManager

@@ -41,4 +39,6 @@ type StateMachineConfig interface {
TransOperationTimeout() int

ServiceInvokeTimeout() int

ComponentLock() *sync.Mutex
}

+ 16
- 0
pkg/saga/statemachine/engine/core/statemachine_engine.go View File

@@ -0,0 +1,16 @@
package core

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type StateMachineEngine interface {
Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error)
Compensate(ctx context.Context, stateMachineInstId string, replaceParams map[string]any) (statelang.StateMachineInstance, error)
}

type CallBack interface {
OnFinished(ctx context.Context, context ProcessContext, stateMachineInstance statelang.StateMachineInstance)
OnError(ctx context.Context, context ProcessContext, stateMachineInstance statelang.StateMachineInstance, err error)
}

+ 15
- 0
pkg/saga/statemachine/engine/core/statemachine_engine_test.go View File

@@ -0,0 +1,15 @@
package core

import (
"context"
"testing"
)

func TestEngine(t *testing.T) {

}

func TestSimpleStateMachine(t *testing.T) {
engine := NewProcessCtrlStateMachineEngine()
engine.Start(context.Background(), "simpleStateMachine", "tenantId", nil)
}

pkg/saga/statemachine/engine/store/statemachine_store.go → pkg/saga/statemachine/engine/core/statemachine_store.go View File

@@ -1,8 +1,7 @@
package store
package core

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"io"
)
@@ -20,15 +19,15 @@ type StateLogRepository interface {
}

type StateLogStore interface {
RecordStateMachineStarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
RecordStateMachineStarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context ProcessContext) error

RecordStateMachineFinished(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
RecordStateMachineFinished(ctx context.Context, machineInstance statelang.StateMachineInstance, context ProcessContext) error

RecordStateMachineRestarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error
RecordStateMachineRestarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context ProcessContext) error

RecordStateStarted(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error
RecordStateStarted(ctx context.Context, stateInstance statelang.StateInstance, context ProcessContext) error

RecordStateFinished(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error
RecordStateFinished(ctx context.Context, stateInstance statelang.StateInstance, context ProcessContext) error

GetStateMachineInstance(stateMachineInstanceId string) (statelang.StateMachineInstance, error)

@@ -44,6 +43,8 @@ type StateLogStore interface {
type StateMachineRepository interface {
GetStateMachineById(stateMachineId string) (statelang.StateMachine, error)

GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string) (statelang.StateMachine, error)

GetLastVersionStateMachine(stateMachineName string, tenantId string) (statelang.StateMachine, error)

RegistryStateMachine(statelang.StateMachine) error

+ 188
- 0
pkg/saga/statemachine/engine/core/status_decision.go View File

@@ -0,0 +1,188 @@
package core

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/util/log"
)

type StatusDecisionStrategy interface {
// DecideOnEndState Determine state machine execution status when executing to EndState
DecideOnEndState(ctx context.Context, processContext ProcessContext,
stateMachineInstance statelang.StateMachineInstance, exp error) error
// DecideOnTaskStateFail Determine state machine execution status when executing TaskState error
DecideOnTaskStateFail(ctx context.Context, processContext ProcessContext,
stateMachineInstance statelang.StateMachineInstance, exp error) error
// DecideMachineForwardExecutionStatus Determine the forward execution state of the state machine
DecideMachineForwardExecutionStatus(ctx context.Context,
stateMachineInstance statelang.StateMachineInstance, exp error, specialPolicy bool) error
}

type DefaultStatusDecisionStrategy struct {
}

func NewDefaultStatusDecisionStrategy() *DefaultStatusDecisionStrategy {
return &DefaultStatusDecisionStrategy{}
}

func (d DefaultStatusDecisionStrategy) DecideOnEndState(ctx context.Context, processContext ProcessContext,
stateMachineInstance statelang.StateMachineInstance, exp error) error {
if statelang.RU == stateMachineInstance.CompensationStatus() {
compensationHolder := GetCurrentCompensationHolder(ctx, processContext, true)
if err := decideMachineCompensateStatus(ctx, stateMachineInstance, compensationHolder); err != nil {
return err
}
} else {
failEndStateFlag, ok := processContext.GetVariable(constant.VarNameFailEndStateFlag).(bool)
if !ok {
failEndStateFlag = false
}
if _, err := decideMachineForwardExecutionStatus(ctx, stateMachineInstance, exp, failEndStateFlag); err != nil {
return err
}
}

if stateMachineInstance.CompensationStatus() != "" && constant.OperationNameForward ==
processContext.GetVariable(constant.VarNameOperationName).(string) && statelang.SU == stateMachineInstance.Status() {
stateMachineInstance.SetCompensationStatus(statelang.FA)
}

log.Debugf("StateMachine Instance[id:%s,name:%s] execute finish with status[%s], compensation status [%s].",
stateMachineInstance.ID(), stateMachineInstance.StateMachine().Name(),
stateMachineInstance.Status(), stateMachineInstance.CompensationStatus())

return nil
}

func decideMachineCompensateStatus(ctx context.Context, stateMachineInstance statelang.StateMachineInstance, compensationHolder *CompensationHolder) error {
if stateMachineInstance.Status() == "" || statelang.RU == stateMachineInstance.Status() {
stateMachineInstance.SetStatus(statelang.UN)
}
if !compensationHolder.StateStackNeedCompensation().Empty() {
hasCompensateSUorUN := false
compensationHolder.StatesForCompensation().Range(
func(key, value any) bool {
stateInstance, ok := value.(statelang.StateInstance)
if !ok {
return false
}
if statelang.UN == stateInstance.Status() || statelang.SU == stateInstance.Status() {
hasCompensateSUorUN = true
return true
}
return false
})

if hasCompensateSUorUN {
stateMachineInstance.SetCompensationStatus(statelang.UN)
} else {
stateMachineInstance.SetCompensationStatus(statelang.FA)
}
} else {
hasCompensateError := false
compensationHolder.StatesForCompensation().Range(
func(key, value any) bool {
stateInstance, ok := value.(statelang.StateInstance)
if !ok {
return false
}
if statelang.SU != stateInstance.Status() {
hasCompensateError = true
return true
}
return false
})

if hasCompensateError {
stateMachineInstance.SetCompensationStatus(statelang.UN)
} else {
stateMachineInstance.SetCompensationStatus(statelang.SU)
}
}
return nil
}

func decideMachineForwardExecutionStatus(ctx context.Context, stateMachineInstance statelang.StateMachineInstance, exp error, specialPolicy bool) (bool, error) {
result := false

if stateMachineInstance.Status() == "" || statelang.RU == stateMachineInstance.Status() {
result = true
stateList := stateMachineInstance.StateList()
setMachineStatusBasedOnStateListAndException(stateMachineInstance, stateList, exp)

if specialPolicy && statelang.SU == stateMachineInstance.Status() {
for _, stateInstance := range stateMachineInstance.StateList() {
if !stateInstance.IsIgnoreStatus() && (stateInstance.IsForUpdate() || stateInstance.IsForCompensation()) {
stateMachineInstance.SetStatus(statelang.UN)
break
}
}
if statelang.SU == stateMachineInstance.Status() {
stateMachineInstance.SetStatus(statelang.FA)
}
}
}
return result, nil
}

func setMachineStatusBasedOnStateListAndException(stateMachineInstance statelang.StateMachineInstance,
stateList []statelang.StateInstance, exp error) {
hasSetStatus := false
hasSuccessUpdateService := false
if stateList != nil && len(stateList) > 0 {
hasUnsuccessService := false

for i := len(stateList) - 1; i >= 0; i-- {
stateInstance := stateList[i]

if stateInstance.IsIgnoreStatus() || stateInstance.IsForCompensation() {
continue
}
if statelang.UN == stateInstance.Status() {
stateMachineInstance.SetStatus(statelang.UN)
hasSetStatus = true
} else if statelang.SU == stateInstance.Status() {
if constant.StateTypeServiceTask == stateInstance.Type() {
if stateInstance.IsForUpdate() && !stateInstance.IsForCompensation() {
hasSuccessUpdateService = true
}
}
} else if statelang.SK == stateInstance.Status() {
// ignore
} else {
hasUnsuccessService = true
}
}

if !hasSetStatus && hasUnsuccessService {
if hasSuccessUpdateService {
stateMachineInstance.SetStatus(statelang.UN)
} else {
stateMachineInstance.SetStatus(statelang.FA)
}
hasSetStatus = true
}
}

if !hasSetStatus {
setMachineStatusBasedOnException(stateMachineInstance, exp, hasSuccessUpdateService)
}
}

func setMachineStatusBasedOnException(stateMachineInstance statelang.StateMachineInstance, exp error, hasSuccessUpdateService bool) {
//TODO implement me
panic("implement me")
}

func (d DefaultStatusDecisionStrategy) DecideOnTaskStateFail(ctx context.Context, processContext ProcessContext,
stateMachineInstance statelang.StateMachineInstance, exp error) error {
//TODO implement me
panic("implement me")
}

func (d DefaultStatusDecisionStrategy) DecideMachineForwardExecutionStatus(ctx context.Context,
stateMachineInstance statelang.StateMachineInstance, exp error, specialPolicy bool) error {
//TODO implement me
panic("implement me")
}

pkg/saga/statemachine/engine/utils.go → pkg/saga/statemachine/engine/core/utils.go View File

@@ -1,22 +1,22 @@
package engine
package core

import (
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

// ProcessContextBuilder process_ctrl builder
type ProcessContextBuilder struct {
processContext process_ctrl.ProcessContext
processContext ProcessContext
}

func NewProcessContextBuilder() *ProcessContextBuilder {
processContextImpl := process_ctrl.NewProcessContextImpl()
processContextImpl := NewProcessContextImpl()
return &ProcessContextBuilder{processContextImpl}
}

func (p *ProcessContextBuilder) WithProcessType(processType process_ctrl.ProcessType) *ProcessContextBuilder {
func (p *ProcessContextBuilder) WithProcessType(processType process.ProcessType) *ProcessContextBuilder {
p.processContext.SetVariable(constant.VarNameProcessType, processType)
return p
}
@@ -34,7 +34,7 @@ func (p *ProcessContextBuilder) WithAsyncCallback(callBack CallBack) *ProcessCon
return p
}

func (p *ProcessContextBuilder) WithInstruction(instruction process_ctrl.Instruction) *ProcessContextBuilder {
func (p *ProcessContextBuilder) WithInstruction(instruction Instruction) *ProcessContextBuilder {
if instruction != nil {
p.processContext.SetInstruction(instruction)
}
@@ -81,6 +81,6 @@ func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) *ProcessContext
return p
}

func (p *ProcessContextBuilder) Build() process_ctrl.ProcessContext {
func (p *ProcessContextBuilder) Build() ProcessContext {
return p.processContext
}

+ 0
- 123
pkg/saga/statemachine/engine/default_statemachine_config.go View File

@@ -1,123 +0,0 @@
package engine

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
)

const (
DefaultTransOperTimeout = 60000 * 30
DefaultServiceInvokeTimeout = 60000 * 5
)

type DefaultStateMachineConfig struct {
// Configuration
transOperationTimeout int
serviceInvokeTimeout int
charset string
defaultTenantId string

// Components

// Store related components
stateLogRepository store.StateLogRepository
stateLogStore store.StateLogStore
stateLangStore store.StateLangStore
stateMachineRepository store.StateMachineRepository

// Expression related components
expressionFactoryManager expr.ExpressionFactoryManager
expressionResolver expr.ExpressionResolver

// Invoker related components
serviceInvokerManager invoker.ServiceInvokerManager
scriptInvokerManager invoker.ScriptInvokerManager

// Other components
statusDecisionStrategy status_decision.StatusDecisionStrategy
seqGenerator sequence.SeqGenerator
}

func (c *DefaultStateMachineConfig) StateLogRepository() store.StateLogRepository {
return c.stateLogRepository
}

func (c *DefaultStateMachineConfig) StateMachineRepository() store.StateMachineRepository {
return c.stateMachineRepository
}

func (c *DefaultStateMachineConfig) StateLogStore() store.StateLogStore {
return c.stateLogStore
}

func (c *DefaultStateMachineConfig) StateLangStore() store.StateLangStore {
return c.stateLangStore
}

func (c *DefaultStateMachineConfig) ExpressionFactoryManager() expr.ExpressionFactoryManager {
return c.expressionFactoryManager
}

func (c *DefaultStateMachineConfig) ExpressionResolver() expr.ExpressionResolver {
return c.expressionResolver
}

func (c *DefaultStateMachineConfig) SeqGenerator() sequence.SeqGenerator {
return c.seqGenerator
}

func (c *DefaultStateMachineConfig) StatusDecisionStrategy() status_decision.StatusDecisionStrategy {
return c.statusDecisionStrategy
}

func (c *DefaultStateMachineConfig) EventPublisher() events.EventPublisher {
//TODO implement me
panic("implement me")
}

func (c *DefaultStateMachineConfig) AsyncEventPublisher() events.EventPublisher {
//TODO implement me
panic("implement me")
}

func (c *DefaultStateMachineConfig) ServiceInvokerManager() invoker.ServiceInvokerManager {
return c.serviceInvokerManager
}

func (c *DefaultStateMachineConfig) ScriptInvokerManager() invoker.ScriptInvokerManager {
return c.scriptInvokerManager
}

func (c *DefaultStateMachineConfig) CharSet() string {
return c.charset
}

func (c *DefaultStateMachineConfig) SetCharSet(charset string) {
c.charset = charset
}

func (c *DefaultStateMachineConfig) DefaultTenantId() string {
return c.defaultTenantId
}

func (c *DefaultStateMachineConfig) TransOperationTimeout() int {
return c.transOperationTimeout
}

func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int {
return c.serviceInvokeTimeout
}

func NewDefaultStateMachineConfig() *DefaultStateMachineConfig {
c := &DefaultStateMachineConfig{
transOperationTimeout: DefaultTransOperTimeout,
serviceInvokeTimeout: DefaultServiceInvokeTimeout,
charset: "UTF-8",
defaultTenantId: "000001",
}
return c
}

+ 0
- 49
pkg/saga/statemachine/engine/events/event_bus.go View File

@@ -1,49 +0,0 @@
package events

import (
"context"
)

type EventBus interface {
Offer(ctx context.Context, event Event) (bool, error)

RegisterEventConsumer(consumer EventConsumer)
}

type BaseEventBus struct {
eventConsumerList []EventConsumer
}

func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
if b.eventConsumerList == nil {
b.eventConsumerList = make([]EventConsumer, 0)
}
b.eventConsumerList = append(b.eventConsumerList, consumer)
}

func (b *BaseEventBus) GetEventConsumerList(event Event) []EventConsumer {
var acceptedConsumerList = make([]EventConsumer, 0)
for i := range b.eventConsumerList {
eventConsumer := b.eventConsumerList[i]
if eventConsumer.Accept(event) {
acceptedConsumerList = append(acceptedConsumerList, eventConsumer)
}
}
return acceptedConsumerList
}

type DirectEventBus struct {
BaseEventBus
}

func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
eventConsumerList := d.GetEventConsumerList(event)
if len(eventConsumerList) == 0 {
//TODO logger
return false, nil
}

//processContext, _ := event.(process_ctrl.ProcessContext)

return true, nil
}

+ 54
- 0
pkg/saga/statemachine/engine/exception/exception.go View File

@@ -0,0 +1,54 @@
package exception

import "github.com/seata/seata-go/pkg/util/errors"

type EngineExecutionException struct {
errors.SeataError
stateName string
stateMachineName string
stateMachineInstanceId string
stateInstanceId string
}

func NewEngineExecutionException(code errors.TransactionErrorCode, msg string, parent error) *EngineExecutionException {
seataError := errors.New(code, msg, parent)
return &EngineExecutionException{
SeataError: *seataError,
}
}

func (e *EngineExecutionException) StateName() string {
return e.stateName
}

func (e *EngineExecutionException) SetStateName(stateName string) {
e.stateName = stateName
}

func (e *EngineExecutionException) StateMachineName() string {
return e.stateMachineName
}

func (e *EngineExecutionException) SetStateMachineName(stateMachineName string) {
e.stateMachineName = stateMachineName
}

func (e *EngineExecutionException) StateMachineInstanceId() string {
return e.stateMachineInstanceId
}

func (e *EngineExecutionException) SetStateMachineInstanceId(stateMachineInstanceId string) {
e.stateMachineInstanceId = stateMachineInstanceId
}

func (e *EngineExecutionException) StateInstanceId() string {
return e.stateInstanceId
}

func (e *EngineExecutionException) SetStateInstanceId(stateInstanceId string) {
e.stateInstanceId = stateInstanceId
}

type ForwardInvalidException struct {
EngineExecutionException
}

+ 83
- 0
pkg/saga/statemachine/engine/expr/expression.go View File

@@ -1,10 +1,93 @@
package expr

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"strings"
)

const DefaultExpressionType string = "Default"

type ExpressionResolver interface {
Expression(expressionStr string) Expression
ExpressionFactoryManager() ExpressionFactoryManager
SetExpressionFactoryManager(expressionFactoryManager ExpressionFactoryManager)
}

type Expression interface {
Value(elContext any) any
SetValue(value any, elContext any)
ExpressionString() string
}

type ExpressionFactory interface {
CreateExpression(expression string) Expression
}

type ExpressionFactoryManager struct {
expressionFactoryMap map[string]ExpressionFactory
}

func NewExpressionFactoryManager() *ExpressionFactoryManager {
return &ExpressionFactoryManager{
expressionFactoryMap: make(map[string]ExpressionFactory),
}
}

func (e *ExpressionFactoryManager) GetExpressionFactory(expressionType string) ExpressionFactory {
if strings.TrimSpace(expressionType) == "" {
expressionType = DefaultExpressionType
}
return e.expressionFactoryMap[expressionType]
}

func (e *ExpressionFactoryManager) SetExpressionFactoryMap(expressionFactoryMap map[string]ExpressionFactory) {
for k, v := range expressionFactoryMap {
e.expressionFactoryMap[k] = v
}
}

func (e *ExpressionFactoryManager) PutExpressionFactory(expressionType string, factory ExpressionFactory) {
e.expressionFactoryMap[expressionType] = factory
}

type SequenceExpression struct {
seqGenerator sequence.SeqGenerator
entity string
rule string
}

func (s *SequenceExpression) SeqGenerator() sequence.SeqGenerator {
return s.seqGenerator
}

func (s *SequenceExpression) SetSeqGenerator(seqGenerator sequence.SeqGenerator) {
s.seqGenerator = seqGenerator
}

func (s *SequenceExpression) Entity() string {
return s.entity
}

func (s *SequenceExpression) SetEntity(entity string) {
s.entity = entity
}

func (s *SequenceExpression) Rule() string {
return s.rule
}

func (s *SequenceExpression) SetRule(rule string) {
s.rule = rule
}

func (s SequenceExpression) Value(elContext any) any {
return s.seqGenerator.GenerateId(s.entity, s.rule)
}

func (s SequenceExpression) SetValue(value any, elContext any) {

}

func (s SequenceExpression) ExpressionString() string {
return s.entity + "|" + s.rule
}

+ 0
- 24
pkg/saga/statemachine/engine/process_ctrl/instruction.go View File

@@ -1,24 +0,0 @@
package process_ctrl

import (
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type Instruction interface {
}

type StateInstruction struct {
StateName string
StateMachineName string
TenantId string
End bool
}

func (s StateInstruction) GetState(context ProcessContext) (statelang.State, error) {
//TODO implement me
panic("implement me")
}

func NewStateInstruction(stateMachineName string, tenantId string) *StateInstruction {
return &StateInstruction{StateMachineName: stateMachineName, TenantId: tenantId}
}

+ 0
- 125
pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go View File

@@ -1,125 +0,0 @@
package engine

import (
"context"
"time"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type ProcessCtrlStateMachineEngine struct {
StateMachineConfig StateMachineConfig
}

func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, false, nil)
}

func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}, async bool, callback CallBack) (statelang.StateMachineInstance, error) {
if tenantId == "" {
tenantId = p.StateMachineConfig.DefaultTenantId()
}

stateMachineInstance, err := p.createMachineInstance(stateMachineName, tenantId, businessKey, startParams)
if err != nil {
return nil, err
}

// Build the process_ctrl context.
processContextBuilder := NewProcessContextBuilder().
WithProcessType(process_ctrl.StateLang).
WithOperationName(constant.OperationNameStart).
WithAsyncCallback(callback).
WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, tenantId)).
WithStateMachineInstance(stateMachineInstance).
WithStateMachineConfig(p.StateMachineConfig).
WithStateMachineEngine(p).
WithIsAsyncExecution(async)

contextMap := p.copyMap(startParams)

stateMachineInstance.SetContext(contextMap)

processContext := processContextBuilder.WithStateMachineContextVariables(contextMap).Build()

if stateMachineInstance.StateMachine().IsPersist() && p.StateMachineConfig.StateLogStore() != nil {
err := p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, stateMachineInstance, processContext)
if err != nil {
return nil, err
}
}

if stateMachineInstance.ID() == "" {
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst, ""))
}

var eventPublisher events.EventPublisher
if async {
eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
} else {
eventPublisher = p.StateMachineConfig.EventPublisher()
}

_, err = eventPublisher.PushEvent(ctx, processContext)
if err != nil {
return nil, err
}

return stateMachineInstance, nil
}

// copyMap not deep copy, so best practice: Don’t pass by reference
func (p ProcessCtrlStateMachineEngine) copyMap(startParams map[string]interface{}) map[string]interface{} {
copyMap := make(map[string]interface{}, len(startParams))
for k, v := range startParams {
copyMap[k] = v
}
return copyMap
}

func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
stateMachine, err := p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName, tenantId)
if err != nil {
return nil, err
}

if stateMachine == nil {
return nil, errors.New("StateMachine [" + stateMachineName + "] is not exists")
}

stateMachineInstance := statelang.NewStateMachineInstanceImpl()
stateMachineInstance.SetStateMachine(stateMachine)
stateMachineInstance.SetTenantID(tenantId)
stateMachineInstance.SetBusinessKey(businessKey)
stateMachineInstance.SetStartParams(startParams)
if startParams != nil {
if businessKey != "" {
startParams[constant.VarNameBusinesskey] = businessKey
}

if startParams[constant.VarNameParentId] != nil {
parentId, ok := startParams[constant.VarNameParentId].(string)
if !ok {

}
stateMachineInstance.SetParentID(parentId)
delete(startParams, constant.VarNameParentId)
}
}

stateMachineInstance.SetStatus(statelang.RU)
stateMachineInstance.SetRunning(true)

now := time.Now()
stateMachineInstance.SetStartedTime(now)
stateMachineInstance.SetUpdatedTime(now)
return stateMachineInstance, nil
}

func NewProcessCtrlStateMachineEngine(stateMachineConfig StateMachineConfig) *ProcessCtrlStateMachineEngine {
return &ProcessCtrlStateMachineEngine{StateMachineConfig: stateMachineConfig}
}

+ 0
- 16
pkg/saga/statemachine/engine/statemachine_engine.go View File

@@ -1,16 +0,0 @@
package engine

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type StateMachineEngine interface {
Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error)
}

type CallBack interface {
OnFinished(ctx context.Context, context process_ctrl.ProcessContext, stateMachineInstance statelang.StateMachineInstance)
OnError(ctx context.Context, context process_ctrl.ProcessContext, stateMachineInstance statelang.StateMachineInstance, err error)
}

+ 0
- 7
pkg/saga/statemachine/engine/statemachine_engine_test.go View File

@@ -1,7 +0,0 @@
package engine

import "testing"

func TestEngine(t *testing.T) {

}

+ 0
- 4
pkg/saga/statemachine/engine/status_decision/status_decision.go View File

@@ -1,4 +0,0 @@
package status_decision

type StatusDecisionStrategy interface {
}

+ 174
- 0
pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go View File

@@ -0,0 +1,174 @@
package handlers

import (
"context"
"errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/exception"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
seataErrors "github.com/seata/seata-go/pkg/util/errors"
"github.com/seata/seata-go/pkg/util/log"
)

type ServiceTaskStateHandler struct {
interceptors []core.StateHandlerInterceptor
}

func NewServiceTaskStateHandler() *ServiceTaskStateHandler {
return &ServiceTaskStateHandler{}
}

func (s *ServiceTaskStateHandler) State() string {
return constant.StateTypeServiceTask
}

func (s *ServiceTaskStateHandler) Process(ctx context.Context, processContext core.ProcessContext) error {
stateInstruction, ok := processContext.GetInstruction().(core.StateInstruction)
if !ok {
return errors.New("invalid state instruction from processContext")
}
stateInterface, err := stateInstruction.GetState(processContext)
if err != nil {
return err
}
serviceTaskStateImpl, ok := stateInterface.(*state.ServiceTaskStateImpl)

serviceName := serviceTaskStateImpl.ServiceName()
methodName := serviceTaskStateImpl.ServiceMethod()
stateInstance, ok := processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance)
if !ok {
return errors.New("invalid state instance type from processContext")
}

// invoke service task and record
var result any
var resultErr error
handleResultErr := func(err error) {
log.Error("<<<<<<<<<<<<<<<<<<<<<< State[%s], ServiceName[%s], Method[%s] Execute failed.",
serviceTaskStateImpl.Name(), serviceName, methodName, err)

hierarchicalProcessContext, ok := processContext.(core.HierarchicalProcessContext)
if !ok {
return
}
hierarchicalProcessContext.SetVariable(constant.VarNameCurrentException, err)
core.HandleException(processContext, serviceTaskStateImpl.AbstractTaskState, err)
}

input, ok := processContext.GetVariable(constant.VarNameInputParams).([]any)
if !ok {
handleResultErr(errors.New("invalid input params type from processContext"))
return nil
}

stateInstance.SetStatus(statelang.RU)
log.Debugf(">>>>>>>>>>>>>>>>>>>>>> Start to execute State[%s], ServiceName[%s], Method[%s], Input:%s",
serviceTaskStateImpl.Name(), serviceName, methodName, input)

if _, ok := stateInterface.(state.CompensateSubStateMachineState); ok {
// If it is the compensation of the subState machine,
// directly call the state machine's compensate method
stateMachineEngine, ok := processContext.GetVariable(constant.VarNameStateMachineEngine).(core.StateMachineEngine)
if !ok {
handleResultErr(errors.New("invalid stateMachineEngine type from processContext"))
return nil
}

result, resultErr = s.compensateSubStateMachine(ctx, processContext, serviceTaskStateImpl, input,
stateInstance, stateMachineEngine)
if resultErr != nil {
handleResultErr(resultErr)
return nil
}
} else {
stateMachineConfig, ok := processContext.GetVariable(constant.VarNameStateMachineConfig).(core.StateMachineConfig)
if !ok {
handleResultErr(errors.New("invalid stateMachineConfig type from processContext"))
return nil
}

serviceInvoker := stateMachineConfig.ServiceInvokerManager().ServiceInvoker(serviceTaskStateImpl.ServiceType())
if serviceInvoker == nil {
resultErr = exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
"No such ServiceInvoker["+serviceTaskStateImpl.ServiceType()+"]", nil)
handleResultErr(resultErr)
return nil
}

result, resultErr = serviceInvoker.Invoke(ctx, input, serviceTaskStateImpl)
if resultErr != nil {
handleResultErr(resultErr)
return nil
}
}

log.Debugf("<<<<<<<<<<<<<<<<<<<<<< State[%s], ServiceName[%s], Method[%s] Execute finish. result: %s",
serviceTaskStateImpl.Name(), serviceName, methodName, result)

if result != nil {
stateInstance.SetOutputParams(result)
hierarchicalProcessContext, ok := processContext.(core.HierarchicalProcessContext)
if !ok {
handleResultErr(errors.New("invalid hierarchical process context type from processContext"))
return nil
}

hierarchicalProcessContext.SetVariable(constant.VarNameOutputParams, result)
}

return nil
}

func (s *ServiceTaskStateHandler) StateHandlerInterceptorList() []core.StateHandlerInterceptor {
return s.interceptors
}

func (s *ServiceTaskStateHandler) RegistryStateHandlerInterceptor(stateHandlerInterceptor core.StateHandlerInterceptor) {
s.interceptors = append(s.interceptors, stateHandlerInterceptor)
}

func (s *ServiceTaskStateHandler) compensateSubStateMachine(ctx context.Context, processContext core.ProcessContext,
serviceTaskState state.ServiceTaskState, input any, instance statelang.StateInstance,
machineEngine core.StateMachineEngine) (any, error) {
subStateMachineParentId, ok := processContext.GetVariable(serviceTaskState.Name() + constant.VarNameSubMachineParentId).(string)
if !ok {
return nil, errors.New("invalid subStateMachineParentId type from processContext")
}

if subStateMachineParentId == "" {
return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
"sub statemachine parentId is required", nil)
}

stateMachineConfig := processContext.GetVariable(constant.VarNameStateMachineConfig).(core.StateMachineConfig)
subInst, err := stateMachineConfig.StateLogStore().GetStateMachineInstanceByParentId(subStateMachineParentId)
if err != nil {
return nil, err
}

if subInst == nil || len(subInst) == 0 {
return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
"cannot find sub statemachine instance by parentId:"+subStateMachineParentId, nil)
}

subStateMachineInstId := subInst[0].ID()
log.Debugf(">>>>>>>>>>>>>>>>>>>>>> Start to compensate sub statemachine [id:%s]", subStateMachineInstId)

startParams := make(map[string]any)

if inputList, ok := input.([]any); ok {
if len(inputList) > 0 {
startParams = inputList[0].(map[string]any)
}
} else if inputMap, ok := input.(map[string]any); ok {
startParams = inputMap
}

compensateInst, err := machineEngine.Compensate(ctx, subStateMachineInstId, startParams)
instance.SetStatus(compensateInst.CompensationStatus())
log.Debugf("<<<<<<<<<<<<<<<<<<<<<< Compensate sub statemachine [id:%s] finished with status[%s], "+"compensateState[%s]",
subStateMachineInstId, compensateInst.Status(), compensateInst.CompensationStatus())
return compensateInst.EndParams(), nil
}

pkg/saga/statemachine/engine/process_ctrl/process_type.go → pkg/saga/statemachine/process_ctrl/process/process_type.go View File

@@ -1,4 +1,4 @@
package process_ctrl
package process

type ProcessType string


+ 1
- 1
pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go View File

@@ -64,7 +64,7 @@ func NewCompensateSubStateMachineStateParser() *CompensateSubStateMachineStatePa
}

func (c CompensateSubStateMachineStateParser) StateType() string {
return constant.CompensateSubMachine
return constant.StateTypeCompensateSubMachine
}

func (c CompensateSubStateMachineStateParser) Parse(stateName string, stateMap map[string]interface{}) (statelang.State, error) {


+ 22
- 0
pkg/saga/statemachine/statelang/state/loop_start_state.go View File

@@ -0,0 +1,22 @@
package state

import (
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type LoopStartState interface {
statelang.State
}

type LoopStartStateImpl struct {
*statelang.BaseState
}

func NewLoopStartStateImpl() *LoopStartStateImpl {
baseState := statelang.NewBaseState()
baseState.SetType(constant.StateTypeLoopStart)
return &LoopStartStateImpl{
BaseState: baseState,
}
}

+ 1
- 1
pkg/saga/statemachine/statelang/state/sub_state_machine.go View File

@@ -56,7 +56,7 @@ func NewCompensateSubStateMachineStateImpl() *CompensateSubStateMachineStateImpl
ServiceTaskStateImpl: NewServiceTaskStateImpl(),
hashcode: uuid.String(),
}
c.SetType(constant.CompensateSubMachine)
c.SetType(constant.StateTypeCompensateSubMachine)
return c
}



+ 19
- 1
pkg/saga/statemachine/statelang/state/task_state.go View File

@@ -38,7 +38,7 @@ type Loop interface {

type ExceptionMatch interface {
Exceptions() []string
// TODO: go dose not support get reflect.Type by string, not use it now
ExceptionTypes() []reflect.Type

SetExceptionTypes(ExceptionTypes []reflect.Type)
@@ -79,7 +79,9 @@ type AbstractTaskState struct {
loop Loop
catches []ExceptionMatch
input []interface{}
inputExpressions []interface{}
output map[string]interface{}
outputExpressions map[string]interface{}
compensatePersistModeUpdate bool
retryPersistModeUpdate bool
forCompensation bool
@@ -96,6 +98,22 @@ func NewAbstractTaskState() *AbstractTaskState {
}
}

func (a *AbstractTaskState) InputExpressions() []interface{} {
return a.inputExpressions
}

func (a *AbstractTaskState) SetInputExpressions(inputExpressions []interface{}) {
a.inputExpressions = inputExpressions
}

func (a *AbstractTaskState) OutputExpressions() map[string]interface{} {
return a.outputExpressions
}

func (a *AbstractTaskState) SetOutputExpressions(outputExpressions map[string]interface{}) {
a.outputExpressions = outputExpressions
}

func (a *AbstractTaskState) Input() []interface{} {
return a.input
}


+ 13
- 7
pkg/saga/statemachine/statelang/statemachine_instance.go View File

@@ -71,9 +71,9 @@ type StateMachineInstance interface {

SetBusinessKey(businessKey string)

Error() error
Exception() error

SetError(err error)
SetException(err error)

StartParams() map[string]interface{}

@@ -83,6 +83,8 @@ type StateMachineInstance interface {

SetEndParams(endParams map[string]interface{})

Context() map[string]interface{}

PutContext(key string, value interface{})

SetContext(context map[string]interface{})
@@ -115,7 +117,7 @@ type StateMachineInstanceImpl struct {
startedTime time.Time
endTime time.Time
updatedTime time.Time
err error
exception error
serializedError interface{}
endParams map[string]interface{}
serializedEndParams interface{}
@@ -247,12 +249,12 @@ func (s *StateMachineInstanceImpl) SetBusinessKey(businessKey string) {
s.businessKey = businessKey
}

func (s *StateMachineInstanceImpl) Error() error {
return s.err
func (s *StateMachineInstanceImpl) Exception() error {
return s.exception
}

func (s *StateMachineInstanceImpl) SetError(err error) {
s.err = err
func (s *StateMachineInstanceImpl) SetException(err error) {
s.exception = err
}

func (s *StateMachineInstanceImpl) StartParams() map[string]interface{} {
@@ -271,6 +273,10 @@ func (s *StateMachineInstanceImpl) SetEndParams(endParams map[string]interface{}
s.endParams = endParams
}

func (s *StateMachineInstanceImpl) Context() map[string]interface{} {
return s.context
}

func (s *StateMachineInstanceImpl) PutContext(key string, value interface{}) {
s.contextMutex.Lock()
defer s.contextMutex.Unlock()


pkg/saga/statemachine/engine/store/db/db.go → pkg/saga/statemachine/store/db/db.go View File


pkg/saga/statemachine/engine/store/db/db_test.go → pkg/saga/statemachine/store/db/db_test.go View File


pkg/saga/statemachine/engine/store/db/statelang.go → pkg/saga/statemachine/store/db/statelang.go View File


pkg/saga/statemachine/engine/store/db/statelang_test.go → pkg/saga/statemachine/store/db/statelang_test.go View File


pkg/saga/statemachine/engine/store/db/statelog.go → pkg/saga/statemachine/store/db/statelog.go View File

@@ -6,7 +6,7 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/serializer"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
@@ -89,7 +89,7 @@ func NewStateLogStore(db *sql.DB, tablePrefix string) *StateLogStore {
}

func (s *StateLogStore) RecordStateMachineStarted(ctx context.Context, machineInstance statelang.StateMachineInstance,
context process_ctrl.ProcessContext) error {
context core.ProcessContext) error {
if machineInstance == nil {
return nil
}
@@ -133,15 +133,15 @@ func (s *StateLogStore) RecordStateMachineStarted(ctx context.Context, machineIn
}

func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context, machineInstance statelang.StateMachineInstance,
context process_ctrl.ProcessContext) error {
context core.ProcessContext) error {
if machineInstance == nil {
return nil
}

endParams := machineInstance.EndParams()

if statelang.SU == machineInstance.Status() && machineInstance.Error() != nil {
machineInstance.SetError(nil)
if statelang.SU == machineInstance.Status() && machineInstance.Exception() != nil {
machineInstance.SetException(nil)
}

serializedEndParams, err := s.paramsSerializer.Serialize(endParams)
@@ -149,7 +149,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context, machineI
return err
}
machineInstance.SetSerializedEndParams(serializedEndParams)
serializedError, err := s.errorSerializer.Serialize(machineInstance.Error())
serializedError, err := s.errorSerializer.Serialize(machineInstance.Exception())
if err != nil {
return err
}
@@ -171,7 +171,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context, machineI
}

func (s *StateLogStore) RecordStateMachineRestarted(ctx context.Context, machineInstance statelang.StateMachineInstance,
context process_ctrl.ProcessContext) error {
context core.ProcessContext) error {
if machineInstance == nil {
return nil
}
@@ -190,7 +190,7 @@ func (s *StateLogStore) RecordStateMachineRestarted(ctx context.Context, machine
}

func (s *StateLogStore) RecordStateStarted(ctx context.Context, stateInstance statelang.StateInstance,
context process_ctrl.ProcessContext) error {
context core.ProcessContext) error {
if stateInstance == nil {
return nil
}
@@ -235,7 +235,7 @@ func (s *StateLogStore) RecordStateStarted(ctx context.Context, stateInstance st
return nil
}

func (s *StateLogStore) isUpdateMode(instance statelang.StateInstance, context process_ctrl.ProcessContext) bool {
func (s *StateLogStore) isUpdateMode(instance statelang.StateInstance, context core.ProcessContext) bool {
//TODO implement me, add forward logic
return false
}
@@ -293,7 +293,7 @@ func (s *StateLogStore) getIdIndex(stateInstanceId string, separator string) int
}

func (s *StateLogStore) RecordStateFinished(ctx context.Context, stateInstance statelang.StateInstance,
context process_ctrl.ProcessContext) error {
context core.ProcessContext) error {
if stateInstance == nil {
return nil
}
@@ -372,7 +372,7 @@ func (s *StateLogStore) deserializeStateMachineParamsAndException(stateMachineIn
if err != nil {
return err
}
stateMachineInstance.SetError(deserializedError)
stateMachineInstance.SetException(deserializedError)
}

serializedStartParams := stateMachineInstance.SerializedStartParams()

pkg/saga/statemachine/engine/store/db/statelog_test.go → pkg/saga/statemachine/store/db/statelog_test.go View File

@@ -5,19 +5,19 @@ import (
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/constant"
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/core"
"github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func mockProcessContext(stateMachineName string, stateMachineInstance statelang.StateMachineInstance) process_ctrl.ProcessContext {
ctx := engine.NewProcessContextBuilder().
WithProcessType(process_ctrl.StateLang).
func mockProcessContext(stateMachineName string, stateMachineInstance statelang.StateMachineInstance) core.ProcessContext {
ctx := core.NewProcessContextBuilder().
WithProcessType(process.StateLang).
WithOperationName(constant.OperationNameStart).
WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, "000001")).
WithInstruction(core.NewStateInstruction(stateMachineName, "000001")).
WithStateMachineInstance(stateMachineInstance).
Build()
return ctx
@@ -55,7 +55,7 @@ func TestStateLogStore_RecordStateMachineStarted(t *testing.T) {
assert.Equal(t, expected.ID(), actual.ID())
assert.Equal(t, expected.MachineID(), actual.MachineID())
assert.Equal(t, fmt.Sprint(expected.StartParams()), fmt.Sprint(actual.StartParams()))
assert.Nil(t, actual.Error())
assert.Nil(t, actual.Exception())
assert.Nil(t, actual.SerializedError())
assert.Equal(t, expected.Status(), actual.Status())
assert.Equal(t, expected.StartedTime().UnixNano(), actual.StartedTime().UnixNano())
@@ -73,7 +73,7 @@ func TestStateLogStore_RecordStateMachineFinished(t *testing.T) {
err := stateLogStore.RecordStateMachineStarted(context.Background(), expected, ctx)
assert.Nil(t, err)
expected.SetEndParams(map[string]any{"end": 100})
expected.SetError(errors.New("this is a test error"))
expected.SetException(errors.New("this is a test error"))
expected.SetStatus(statelang.FA)
expected.SetEndTime(time.Now())
expected.SetRunning(false)
@@ -86,7 +86,7 @@ func TestStateLogStore_RecordStateMachineFinished(t *testing.T) {
assert.Equal(t, expected.ID(), actual.ID())
assert.Equal(t, expected.MachineID(), actual.MachineID())
assert.Equal(t, fmt.Sprint(expected.StartParams()), fmt.Sprint(actual.StartParams()))
assert.Equal(t, "this is a test error", actual.Error().Error())
assert.Equal(t, "this is a test error", actual.Exception().Error())
assert.Equal(t, expected.Status(), actual.Status())
assert.Equal(t, expected.IsRunning(), actual.IsRunning())
assert.Equal(t, expected.StartedTime().UnixNano(), actual.StartedTime().UnixNano())
@@ -240,7 +240,7 @@ func TestStateLogStore_GetStateMachineInstanceByBusinessKey(t *testing.T) {
assert.Equal(t, expected.ID(), actual.ID())
assert.Equal(t, expected.MachineID(), actual.MachineID())
assert.Equal(t, fmt.Sprint(expected.StartParams()), fmt.Sprint(actual.StartParams()))
assert.Nil(t, actual.Error())
assert.Nil(t, actual.Exception())
assert.Nil(t, actual.SerializedError())
assert.Equal(t, expected.Status(), actual.Status())
assert.Equal(t, expected.StartedTime().UnixNano(), actual.StartedTime().UnixNano())
@@ -269,9 +269,9 @@ func TestStateLogStore_GetStateMachineInstanceByParentId(t *testing.T) {
actual := actualList[0]
assert.Equal(t, expected.ID(), actual.ID())
assert.Equal(t, expected.MachineID(), actual.MachineID())
// no startParams, endParams and Error
// no startParams, endParams and Exception
assert.NotEqual(t, fmt.Sprint(expected.StartParams()), fmt.Sprint(actual.StartParams()))
assert.Nil(t, actual.Error())
assert.Nil(t, actual.Exception())
assert.Nil(t, actual.SerializedError())
assert.Equal(t, expected.Status(), actual.Status())
assert.Equal(t, expected.StartedTime().UnixNano(), actual.StartedTime().UnixNano())

+ 34
- 0
pkg/saga/statemachine/store/repository/state_machine_repository.go View File

@@ -0,0 +1,34 @@
package repository

import (
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"io"
)

type StateMachineRepositoryImpl struct {
}

func (s StateMachineRepositoryImpl) GetStateMachineById(stateMachineId string) (statelang.StateMachine, error) {
//TODO implement me
panic("implement me")
}

func (s StateMachineRepositoryImpl) GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string) (statelang.StateMachine, error) {
//TODO implement me
panic("implement me")
}

func (s StateMachineRepositoryImpl) GetLastVersionStateMachine(stateMachineName string, tenantId string) (statelang.StateMachine, error) {
//TODO implement me
panic("implement me")
}

func (s StateMachineRepositoryImpl) RegistryStateMachine(machine statelang.StateMachine) error {
//TODO implement me
panic("implement me")
}

func (s StateMachineRepositoryImpl) RegistryStateMachineByReader(reader io.Reader) error {
//TODO implement me
panic("implement me")
}

+ 1
- 1
pkg/tm/transaction_executor_test.go View File

@@ -361,7 +361,7 @@ func TestBeginNewGtx(t *testing.T) {
assert.Equal(t, message.GlobalStatusBegin, *GetTxStatus(ctx))

// case return error
err := errors.New("Mock Error")
err := errors.New("Mock Exception")
gomonkey.ApplyMethod(reflect.TypeOf(GetGlobalTransactionManager()), "Begin",
func(_ *GlobalTransactionManager, ctx context.Context, timeout time.Duration) error {
return err


+ 43
- 1
pkg/util/collection/collection.go View File

@@ -17,7 +17,10 @@

package collection

import "strings"
import (
"container/list"
"strings"
)

const (
KvSplit = "="
@@ -81,3 +84,42 @@ func DecodeMap(data []byte) map[string]string {

return ctxMap
}

type Stack struct {
list *list.List
}

func NewStack() *Stack {
list := list.New()
return &Stack{list}
}

func (stack *Stack) Push(value interface{}) {
stack.list.PushBack(value)
}

func (stack *Stack) Pop() interface{} {
e := stack.list.Back()
if e != nil {
stack.list.Remove(e)
return e.Value
}
return nil
}

func (stack *Stack) Peak() interface{} {
e := stack.list.Back()
if e != nil {
return e.Value
}

return nil
}

func (stack *Stack) Len() int {
return stack.list.Len()
}

func (stack *Stack) Empty() bool {
return stack.list.Len() == 0
}

+ 23
- 0
pkg/util/collection/collection_test.go View File

@@ -57,3 +57,26 @@ func TestEncodeDecodeMap(t *testing.T) {
})
}
}

func TestStack(t *testing.T) {
stack := NewStack()
stack.Push(1)
stack.Push(2)
stack.Push(3)
stack.Push(4)

len := stack.Len()
if len != 4 {
t.Errorf("stack.Len() failed. Got %d, expected 4.", len)
}

value := stack.Peak().(int)
if value != 4 {
t.Errorf("stack.Peak() failed. Got %d, expected 4.", value)
}

value = stack.Pop().(int)
if value != 4 {
t.Errorf("stack.Pop() failed. Got %d, expected 4.", value)
}
}

+ 11
- 0
pkg/util/errors/code.go View File

@@ -100,4 +100,15 @@ const (

// FencePhaseError have fence phase but is not illegal value
FencePhaseError

// ObjectNotExists object not exists
ObjectNotExists
// StateMachineInstanceNotExists State machine instance not exists
StateMachineInstanceNotExists
// ContextVariableReplayFailed Context variable replay failed
ContextVariableReplayFailed
// InvalidParameter Context variable replay failed
InvalidParameter
// OperationDenied Operation denied
OperationDenied
)

Loading…
Cancel
Save