Browse Source

Feature/saga interface optimization (#778)

feature: saga Interface optimization
pull/812/head
Jingliu GitHub 4 months ago
parent
commit
d3a4cb1689
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
5 changed files with 315 additions and 2 deletions
  1. +2
    -0
      pkg/saga/statemachine/constant/constant.go
  2. +277
    -1
      pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go
  3. +26
    -1
      pkg/saga/statemachine/engine/core/statemachine_engine.go
  4. +8
    -0
      pkg/saga/statemachine/engine/core/utils.go
  5. +2
    -0
      pkg/util/errors/code.go

+ 2
- 0
pkg/saga/statemachine/constant/constant.go View File

@@ -64,6 +64,8 @@ const (
VarNameCurrentCompensationHolder string = "_current_compensation_holder_"
VarNameFirstCompensationStateStarted string = "_first_compensation_state_started"
VarNameCurrentLoopContextHolder string = "_current_loop_context_holder_"
VarNameRetriedStateInstId string = "_retried_state_instance_id"
VarNameIsForSubStatMachineForward string = "_is_for_sub_statemachine_forward_"
// TODO: this lock in process context only has one, try to add more to add concurrent
VarNameProcessContextMutexLock string = "_current_context_mutex_lock"
VarNameFailEndStateFlag string = "_fail_end_state_flag_"


+ 277
- 1
pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go View File

@@ -41,15 +41,99 @@ func NewProcessCtrlStateMachineEngine() *ProcessCtrlStateMachineEngine {
}
}

func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string,
startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, false, nil)
}

func (p ProcessCtrlStateMachineEngine) StartAsync(ctx context.Context, stateMachineName string, tenantId string,
startParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) {
return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, true, callback)
}

func (p ProcessCtrlStateMachineEngine) StartWithBusinessKey(ctx context.Context, stateMachineName string,
tenantId string, businessKey string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
return p.startInternal(ctx, stateMachineName, tenantId, businessKey, startParams, false, nil)
}

func (p ProcessCtrlStateMachineEngine) StartWithBusinessKeyAsync(ctx context.Context, stateMachineName string,
tenantId string, businessKey string, startParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) {
return p.startInternal(ctx, stateMachineName, tenantId, businessKey, startParams, true, callback)
}

func (p ProcessCtrlStateMachineEngine) Forward(ctx context.Context, stateMachineInstId string,
replaceParams map[string]interface{}) (statelang.StateMachineInstance, error) {
return p.forwardInternal(ctx, stateMachineInstId, replaceParams, false, false, nil)
}

func (p ProcessCtrlStateMachineEngine) ForwardAsync(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) {
return p.forwardInternal(ctx, stateMachineInstId, replaceParams, false, true, callback)
}

func (p ProcessCtrlStateMachineEngine) Compensate(ctx context.Context, stateMachineInstId string,
replaceParams map[string]any) (statelang.StateMachineInstance, error) {
return p.compensateInternal(ctx, stateMachineInstId, replaceParams, false, nil)
}

func (p ProcessCtrlStateMachineEngine) CompensateAsync(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error) {
return p.compensateInternal(ctx, stateMachineInstId, replaceParams, true, callback)
}

func (p ProcessCtrlStateMachineEngine) SkipAndForward(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}) (statelang.StateMachineInstance, error) {
return p.forwardInternal(ctx, stateMachineInstId, replaceParams, true, false, nil)
}

func (p ProcessCtrlStateMachineEngine) SkipAndForwardAsync(ctx context.Context, stateMachineInstId string, callback CallBack) (statelang.StateMachineInstance, error) {
return p.forwardInternal(ctx, stateMachineInstId, nil, true, true, callback)
}

func (p ProcessCtrlStateMachineEngine) GetStateMachineConfig() StateMachineConfig {
return p.StateMachineConfig
}

func (p ProcessCtrlStateMachineEngine) ReloadStateMachineInstance(ctx context.Context, instId string) (statelang.StateMachineInstance, error) {
inst, err := p.StateMachineConfig.StateLogStore().GetStateMachineInstance(instId)
if err != nil {
return nil, err
}
if inst != nil {
stateMachine := inst.StateMachine()
if stateMachine == nil {
stateMachine, err = p.StateMachineConfig.StateMachineRepository().GetStateMachineById(inst.MachineID())
if err != nil {
return nil, err
}
inst.SetStateMachine(stateMachine)
}
if stateMachine == nil {
return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists,
"StateMachine[id:"+inst.MachineID()+"] not exist.", nil)
}

stateList := inst.StateList()
if len(stateList) == 0 {
stateList, err = p.StateMachineConfig.StateLogStore().GetStateInstanceListByMachineInstanceId(instId)
if err != nil {
return nil, err
}
if len(stateList) > 0 {
for _, tmpStateInstance := range stateList {
inst.PutState(tmpStateInstance.ID(), tmpStateInstance)
}
}
}

if len(inst.EndParams()) == 0 {
endParams, err := p.replayContextVariables(ctx, inst)
if err != nil {
return nil, err
}
inst.SetEndParams(endParams)
}
}
return inst, nil
}

func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateMachineName string, tenantId string,
businessKey string, startParams map[string]interface{}, async bool, callback CallBack) (statelang.StateMachineInstance, error) {
if tenantId == "" {
@@ -104,6 +188,184 @@ func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateM
return stateMachineInstance, nil
}

func (p ProcessCtrlStateMachineEngine) forwardInternal(ctx context.Context, stateMachineInstId string,
replaceParams map[string]interface{}, skip bool, async bool, callback CallBack) (statelang.StateMachineInstance, error) {
stateMachineInstance, err := p.reloadStateMachineInstance(ctx, stateMachineInstId)
if err != nil {
return nil, err
}

if stateMachineInstance == nil {
return nil, exception.NewEngineExecutionException(seataErrors.StateMachineInstanceNotExists, "StateMachineInstance is not exists", nil)
}

if stateMachineInstance.Status() == statelang.SU && stateMachineInstance.CompensationStatus() == "" {
return stateMachineInstance, nil
}

acceptStatus := []statelang.ExecutionStatus{statelang.FA, statelang.UN, statelang.RU}
if _, err := p.checkStatus(ctx, stateMachineInstance, acceptStatus, nil, stateMachineInstance.Status(), "", "forward"); err != nil {
return nil, err
}

actList := stateMachineInstance.StateList()
if len(actList) == 0 {
return nil, exception.NewEngineExecutionException(seataErrors.OperationDenied,
fmt.Sprintf("StateMachineInstance[id:%s] has no stateInstance, please start a new StateMachine execution instead", stateMachineInstId), nil)
}

lastForwardState, err := p.findOutLastForwardStateInstance(actList)
if err != nil {
return nil, err
}
if lastForwardState == nil {
return nil, exception.NewEngineExecutionException(seataErrors.OperationDenied,
fmt.Sprintf("StateMachineInstance[id:%s] Cannot find last forward execution stateInstance", stateMachineInstId), nil)
}

contextBuilder := NewProcessContextBuilder().
WithProcessType(process.StateLang).
WithOperationName(constant.OperationNameForward).
WithAsyncCallback(callback).
WithStateMachineInstance(stateMachineInstance).
WithStateInstance(lastForwardState).
WithStateMachineConfig(p.StateMachineConfig).
WithStateMachineEngine(p).
WithIsAsyncExecution(async)

context := contextBuilder.Build()

contextVariables, err := p.getStateMachineContextVariables(ctx, stateMachineInstance)
if err != nil {
return nil, err
}

if replaceParams != nil {
for k, v := range replaceParams {
contextVariables[k] = v
}
}
p.putBusinesskeyToContextariables(stateMachineInstance, contextVariables)

concurrentContextVariables := p.copyMap(contextVariables)

context.SetVariable(constant.VarNameStateMachineContext, concurrentContextVariables)
stateMachineInstance.SetContext(concurrentContextVariables)

originStateName := GetOriginStateName(lastForwardState)
lastState := stateMachineInstance.StateMachine().State(originStateName)
loop := GetLoopConfig(ctx, context, lastState)
if loop != nil && lastForwardState.Status() == statelang.SU {
lastForwardState = p.findOutLastNeedForwardStateInstance(ctx, context)
}

context.SetVariable(lastForwardState.Name()+constant.VarNameRetriedStateInstId, lastForwardState.ID())
if lastForwardState.Type() == constant.StateTypeSubStateMachine && lastForwardState.CompensationStatus() != statelang.SU {
context.SetVariable(constant.VarNameIsForSubStatMachineForward, true)
}

if lastForwardState.Status() != statelang.SU {
lastForwardState.SetIgnoreStatus(true)
}

inst := NewStateInstruction(stateMachineInstance.StateMachine().Name(), stateMachineInstance.TenantID())
if skip || lastForwardState.Status() == statelang.SU {
next := ""
curState := stateMachineInstance.StateMachine().State(GetOriginStateName(lastForwardState))
if taskState, ok := curState.(*state.AbstractTaskState); ok {
next = taskState.Next()
}
if next == "" {
log.Warn(fmt.Sprintf("Last Forward execution StateInstance was succeed, and it has not Next State, skip forward operation"))
return stateMachineInstance, nil
}
inst.SetStateName(next)
} else {
if lastForwardState.Status() == statelang.RU && !IsTimeout(lastForwardState.StartedTime(), p.StateMachineConfig.ServiceInvokeTimeout()) {
return nil, exception.NewEngineExecutionException(seataErrors.OperationDenied,
fmt.Sprintf("State [%s] is running, operation[forward] denied", lastForwardState.Name()), nil)
}
inst.SetStateName(GetOriginStateName(lastForwardState))
}
context.SetInstruction(inst)

stateMachineInstance.SetStatus(statelang.RU)
stateMachineInstance.SetRunning(true)

log.Info(fmt.Sprintf("Operation [forward] started stateMachineInstance[id:%s]", stateMachineInstance.ID()))

if stateMachineInstance.StateMachine().IsPersist() {
if err := p.StateMachineConfig.StateLogStore().RecordStateMachineRestarted(ctx, stateMachineInstance, context); err != nil {
return nil, err
}
}

curState, err := inst.GetState(context)
if err != nil {
return nil, err
}
loop = GetLoopConfig(ctx, context, curState)
if loop != nil {
inst.SetTemporaryState(state.NewLoopStartStateImpl())
}

if async {
if _, err := p.StateMachineConfig.AsyncEventPublisher().PushEvent(ctx, context); err != nil {
return nil, err
}
} else {
if _, err := p.StateMachineConfig.EventPublisher().PushEvent(ctx, context); err != nil {
return nil, err
}
}

return stateMachineInstance, nil
}

func (p ProcessCtrlStateMachineEngine) findOutLastForwardStateInstance(stateInstanceList []statelang.StateInstance) (statelang.StateInstance, error) {
var lastForwardStateInstance statelang.StateInstance
var err error
for i := len(stateInstanceList) - 1; i >= 0; i-- {
stateInstance := stateInstanceList[i]
if !stateInstance.IsForCompensation() {
if stateInstance.CompensationStatus() == statelang.SU {
continue
}

if stateInstance.Type() == constant.StateTypeSubStateMachine {
finalState := stateInstance
for finalState.StateIDRetriedFor() != "" {
if finalState, err = p.StateMachineConfig.StateLogStore().GetStateInstance(finalState.StateIDRetriedFor(),
finalState.MachineInstanceID()); err != nil {
return nil, err
}
}

subInst, _ := p.StateMachineConfig.StateLogStore().GetStateMachineInstanceByParentId(GenerateParentId(finalState))
if len(subInst) > 0 {
if subInst[0].CompensationStatus() == statelang.SU {
continue
}

if subInst[0].CompensationStatus() == statelang.UN {
return nil, exception.NewEngineExecutionException(seataErrors.ForwardInvalid,
"Last forward execution state instance is SubStateMachine and compensation status is [UN], Operation[forward] denied, stateInstanceId:"+stateInstance.ID(),
nil)
}
}
} else if stateInstance.CompensationStatus() == statelang.UN {
return nil, exception.NewEngineExecutionException(seataErrors.ForwardInvalid,
"Last forward execution state instance compensation status is [UN], Operation[forward] denied, stateInstanceId:"+stateInstance.ID(),
nil)
}

lastForwardStateInstance = stateInstance
break
}
}
return lastForwardStateInstance, nil
}

// copyMap not deep copy, so best practice: Don’t pass by reference
func (p ProcessCtrlStateMachineEngine) copyMap(startParams map[string]interface{}) map[string]interface{} {
copyMap := make(map[string]interface{}, len(startParams))
@@ -439,3 +701,17 @@ func (p ProcessCtrlStateMachineEngine) nullSafeCopy(srcMap map[string]any, destM
}
}
}

func (p ProcessCtrlStateMachineEngine) findOutLastNeedForwardStateInstance(ctx context.Context, processContext ProcessContext) statelang.StateInstance {
stateMachineInstance := processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance)
lastForwardState := processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance)

actList := stateMachineInstance.StateList()
for i := len(actList) - 1; i >= 0; i-- {
stateInstance := actList[i]
if GetOriginStateName(stateInstance) == GetOriginStateName(lastForwardState) && stateInstance.Status() != statelang.SU {
return stateInstance
}
}
return lastForwardState
}

+ 26
- 1
pkg/saga/statemachine/engine/core/statemachine_engine.go View File

@@ -23,8 +23,33 @@ import (
)

type StateMachineEngine interface {
// Start starts a state machine instance
Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error)
Compensate(ctx context.Context, stateMachineInstId string, replaceParams map[string]any) (statelang.StateMachineInstance, error)
// StartAsync start a state machine instance asynchronously
StartAsync(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{},
callback CallBack) (statelang.StateMachineInstance, error)
// StartWithBusinessKey starts a state machine instance with a business key
StartWithBusinessKey(ctx context.Context, stateMachineName string, tenantId string, businessKey string,
startParams map[string]interface{}) (statelang.StateMachineInstance, error)
// StartWithBusinessKeyAsync starts a state machine instance with a business key asynchronously
StartWithBusinessKeyAsync(ctx context.Context, stateMachineName string, tenantId string, businessKey string,
startParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error)
// Forward restart a failed state machine instance
Forward(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}) (statelang.StateMachineInstance, error)
// ForwardAsync restart a failed state machine instance asynchronously
ForwardAsync(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error)
// Compensate compensate a state machine instance
Compensate(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}) (statelang.StateMachineInstance, error)
// CompensateAsync compensate a state machine instance asynchronously
CompensateAsync(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}, callback CallBack) (statelang.StateMachineInstance, error)
// SkipAndForward skips the current failed state instance and restarts the state machine instance
SkipAndForward(ctx context.Context, stateMachineInstId string, replaceParams map[string]interface{}) (statelang.StateMachineInstance, error)
// SkipAndForwardAsync skips the current failed state instance and restarts the state machine instance asynchronously
SkipAndForwardAsync(ctx context.Context, stateMachineInstId string, callback CallBack) (statelang.StateMachineInstance, error)
// GetStateMachineConfig gets the state machine configurations
GetStateMachineConfig() StateMachineConfig
// ReloadStateMachineInstance reloads a state machine instance
ReloadStateMachineInstance(ctx context.Context, instId string) (statelang.StateMachineInstance, error)
}

type CallBack interface {


+ 8
- 0
pkg/saga/statemachine/engine/core/utils.go View File

@@ -98,6 +98,14 @@ func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) *ProcessContext
return p
}

func (p *ProcessContextBuilder) WithStateInstance(state statelang.StateInstance) *ProcessContextBuilder {
if state != nil {
p.processContext.SetVariable(constant.VarNameStateInst, state)
}

return p
}

func (p *ProcessContextBuilder) Build() ProcessContext {
return p.processContext
}

+ 2
- 0
pkg/util/errors/code.go View File

@@ -111,4 +111,6 @@ const (
InvalidParameter
// OperationDenied Operation denied
OperationDenied
// ForwardInvalid Forward invalid
ForwardInvalid
)

Loading…
Cancel
Save