diff --git a/go.mod b/go.mod index d24d93c3..c927882f 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/agiledragon/gomonkey v2.0.2+incompatible github.com/agiledragon/gomonkey/v2 v2.9.0 github.com/mattn/go-sqlite3 v1.14.19 + golang.org/x/sync v0.6.0 google.golang.org/protobuf v1.30.0 ) diff --git a/go.sum b/go.sum index 9c0fbc0b..e269c87d 100644 --- a/go.sum +++ b/go.sum @@ -942,6 +942,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= +golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/pkg/saga/statemachine/constant/constant.go b/pkg/saga/statemachine/constant/constant.go index ef332c18..a089e7d5 100644 --- a/pkg/saga/statemachine/constant/constant.go +++ b/pkg/saga/statemachine/constant/constant.go @@ -1,29 +1,72 @@ package constant const ( - VarNameProcessType string = "_ProcessType_" - VarNameOperationName string = "_operation_name_" - OperationNameStart string = "start" - VarNameAsyncCallback string = "_async_callback_" - VarNameStateMachineInst string = "_current_statemachine_instance_" - VarNameStateMachine string = "_current_statemachine_" - VarNameStateMachineEngine string = "_current_statemachine_engine_" - VarNameStateMachineConfig string = "_statemachine_config_" - VarNameStateMachineContext string = "context" - VarNameIsAsyncExecution string = "_is_async_execution_" - VarNameStateInst string = "_current_state_instance_" - SeqEntityStateMachineInst string = "STATE_MACHINE_INST" - SeqEntityStateInst string = "STATE_INST" - VarNameBusinesskey string = "_business_key_" - VarNameParentId string = "_parent_id_" - StateTypeServiceTask string = "ServiceTask" - StateTypeChoice string = "Choice" - StateTypeSubStateMachine string = "SubStateMachine" - CompensateSubMachine string = "CompensateSubMachine" - StateTypeSucceed string = "Succeed" - StateTypeFail string = "Fail" - StateTypeCompensationTrigger string = "CompensationTrigger" - StateTypeScriptTask string = "ScriptTask" - CompensateSubMachineStateNamePrefix string = "_compensate_sub_machine_state_" - DefaultScriptType string = "groovy" + // region State Types + StateTypeServiceTask string = "ServiceTask" + StateTypeChoice string = "Choice" + StateTypeFail string = "Fail" + StateTypeSucceed string = "Succeed" + StateTypeCompensationTrigger string = "CompensationTrigger" + StateTypeSubStateMachine string = "SubStateMachine" + StateTypeCompensateSubMachine string = "CompensateSubMachine" + StateTypeScriptTask string = "ScriptTask" + StateTypeLoopStart string = "LoopStart" + // end region + + // region Service Types + ServiceTypeGRPC string = "GRPC" + // end region + + // region System Variables + VarNameOutputParams string = "outputParams" + VarNameProcessType string = "_ProcessType_" + VarNameOperationName string = "_operation_name_" + OperationNameStart string = "start" + OperationNameCompensate string = "compensate" + VarNameAsyncCallback string = "_async_callback_" + VarNameCurrentExceptionRoute string = "_current_exception_route_" + VarNameIsExceptionNotCatch string = "_is_exception_not_catch_" + VarNameSubMachineParentId string = "_sub_machine_parent_id_" + VarNameCurrentChoice string = "_current_choice_" + VarNameStateMachineInst string = "_current_statemachine_instance_" + VarNameStateMachine string = "_current_statemachine_" + VarNameStateMachineEngine string = "_current_statemachine_engine_" + VarNameStateMachineConfig string = "_statemachine_config_" + VarNameStateMachineContext string = "context" + VarNameIsAsyncExecution string = "_is_async_execution_" + VarNameStateInst string = "_current_state_instance_" + VarNameBusinesskey string = "_business_key_" + VarNameParentId string = "_parent_id_" + VarNameCurrentException string = "currentException" + CompensateSubMachineStateNamePrefix string = "_compensate_sub_machine_state_" + DefaultScriptType string = "groovy" + VarNameSyncExeStack string = "_sync_execution_stack_" + VarNameInputParams string = "inputParams" + VarNameIsLoopState string = "_is_loop_state_" + VarNameCurrentCompensateTriggerState string = "_is_compensating_" + VarNameCurrentCompensationHolder string = "_current_compensation_holder_" + VarNameFirstCompensationStateStarted string = "_first_compensation_state_started" + VarNameCurrentLoopContextHolder string = "_current_loop_context_holder_" + // TODO: this lock in process context only has one, try to add more to add concurrent + VarNameProcessContextMutexLock string = "_current_context_mutex_lock" + VarNameFailEndStateFlag string = "_fail_end_state_flag_" + // end region + + // region of loop + LoopCounter string = "loopCounter" + LoopSemaphore string = "loopSemaphore" + LoopResult string = "loopResult" + NumberOfInstances string = "nrOfInstances" + NumberOfActiveInstances string = "nrOfActiveInstances" + NumberOfCompletedInstances string = "nrOfCompletedInstances" + // end region + + // region others + SeqEntityStateMachineInst string = "STATE_MACHINE_INST" + SeqEntityStateInst string = "STATE_INST" + OperationNameForward string = "forward" + LoopStateNamePattern string = "-loop-" + // end region + + SeperatorParentId string = ":" ) diff --git a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go b/pkg/saga/statemachine/engine/core/bussiness_processor.go similarity index 79% rename from pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go rename to pkg/saga/statemachine/engine/core/bussiness_processor.go index c12a7c79..6b0a2bb4 100644 --- a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go +++ b/pkg/saga/statemachine/engine/core/bussiness_processor.go @@ -1,9 +1,10 @@ -package process_ctrl +package core import ( "context" "github.com/pkg/errors" "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process" "sync" ) @@ -19,14 +20,14 @@ type DefaultBusinessProcessor struct { mu sync.RWMutex } -func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType ProcessType, processHandler ProcessHandler) { +func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType process.ProcessType, processHandler ProcessHandler) { d.mu.Lock() defer d.mu.Unlock() d.processHandlers[string(processType)] = processHandler } -func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType ProcessType, routerHandler RouterHandler) { +func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType process.ProcessType, routerHandler RouterHandler) { d.mu.Lock() defer d.mu.Unlock() @@ -55,17 +56,17 @@ func (d *DefaultBusinessProcessor) Route(ctx context.Context, processContext Pro return routerHandler.Route(ctx, processContext) } -func (d *DefaultBusinessProcessor) getProcessHandler(processType ProcessType) (ProcessHandler, error) { +func (d *DefaultBusinessProcessor) getProcessHandler(processType process.ProcessType) (ProcessHandler, error) { d.mu.RLock() defer d.mu.RUnlock() processHandler, ok := d.processHandlers[string(processType)] if !ok { - return nil, errors.New("Cannot find process handler by type " + string(processType)) + return nil, errors.New("Cannot find Process handler by type " + string(processType)) } return processHandler, nil } -func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) (RouterHandler, error) { +func (d *DefaultBusinessProcessor) getRouterHandler(processType process.ProcessType) (RouterHandler, error) { d.mu.RLock() defer d.mu.RUnlock() routerHandler, ok := d.routerHandlers[string(processType)] @@ -75,18 +76,10 @@ func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) (Ro return routerHandler, nil } -func (d *DefaultBusinessProcessor) matchProcessType(processContext ProcessContext) ProcessType { +func (d *DefaultBusinessProcessor) matchProcessType(processContext ProcessContext) process.ProcessType { ok := processContext.HasVariable(constant.VarNameProcessType) if ok { - return processContext.GetVariable(constant.VarNameProcessType).(ProcessType) + return processContext.GetVariable(constant.VarNameProcessType).(process.ProcessType) } - return StateLang -} - -type ProcessHandler interface { - Process(ctx context.Context, processContext ProcessContext) error -} - -type RouterHandler interface { - Route(ctx context.Context, processContext ProcessContext) error + return process.StateLang } diff --git a/pkg/saga/statemachine/engine/core/compensation_holder.go b/pkg/saga/statemachine/engine/core/compensation_holder.go new file mode 100644 index 00000000..562cb020 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/compensation_holder.go @@ -0,0 +1,63 @@ +package core + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/util/collection" + "sync" +) + +type CompensationHolder struct { + statesNeedCompensation *sync.Map + statesForCompensation *sync.Map + stateStackNeedCompensation *collection.Stack +} + +func (c *CompensationHolder) StatesNeedCompensation() *sync.Map { + return c.statesNeedCompensation +} + +func (c *CompensationHolder) SetStatesNeedCompensation(statesNeedCompensation *sync.Map) { + c.statesNeedCompensation = statesNeedCompensation +} + +func (c *CompensationHolder) StatesForCompensation() *sync.Map { + return c.statesForCompensation +} + +func (c *CompensationHolder) SetStatesForCompensation(statesForCompensation *sync.Map) { + c.statesForCompensation = statesForCompensation +} + +func (c *CompensationHolder) StateStackNeedCompensation() *collection.Stack { + return c.stateStackNeedCompensation +} + +func (c *CompensationHolder) SetStateStackNeedCompensation(stateStackNeedCompensation *collection.Stack) { + c.stateStackNeedCompensation = stateStackNeedCompensation +} + +func (c *CompensationHolder) AddToBeCompensatedState(stateName string, toBeCompensatedState statelang.StateInstance) { + c.statesNeedCompensation.Store(stateName, toBeCompensatedState) +} + +func NewCompensationHolder() *CompensationHolder { + return &CompensationHolder{ + statesNeedCompensation: &sync.Map{}, + statesForCompensation: &sync.Map{}, + stateStackNeedCompensation: collection.NewStack(), + } +} + +func GetCurrentCompensationHolder(ctx context.Context, processContext ProcessContext, forceCreate bool) *CompensationHolder { + compensationholder := processContext.GetVariable(constant.VarNameCurrentCompensationHolder).(*CompensationHolder) + lock := processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex) + lock.Lock() + defer lock.Unlock() + if compensationholder == nil && forceCreate { + compensationholder = NewCompensationHolder() + processContext.SetVariable(constant.VarNameCurrentCompensationHolder, compensationholder) + } + return compensationholder +} diff --git a/pkg/saga/statemachine/engine/core/default_statemachine_config.go b/pkg/saga/statemachine/engine/core/default_statemachine_config.go new file mode 100644 index 00000000..c7692696 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/default_statemachine_config.go @@ -0,0 +1,199 @@ +package core + +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence" + "sync" +) + +const ( + DefaultTransOperTimeout = 60000 * 30 + DefaultServiceInvokeTimeout = 60000 * 5 +) + +type DefaultStateMachineConfig struct { + // Configuration + transOperationTimeout int + serviceInvokeTimeout int + charset string + defaultTenantId string + + // Components + + // Event publisher + syncProcessCtrlEventPublisher EventPublisher + asyncProcessCtrlEventPublisher EventPublisher + + // Store related components + stateLogRepository StateLogRepository + stateLogStore StateLogStore + stateLangStore StateLangStore + stateMachineRepository StateMachineRepository + + // Expression related components + expressionFactoryManager expr.ExpressionFactoryManager + expressionResolver expr.ExpressionResolver + + // Invoker related components + serviceInvokerManager invoker.ServiceInvokerManager + scriptInvokerManager invoker.ScriptInvokerManager + + // Other components + statusDecisionStrategy StatusDecisionStrategy + seqGenerator sequence.SeqGenerator + componentLock *sync.Mutex +} + +func (c *DefaultStateMachineConfig) ComponentLock() *sync.Mutex { + return c.componentLock +} + +func (c *DefaultStateMachineConfig) SetComponentLock(componentLock *sync.Mutex) { + c.componentLock = componentLock +} + +func (c *DefaultStateMachineConfig) SetTransOperationTimeout(transOperationTimeout int) { + c.transOperationTimeout = transOperationTimeout +} + +func (c *DefaultStateMachineConfig) SetServiceInvokeTimeout(serviceInvokeTimeout int) { + c.serviceInvokeTimeout = serviceInvokeTimeout +} + +func (c *DefaultStateMachineConfig) SetCharset(charset string) { + c.charset = charset +} + +func (c *DefaultStateMachineConfig) SetDefaultTenantId(defaultTenantId string) { + c.defaultTenantId = defaultTenantId +} + +func (c *DefaultStateMachineConfig) SetSyncProcessCtrlEventPublisher(syncProcessCtrlEventPublisher EventPublisher) { + c.syncProcessCtrlEventPublisher = syncProcessCtrlEventPublisher +} + +func (c *DefaultStateMachineConfig) SetAsyncProcessCtrlEventPublisher(asyncProcessCtrlEventPublisher EventPublisher) { + c.asyncProcessCtrlEventPublisher = asyncProcessCtrlEventPublisher +} + +func (c *DefaultStateMachineConfig) SetStateLogRepository(stateLogRepository StateLogRepository) { + c.stateLogRepository = stateLogRepository +} + +func (c *DefaultStateMachineConfig) SetStateLogStore(stateLogStore StateLogStore) { + c.stateLogStore = stateLogStore +} + +func (c *DefaultStateMachineConfig) SetStateLangStore(stateLangStore StateLangStore) { + c.stateLangStore = stateLangStore +} + +func (c *DefaultStateMachineConfig) SetStateMachineRepository(stateMachineRepository StateMachineRepository) { + c.stateMachineRepository = stateMachineRepository +} + +func (c *DefaultStateMachineConfig) SetExpressionFactoryManager(expressionFactoryManager expr.ExpressionFactoryManager) { + c.expressionFactoryManager = expressionFactoryManager +} + +func (c *DefaultStateMachineConfig) SetExpressionResolver(expressionResolver expr.ExpressionResolver) { + c.expressionResolver = expressionResolver +} + +func (c *DefaultStateMachineConfig) SetServiceInvokerManager(serviceInvokerManager invoker.ServiceInvokerManager) { + c.serviceInvokerManager = serviceInvokerManager +} + +func (c *DefaultStateMachineConfig) SetScriptInvokerManager(scriptInvokerManager invoker.ScriptInvokerManager) { + c.scriptInvokerManager = scriptInvokerManager +} + +func (c *DefaultStateMachineConfig) SetStatusDecisionStrategy(statusDecisionStrategy StatusDecisionStrategy) { + c.statusDecisionStrategy = statusDecisionStrategy +} + +func (c *DefaultStateMachineConfig) SetSeqGenerator(seqGenerator sequence.SeqGenerator) { + c.seqGenerator = seqGenerator +} + +func (c *DefaultStateMachineConfig) StateLogRepository() StateLogRepository { + return c.stateLogRepository +} + +func (c *DefaultStateMachineConfig) StateMachineRepository() StateMachineRepository { + return c.stateMachineRepository +} + +func (c *DefaultStateMachineConfig) StateLogStore() StateLogStore { + return c.stateLogStore +} + +func (c *DefaultStateMachineConfig) StateLangStore() StateLangStore { + return c.stateLangStore +} + +func (c *DefaultStateMachineConfig) ExpressionFactoryManager() expr.ExpressionFactoryManager { + return c.expressionFactoryManager +} + +func (c *DefaultStateMachineConfig) ExpressionResolver() expr.ExpressionResolver { + return c.expressionResolver +} + +func (c *DefaultStateMachineConfig) SeqGenerator() sequence.SeqGenerator { + return c.seqGenerator +} + +func (c *DefaultStateMachineConfig) StatusDecisionStrategy() StatusDecisionStrategy { + return c.statusDecisionStrategy +} + +func (c *DefaultStateMachineConfig) EventPublisher() EventPublisher { + return c.syncProcessCtrlEventPublisher +} + +func (c *DefaultStateMachineConfig) AsyncEventPublisher() EventPublisher { + return c.asyncProcessCtrlEventPublisher +} + +func (c *DefaultStateMachineConfig) ServiceInvokerManager() invoker.ServiceInvokerManager { + return c.serviceInvokerManager +} + +func (c *DefaultStateMachineConfig) ScriptInvokerManager() invoker.ScriptInvokerManager { + return c.scriptInvokerManager +} + +func (c *DefaultStateMachineConfig) CharSet() string { + return c.charset +} + +func (c *DefaultStateMachineConfig) SetCharSet(charset string) { + c.charset = charset +} + +func (c *DefaultStateMachineConfig) DefaultTenantId() string { + return c.defaultTenantId +} + +func (c *DefaultStateMachineConfig) TransOperationTimeout() int { + return c.transOperationTimeout +} + +func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int { + return c.serviceInvokeTimeout +} + +func NewDefaultStateMachineConfig() *DefaultStateMachineConfig { + c := &DefaultStateMachineConfig{ + transOperationTimeout: DefaultTransOperTimeout, + serviceInvokeTimeout: DefaultServiceInvokeTimeout, + charset: "UTF-8", + defaultTenantId: "000001", + componentLock: &sync.Mutex{}, + } + + // TODO: init config + return c +} diff --git a/pkg/saga/statemachine/engine/core/engine_utils.go b/pkg/saga/statemachine/engine/core/engine_utils.go new file mode 100644 index 00000000..e1dcd4de --- /dev/null +++ b/pkg/saga/statemachine/engine/core/engine_utils.go @@ -0,0 +1,149 @@ +package core + +import ( + "context" + "errors" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + "github.com/seata/seata-go/pkg/util/log" + "golang.org/x/sync/semaphore" + "reflect" + "strings" + "sync" + "time" +) + +func EndStateMachine(ctx context.Context, processContext ProcessContext) error { + if processContext.HasVariable(constant.VarNameIsLoopState) { + if processContext.HasVariable(constant.LoopSemaphore) { + weighted, ok := processContext.GetVariable(constant.LoopSemaphore).(semaphore.Weighted) + if !ok { + return errors.New("semaphore type is not weighted") + } + weighted.Release(1) + } + } + + stateMachineInstance, ok := processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance) + if !ok { + return errors.New("state machine instance type is not statelang.StateMachineInstance") + } + + stateMachineInstance.SetEndTime(time.Now()) + + exp, ok := processContext.GetVariable(constant.VarNameCurrentException).(error) + if !ok { + return errors.New("exception type is not error") + } + + if exp != nil { + stateMachineInstance.SetException(exp) + log.Debugf("Exception Occurred: %s", exp) + } + + stateMachineConfig, ok := processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig) + + if err := stateMachineConfig.StatusDecisionStrategy().DecideOnEndState(ctx, processContext, stateMachineInstance, exp); err != nil { + return err + } + + contextParams, ok := processContext.GetVariable(constant.VarNameStateMachineContext).(map[string]interface{}) + if !ok { + return errors.New("state machine context type is not map[string]interface{}") + } + endParams := stateMachineInstance.EndParams() + for k, v := range contextParams { + endParams[k] = v + } + stateMachineInstance.SetEndParams(endParams) + + stateInstruction, ok := processContext.GetInstruction().(StateInstruction) + if !ok { + return errors.New("state instruction type is not process_ctrl.StateInstruction") + } + stateInstruction.SetEnd(true) + + stateMachineInstance.SetRunning(false) + stateMachineInstance.SetEndTime(time.Now()) + + if stateMachineInstance.StateMachine().IsPersist() && stateMachineConfig.StateLangStore() != nil { + err := stateMachineConfig.StateLogStore().RecordStateMachineFinished(ctx, stateMachineInstance, processContext) + if err != nil { + return err + } + } + + callBack, ok := processContext.GetVariable(constant.VarNameAsyncCallback).(CallBack) + if ok { + if exp != nil { + callBack.OnError(ctx, processContext, stateMachineInstance, exp) + } else { + callBack.OnFinished(ctx, processContext, stateMachineInstance) + } + } + + return nil +} + +func HandleException(processContext ProcessContext, abstractTaskState *state.AbstractTaskState, err error) { + catches := abstractTaskState.Catches() + if catches != nil && len(catches) != 0 { + for _, exceptionMatch := range catches { + exceptions := exceptionMatch.Exceptions() + exceptionTypes := exceptionMatch.ExceptionTypes() + if exceptions != nil && len(exceptions) != 0 { + if exceptionTypes == nil { + lock := processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex) + lock.Lock() + defer lock.Unlock() + error := errors.New("") + for i := 0; i < len(exceptions); i++ { + exceptionTypes = append(exceptionTypes, reflect.TypeOf(error)) + } + } + + exceptionMatch.SetExceptionTypes(exceptionTypes) + } + + for i, _ := range exceptionTypes { + if reflect.TypeOf(err) == exceptionTypes[i] { + // HACK: we can not get error type in config file during runtime, so we use exception str + if strings.Contains(err.Error(), exceptions[i]) { + hierarchicalProcessContext := processContext.(HierarchicalProcessContext) + hierarchicalProcessContext.SetVariable(constant.VarNameCurrentExceptionRoute, exceptionMatch.Next()) + return + } + } + } + } + } + + log.Error("Task execution failed and no catches configured") + hierarchicalProcessContext := processContext.(HierarchicalProcessContext) + hierarchicalProcessContext.SetVariable(constant.VarNameIsExceptionNotCatch, true) +} + +// GetOriginStateName get origin state name without suffix like fork +func GetOriginStateName(stateInstance statelang.StateInstance) string { + stateName := stateInstance.Name() + if stateName != "" { + end := strings.LastIndex(stateName, constant.LoopStateNamePattern) + if end > -1 { + return stateName[:end+1] + } + } + return stateName +} + +// IsTimeout test if is timeout +func IsTimeout(gmtUpdated time.Time, timeoutMillis int) bool { + if timeoutMillis < 0 { + return false + } + return time.Now().Unix()-gmtUpdated.Unix() > int64(timeoutMillis) +} + +func GenerateParentId(stateInstance statelang.StateInstance) string { + return stateInstance.MachineInstanceID() + constant.SeperatorParentId + stateInstance.ID() +} diff --git a/pkg/saga/statemachine/engine/events/event.go b/pkg/saga/statemachine/engine/core/event.go similarity index 63% rename from pkg/saga/statemachine/engine/events/event.go rename to pkg/saga/statemachine/engine/core/event.go index 8f142713..4f7c18b2 100644 --- a/pkg/saga/statemachine/engine/events/event.go +++ b/pkg/saga/statemachine/engine/core/event.go @@ -1,4 +1,4 @@ -package events +package core type Event interface { } diff --git a/pkg/saga/statemachine/engine/core/event_bus.go b/pkg/saga/statemachine/engine/core/event_bus.go new file mode 100644 index 00000000..47d617d4 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/event_bus.go @@ -0,0 +1,113 @@ +package core + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/util/collection" + "github.com/seata/seata-go/pkg/util/log" +) + +type EventBus interface { + Offer(ctx context.Context, event Event) (bool, error) + + EventConsumerList(event Event) []EventConsumer + + RegisterEventConsumer(consumer EventConsumer) +} + +type BaseEventBus struct { + eventConsumerList []EventConsumer +} + +func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) { + if b.eventConsumerList == nil { + b.eventConsumerList = make([]EventConsumer, 0) + } + b.eventConsumerList = append(b.eventConsumerList, consumer) +} + +func (b *BaseEventBus) EventConsumerList(event Event) []EventConsumer { + var acceptedConsumerList = make([]EventConsumer, 0) + for i := range b.eventConsumerList { + eventConsumer := b.eventConsumerList[i] + if eventConsumer.Accept(event) { + acceptedConsumerList = append(acceptedConsumerList, eventConsumer) + } + } + return acceptedConsumerList +} + +type DirectEventBus struct { + BaseEventBus +} + +func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) { + eventConsumerList := d.EventConsumerList(event) + if len(eventConsumerList) == 0 { + log.Debugf("cannot find event handler by type: %T", event) + return false, nil + } + + isFirstEvent := true + processContext, ok := event.(ProcessContext) + if !ok { + log.Errorf("event %T is illegal, required process_ctrl.ProcessContext", event) + return false, nil + } + + stack := processContext.GetVariable(constant.VarNameSyncExeStack).(*collection.Stack) + if stack == nil { + stack = collection.NewStack() + processContext.SetVariable(constant.VarNameSyncExeStack, stack) + isFirstEvent = true + } + + stack.Push(processContext) + if isFirstEvent { + for stack.Len() > 0 { + currentContext := stack.Pop().(ProcessContext) + for _, eventConsumer := range eventConsumerList { + err := eventConsumer.Process(ctx, currentContext) + if err != nil { + log.Errorf("process event %T error: %s", event, err.Error()) + return false, err + } + } + } + } + + return true, nil +} + +type AsyncEventBus struct { + BaseEventBus +} + +func (a AsyncEventBus) Offer(ctx context.Context, event Event) (bool, error) { + eventConsumerList := a.EventConsumerList(event) + if len(eventConsumerList) == 0 { + errStr := fmt.Sprintf("cannot find event handler by type: %T", event) + log.Errorf(errStr) + return false, errors.New(errStr) + } + + processContext, ok := event.(ProcessContext) + if !ok { + errStr := fmt.Sprintf("event %T is illegal, required process_ctrl.ProcessContext", event) + log.Errorf(errStr) + return false, errors.New(errStr) + } + + for _, eventConsumer := range eventConsumerList { + go func() { + err := eventConsumer.Process(ctx, processContext) + if err != nil { + log.Errorf("process event %T error: %s", event, err.Error()) + } + }() + } + + return true, nil +} diff --git a/pkg/saga/statemachine/engine/events/event_consumer.go b/pkg/saga/statemachine/engine/core/event_consumer.go similarity index 53% rename from pkg/saga/statemachine/engine/events/event_consumer.go rename to pkg/saga/statemachine/engine/core/event_consumer.go index 426da15e..1df21bdb 100644 --- a/pkg/saga/statemachine/engine/events/event_consumer.go +++ b/pkg/saga/statemachine/engine/core/event_consumer.go @@ -1,8 +1,9 @@ -package events +package core import ( "context" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" + "fmt" + "github.com/pkg/errors" ) type EventConsumer interface { @@ -12,6 +13,7 @@ type EventConsumer interface { } type ProcessCtrlEventConsumer struct { + processController ProcessController } func (p ProcessCtrlEventConsumer) Accept(event Event) bool { @@ -19,11 +21,14 @@ func (p ProcessCtrlEventConsumer) Accept(event Event) bool { return false } - _, ok := event.(process_ctrl.ProcessContext) + _, ok := event.(ProcessContext) return ok } func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event) error { - //TODO implement me - panic("implement me") + processContext, ok := event.(ProcessContext) + if !ok { + return errors.New(fmt.Sprint("event %T is illegal, required process_ctrl.ProcessContext", event)) + } + return p.processController.Process(ctx, processContext) } diff --git a/pkg/saga/statemachine/engine/events/event_publisher.go b/pkg/saga/statemachine/engine/core/event_publisher.go similarity index 96% rename from pkg/saga/statemachine/engine/events/event_publisher.go rename to pkg/saga/statemachine/engine/core/event_publisher.go index 1fbfaec2..9ad86940 100644 --- a/pkg/saga/statemachine/engine/events/event_publisher.go +++ b/pkg/saga/statemachine/engine/core/event_publisher.go @@ -1,4 +1,4 @@ -package events +package core import "context" diff --git a/pkg/saga/statemachine/engine/core/instruction.go b/pkg/saga/statemachine/engine/core/instruction.go new file mode 100644 index 00000000..ed81fc9c --- /dev/null +++ b/pkg/saga/statemachine/engine/core/instruction.go @@ -0,0 +1,96 @@ +package core + +import ( + "errors" + "fmt" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" +) + +type Instruction interface { +} + +type StateInstruction struct { + stateName string + stateMachineName string + tenantId string + end bool + temporaryState statelang.State +} + +func NewStateInstruction(stateMachineName string, tenantId string) *StateInstruction { + return &StateInstruction{stateMachineName: stateMachineName, tenantId: tenantId} +} + +func (s *StateInstruction) StateName() string { + return s.stateName +} + +func (s *StateInstruction) SetStateName(stateName string) { + s.stateName = stateName +} + +func (s *StateInstruction) StateMachineName() string { + return s.stateMachineName +} + +func (s *StateInstruction) SetStateMachineName(stateMachineName string) { + s.stateMachineName = stateMachineName +} + +func (s *StateInstruction) TenantId() string { + return s.tenantId +} + +func (s *StateInstruction) SetTenantId(tenantId string) { + s.tenantId = tenantId +} + +func (s *StateInstruction) End() bool { + return s.end +} + +func (s *StateInstruction) SetEnd(end bool) { + s.end = end +} + +func (s *StateInstruction) TemporaryState() statelang.State { + return s.temporaryState +} + +func (s *StateInstruction) SetTemporaryState(temporaryState statelang.State) { + s.temporaryState = temporaryState +} + +func (s *StateInstruction) GetState(context ProcessContext) (statelang.State, error) { + if s.temporaryState != nil { + return s.temporaryState, nil + } + + if s.stateMachineName == "" { + return nil, errors.New("stateMachineName is required") + } + + stateMachineConfig, ok := context.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig) + if !ok { + return nil, errors.New("stateMachineConfig is required in context") + } + stateMachine, err := stateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(s.stateMachineName, s.tenantId) + if err != nil { + return nil, errors.New("get stateMachine in state machine repository error") + } + if stateMachine == nil { + return nil, errors.New(fmt.Sprintf("stateMachine [%s] is not exist", s.stateMachineName)) + } + + if s.stateName == "" { + s.stateName = stateMachine.StartState() + } + + state := stateMachine.States()[s.stateName] + if state == nil { + return nil, errors.New(fmt.Sprintf("state [%s] is not exist", s.stateName)) + } + + return state, nil +} diff --git a/pkg/saga/statemachine/engine/core/loop_context_holder.go b/pkg/saga/statemachine/engine/core/loop_context_holder.go new file mode 100644 index 00000000..b1f69fe8 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/loop_context_holder.go @@ -0,0 +1,92 @@ +package core + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "sync" +) + +type LoopContextHolder struct { + nrOfInstances int32 + nrOfActiveInstances int32 + nrOfCompletedInstances int32 + failEnd bool + completionConditionSatisfied bool + loopCounterStack []int + forwardCounterStack []int + collection interface{} +} + +func NewLoopContextHolder() *LoopContextHolder { + return &LoopContextHolder{ + nrOfInstances: 0, + nrOfActiveInstances: 0, + nrOfCompletedInstances: 0, + failEnd: false, + completionConditionSatisfied: false, + loopCounterStack: make([]int, 0), + forwardCounterStack: make([]int, 0), + collection: nil, + } +} + +func GetCurrentLoopContextHolder(ctx context.Context, processContext ProcessContext, forceCreate bool) *LoopContextHolder { + mutex := processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex) + mutex.Lock() + defer mutex.Unlock() + + loopContextHolder := processContext.GetVariable(constant.VarNameCurrentLoopContextHolder).(*LoopContextHolder) + if loopContextHolder == nil && forceCreate { + loopContextHolder = &LoopContextHolder{} + processContext.SetVariable(constant.VarNameCurrentLoopContextHolder, loopContextHolder) + } + return loopContextHolder +} + +func ClearCurrent(ctx context.Context, processContext ProcessContext) { + processContext.RemoveVariable(constant.VarNameCurrentLoopContextHolder) +} + +func (l *LoopContextHolder) NrOfInstances() int32 { + return l.nrOfInstances +} + +func (l *LoopContextHolder) NrOfActiveInstances() int32 { + return l.nrOfActiveInstances +} + +func (l *LoopContextHolder) NrOfCompletedInstances() int32 { + return l.nrOfCompletedInstances +} + +func (l *LoopContextHolder) FailEnd() bool { + return l.failEnd +} + +func (l *LoopContextHolder) SetFailEnd(failEnd bool) { + l.failEnd = failEnd +} + +func (l *LoopContextHolder) CompletionConditionSatisfied() bool { + return l.completionConditionSatisfied +} + +func (l *LoopContextHolder) SetCompletionConditionSatisfied(completionConditionSatisfied bool) { + l.completionConditionSatisfied = completionConditionSatisfied +} + +func (l *LoopContextHolder) LoopCounterStack() []int { + return l.loopCounterStack +} + +func (l *LoopContextHolder) ForwardCounterStack() []int { + return l.forwardCounterStack +} + +func (l *LoopContextHolder) Collection() interface{} { + return l.collection +} + +func (l *LoopContextHolder) SetCollection(collection interface{}) { + l.collection = collection +} diff --git a/pkg/saga/statemachine/engine/core/loop_task_utils.go b/pkg/saga/statemachine/engine/core/loop_task_utils.go new file mode 100644 index 00000000..d56ac7f0 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/loop_task_utils.go @@ -0,0 +1,40 @@ +package core + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + "github.com/seata/seata-go/pkg/util/log" +) + +func GetLoopConfig(ctx context.Context, processContext ProcessContext, currentState statelang.State) state.Loop { + if matchLoop(currentState) { + taskState := currentState.(state.AbstractTaskState) + stateMachineInstance := processContext.GetVariable(constant.VarNameStateMachineInst).(statelang.StateMachineInstance) + stateMachineConfig := processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig) + + if taskState.Loop() != nil { + loop := taskState.Loop() + collectionName := loop.Collection() + if collectionName != "" { + expression := CreateValueExpression(stateMachineConfig.ExpressionResolver(), collectionName) + collection := GetValue(expression, stateMachineInstance.Context(), nil) + collectionList := collection.([]any) + if len(collectionList) > 0 { + current := GetCurrentLoopContextHolder(ctx, processContext, true) + current.SetCollection(collection) + return loop + } + } + log.Warn("State [{}] loop collection param [{}] invalid", currentState.Name(), collectionName) + } + + } + return nil +} + +func matchLoop(currentState statelang.State) bool { + return currentState != nil && (constant.StateTypeServiceTask == currentState.Type() || + constant.StateTypeScriptTask == currentState.Type() || constant.StateTypeSubStateMachine == currentState.Type()) +} diff --git a/pkg/saga/statemachine/engine/core/parameter_utils.go b/pkg/saga/statemachine/engine/core/parameter_utils.go new file mode 100644 index 00000000..680167cf --- /dev/null +++ b/pkg/saga/statemachine/engine/core/parameter_utils.go @@ -0,0 +1,132 @@ +package core + +import ( + "fmt" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + "strings" + "sync" +) + +func CreateInputParams(processContext ProcessContext, expressionResolver expr.ExpressionResolver, + stateInstance *statelang.StateInstanceImpl, serviceTaskState *state.AbstractTaskState, variablesFrom any) []any { + inputAssignments := serviceTaskState.Input() + if inputAssignments == nil || len(inputAssignments) == 0 { + return inputAssignments + } + + inputExpressions := serviceTaskState.InputExpressions() + if inputExpressions == nil || len(inputExpressions) == 0 { + lock := processContext.GetVariable(constant.VarNameProcessContextMutexLock).(*sync.Mutex) + lock.Lock() + defer lock.Unlock() + inputExpressions = serviceTaskState.InputExpressions() + if inputExpressions == nil || len(inputExpressions) == 0 { + inputExpressions = make([]any, 0, len(inputAssignments)) + + for _, assignment := range inputAssignments { + inputExpressions = append(inputExpressions, CreateValueExpression(expressionResolver, assignment)) + } + } + serviceTaskState.SetInputExpressions(inputExpressions) + } + inputValues := make([]any, 0, len(inputExpressions)) + for _, valueExpression := range inputExpressions { + value := GetValue(valueExpression, variablesFrom, stateInstance) + inputValues = append(inputValues, value) + } + + return inputValues +} + +func CreateOutputParams(config StateMachineConfig, expressionResolver expr.ExpressionResolver, + serviceTaskState *state.AbstractTaskState, variablesFrom any) (map[string]any, error) { + outputAssignments := serviceTaskState.Output() + if outputAssignments == nil || len(outputAssignments) == 0 { + return make(map[string]any, 0), nil + } + + outputExpressions := serviceTaskState.OutputExpressions() + if outputExpressions == nil { + config.ComponentLock().Lock() + defer config.ComponentLock().Unlock() + outputExpressions = serviceTaskState.OutputExpressions() + if outputExpressions == nil { + outputExpressions = make(map[string]any, len(outputAssignments)) + for key, value := range outputAssignments { + outputExpressions[key] = CreateValueExpression(expressionResolver, value) + } + } + serviceTaskState.SetOutputExpressions(outputExpressions) + } + outputValues := make(map[string]any, len(outputExpressions)) + for paramName, _ := range outputExpressions { + outputValues[paramName] = GetValue(outputExpressions[paramName], variablesFrom, nil) + } + return outputValues, nil +} + +func CreateValueExpression(expressionResolver expr.ExpressionResolver, paramAssignment any) any { + var valueExpression any + + switch paramAssignment.(type) { + case expr.Expression: + valueExpression = paramAssignment + case map[string]any: + paramMapAssignment := paramAssignment.(map[string]any) + paramMap := make(map[string]any, len(paramMapAssignment)) + for key, value := range paramMapAssignment { + paramMap[key] = CreateValueExpression(expressionResolver, value) + } + valueExpression = paramMap + case []any: + paramListAssignment := paramAssignment.([]any) + paramList := make([]any, 0, len(paramListAssignment)) + for _, value := range paramListAssignment { + paramList = append(paramList, CreateValueExpression(expressionResolver, value)) + } + valueExpression = paramList + case string: + value := paramAssignment.(string) + if !strings.HasPrefix(value, "$") { + valueExpression = paramAssignment + } + valueExpression = expressionResolver.Expression(value) + default: + valueExpression = paramAssignment + } + return valueExpression +} + +func GetValue(valueExpression any, variablesFrom any, stateInstance statelang.StateInstance) any { + switch valueExpression.(type) { + case expr.Expression: + expression := valueExpression.(expr.Expression) + value := expression.Value(variablesFrom) + if _, ok := valueExpression.(expr.SequenceExpression); value != nil && stateInstance != nil && stateInstance.BusinessKey() == "" && ok { + stateInstance.SetBusinessKey(fmt.Sprintf("%v", value)) + } + return value + case map[string]any: + mapValueExpression := valueExpression.(map[string]any) + mapValue := make(map[string]any, len(mapValueExpression)) + for key, value := range mapValueExpression { + value = GetValue(value, variablesFrom, stateInstance) + if value != nil { + mapValue[key] = value + } + } + return mapValue + case []any: + valueExpressionList := valueExpression.([]any) + listValue := make([]any, 0, len(valueExpression.([]any))) + for i, _ := range valueExpressionList { + listValue = append(listValue, GetValue(valueExpressionList[i], variablesFrom, stateInstance)) + } + return listValue + default: + return valueExpression + } +} diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_context.go b/pkg/saga/statemachine/engine/core/process_context.go similarity index 99% rename from pkg/saga/statemachine/engine/process_ctrl/process_context.go rename to pkg/saga/statemachine/engine/core/process_context.go index 7d02e470..9da5c3e8 100644 --- a/pkg/saga/statemachine/engine/process_ctrl/process_context.go +++ b/pkg/saga/statemachine/engine/core/process_context.go @@ -1,4 +1,4 @@ -package process_ctrl +package core import ( "sync" diff --git a/pkg/saga/statemachine/engine/core/process_controller.go b/pkg/saga/statemachine/engine/core/process_controller.go new file mode 100644 index 00000000..2f45aebd --- /dev/null +++ b/pkg/saga/statemachine/engine/core/process_controller.go @@ -0,0 +1,31 @@ +package core + +import ( + "context" +) + +type ProcessController interface { + Process(ctx context.Context, context ProcessContext) error +} + +type ProcessControllerImpl struct { + businessProcessor BusinessProcessor +} + +func (p *ProcessControllerImpl) Process(ctx context.Context, context ProcessContext) error { + if err := p.businessProcessor.Process(ctx, context); err != nil { + return err + } + if err := p.businessProcessor.Route(ctx, context); err != nil { + return err + } + return nil +} + +func (p *ProcessControllerImpl) BusinessProcessor() BusinessProcessor { + return p.businessProcessor +} + +func (p *ProcessControllerImpl) SetBusinessProcessor(businessProcessor BusinessProcessor) { + p.businessProcessor = businessProcessor +} diff --git a/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go new file mode 100644 index 00000000..426ec65d --- /dev/null +++ b/pkg/saga/statemachine/engine/core/process_ctrl_statemachine_engine.go @@ -0,0 +1,424 @@ +package core + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception" + "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + seataErrors "github.com/seata/seata-go/pkg/util/errors" + "github.com/seata/seata-go/pkg/util/log" + "time" +) + +type ProcessCtrlStateMachineEngine struct { + StateMachineConfig StateMachineConfig +} + +func NewProcessCtrlStateMachineEngine() *ProcessCtrlStateMachineEngine { + return &ProcessCtrlStateMachineEngine{ + StateMachineConfig: NewDefaultStateMachineConfig(), + } +} + +func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) { + return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, false, nil) +} + +func (p ProcessCtrlStateMachineEngine) Compensate(ctx context.Context, stateMachineInstId string, + replaceParams map[string]any) (statelang.StateMachineInstance, error) { + return p.compensateInternal(ctx, stateMachineInstId, replaceParams, false, nil) +} + +func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateMachineName string, tenantId string, + businessKey string, startParams map[string]interface{}, async bool, callback CallBack) (statelang.StateMachineInstance, error) { + if tenantId == "" { + tenantId = p.StateMachineConfig.DefaultTenantId() + } + + stateMachineInstance, err := p.createMachineInstance(stateMachineName, tenantId, businessKey, startParams) + if err != nil { + return nil, err + } + + // Build the process_ctrl context. + processContextBuilder := NewProcessContextBuilder(). + WithProcessType(process.StateLang). + WithOperationName(constant.OperationNameStart). + WithAsyncCallback(callback). + WithInstruction(NewStateInstruction(stateMachineName, tenantId)). + WithStateMachineInstance(stateMachineInstance). + WithStateMachineConfig(p.StateMachineConfig). + WithStateMachineEngine(p). + WithIsAsyncExecution(async) + + contextMap := p.copyMap(startParams) + + stateMachineInstance.SetContext(contextMap) + + processContext := processContextBuilder.WithStateMachineContextVariables(contextMap).Build() + + if stateMachineInstance.StateMachine().IsPersist() && p.StateMachineConfig.StateLogStore() != nil { + err := p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, stateMachineInstance, processContext) + if err != nil { + return nil, err + } + } + + if stateMachineInstance.ID() == "" { + stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst, "")) + } + + var eventPublisher EventPublisher + if async { + eventPublisher = p.StateMachineConfig.AsyncEventPublisher() + } else { + eventPublisher = p.StateMachineConfig.EventPublisher() + } + + _, err = eventPublisher.PushEvent(ctx, processContext) + if err != nil { + return nil, err + } + + return stateMachineInstance, nil +} + +// copyMap not deep copy, so best practice: Don’t pass by reference +func (p ProcessCtrlStateMachineEngine) copyMap(startParams map[string]interface{}) map[string]interface{} { + copyMap := make(map[string]interface{}, len(startParams)) + for k, v := range startParams { + copyMap[k] = v + } + return copyMap +} + +func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) { + stateMachine, err := p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName, tenantId) + if err != nil { + return nil, err + } + + if stateMachine == nil { + return nil, errors.New("StateMachine [" + stateMachineName + "] is not exists") + } + + stateMachineInstance := statelang.NewStateMachineInstanceImpl() + stateMachineInstance.SetStateMachine(stateMachine) + stateMachineInstance.SetTenantID(tenantId) + stateMachineInstance.SetBusinessKey(businessKey) + stateMachineInstance.SetStartParams(startParams) + if startParams != nil { + if businessKey != "" { + startParams[constant.VarNameBusinesskey] = businessKey + } + + if startParams[constant.VarNameParentId] != nil { + parentId, ok := startParams[constant.VarNameParentId].(string) + if !ok { + + } + stateMachineInstance.SetParentID(parentId) + delete(startParams, constant.VarNameParentId) + } + } + + stateMachineInstance.SetStatus(statelang.RU) + stateMachineInstance.SetRunning(true) + + now := time.Now() + stateMachineInstance.SetStartedTime(now) + stateMachineInstance.SetUpdatedTime(now) + return stateMachineInstance, nil +} + +func (p ProcessCtrlStateMachineEngine) compensateInternal(ctx context.Context, stateMachineInstId string, replaceParams map[string]any, + async bool, callback CallBack) (statelang.StateMachineInstance, error) { + stateMachineInstance, err := p.reloadStateMachineInstance(ctx, stateMachineInstId) + if err != nil { + return nil, err + } + + if stateMachineInstance == nil { + return nil, exception.NewEngineExecutionException(seataErrors.StateMachineInstanceNotExists, + "StateMachineInstance is not exits", nil) + } + + if statelang.SU == stateMachineInstance.CompensationStatus() { + return stateMachineInstance, nil + } + + if stateMachineInstance.CompensationStatus() != "" { + denyStatus := make([]statelang.ExecutionStatus, 0) + denyStatus = append(denyStatus, statelang.SU) + p.checkStatus(ctx, stateMachineInstance, nil, denyStatus, "", stateMachineInstance.CompensationStatus(), + "compensate") + } + + if replaceParams != nil { + for key, value := range replaceParams { + stateMachineInstance.EndParams()[key] = value + } + } + + contextBuilder := NewProcessContextBuilder().WithProcessType(process.StateLang). + WithOperationName(constant.OperationNameCompensate).WithAsyncCallback(callback). + WithStateMachineInstance(stateMachineInstance). + WithStateMachineConfig(p.StateMachineConfig).WithStateMachineEngine(p).WithIsAsyncExecution(async) + + context := contextBuilder.Build() + + contextVariables, err := p.getStateMachineContextVariables(ctx, stateMachineInstance) + + if replaceParams != nil { + for key, value := range replaceParams { + contextVariables[key] = value + } + } + + p.putBusinesskeyToContextariables(stateMachineInstance, contextVariables) + + // TODO: Here is not use sync.map, make sure whether to use it + concurrentContextVariables := make(map[string]any) + p.nullSafeCopy(contextVariables, concurrentContextVariables) + + context.SetVariable(constant.VarNameStateMachineContext, concurrentContextVariables) + stateMachineInstance.SetContext(concurrentContextVariables) + + tempCompensationTriggerState := state.NewCompensationTriggerStateImpl() + tempCompensationTriggerState.SetStateMachine(stateMachineInstance.StateMachine()) + + stateMachineInstance.SetRunning(true) + + log.Info("Operation [compensate] start. stateMachineInstance[id:" + stateMachineInstance.ID() + "]") + + if stateMachineInstance.StateMachine().IsPersist() { + err := p.StateMachineConfig.StateLogStore().RecordStateMachineRestarted(ctx, stateMachineInstance, context) + if err != nil { + return nil, err + } + } + + inst := NewStateInstruction(stateMachineInstance.TenantID(), stateMachineInstance.StateMachine().Name()) + inst.SetTemporaryState(tempCompensationTriggerState) + context.SetInstruction(inst) + + if async { + _, err := p.StateMachineConfig.AsyncEventPublisher().PushEvent(ctx, context) + if err != nil { + return nil, err + } + } else { + _, err := p.StateMachineConfig.EventPublisher().PushEvent(ctx, context) + if err != nil { + return nil, err + } + } + + return stateMachineInstance, nil +} + +func (p ProcessCtrlStateMachineEngine) reloadStateMachineInstance(ctx context.Context, instId string) (statelang.StateMachineInstance, error) { + instance, err := p.StateMachineConfig.StateLogStore().GetStateMachineInstance(instId) + if err != nil { + return nil, err + } + if instance != nil { + stateMachine := instance.StateMachine() + if stateMachine == nil { + stateMachine, err = p.StateMachineConfig.StateMachineRepository().GetStateMachineById(instance.MachineID()) + if err != nil { + return nil, err + } + instance.SetStateMachine(stateMachine) + } + if stateMachine == nil { + return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists, + "StateMachine[id:"+instance.MachineID()+"] not exist.", nil) + } + + stateList := instance.StateList() + if stateList == nil || len(stateList) == 0 { + stateList, err = p.StateMachineConfig.StateLogStore().GetStateInstanceListByMachineInstanceId(instId) + if err != nil { + return nil, err + } + if stateList != nil && len(stateList) > 0 { + for _, tmpStateInstance := range stateList { + instance.PutState(tmpStateInstance.ID(), tmpStateInstance) + } + } + } + + if instance.EndParams() == nil || len(instance.EndParams()) == 0 { + variables, err := p.replayContextVariables(ctx, instance) + if err != nil { + return nil, err + } + instance.SetEndParams(variables) + } + } + return instance, nil +} + +func (p ProcessCtrlStateMachineEngine) replayContextVariables(ctx context.Context, stateMachineInstance statelang.StateMachineInstance) (map[string]any, error) { + contextVariables := make(map[string]any) + if stateMachineInstance.StartParams() != nil { + for key, value := range stateMachineInstance.StartParams() { + contextVariables[key] = value + } + } + + stateInstanceList := stateMachineInstance.StateList() + if stateInstanceList == nil || len(stateInstanceList) == 0 { + return contextVariables, nil + } + + for _, stateInstance := range stateInstanceList { + serviceOutputParams := stateInstance.OutputParams() + if serviceOutputParams != nil { + serviceTaskStateImpl, ok := stateMachineInstance.StateMachine().State(GetOriginStateName(stateInstance)).(*state.ServiceTaskStateImpl) + if !ok { + return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists, + "Cannot find State by state name ["+stateInstance.Name()+"], may be this is a bug", nil) + } + + if serviceTaskStateImpl.Output() != nil && len(serviceTaskStateImpl.Output()) != 0 { + outputVariablesToContext, err := CreateOutputParams(p.StateMachineConfig, + p.StateMachineConfig.ExpressionResolver(), serviceTaskStateImpl.AbstractTaskState, serviceOutputParams) + if err != nil { + return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists, + "Context variable replay failed", err) + } + if outputVariablesToContext != nil && len(outputVariablesToContext) != 0 { + for key, value := range outputVariablesToContext { + contextVariables[key] = value + } + } + if len(stateInstance.BusinessKey()) > 0 { + contextVariables[serviceTaskStateImpl.Name()+constant.VarNameBusinesskey] = stateInstance.BusinessKey() + } + } + } + } + + return contextVariables, nil +} + +func (p ProcessCtrlStateMachineEngine) checkStatus(ctx context.Context, stateMachineInstance statelang.StateMachineInstance, + acceptStatus []statelang.ExecutionStatus, denyStatus []statelang.ExecutionStatus, status statelang.ExecutionStatus, + compenStatus statelang.ExecutionStatus, operation string) (bool, error) { + if status != "" && compenStatus != "" { + return false, exception.NewEngineExecutionException(seataErrors.InvalidParameter, + "status and compensationStatus are not supported at the same time", nil) + } + if status == "" && compenStatus == "" { + return false, exception.NewEngineExecutionException(seataErrors.InvalidParameter, + "status and compensationStatus must input at least one", nil) + } + if statelang.SU == compenStatus { + message := p.buildExceptionMessage(stateMachineInstance, nil, nil, "", statelang.SU, operation) + return false, exception.NewEngineExecutionException(seataErrors.OperationDenied, + message, nil) + } + + if stateMachineInstance.IsRunning() && + !IsTimeout(stateMachineInstance.UpdatedTime(), p.StateMachineConfig.TransOperationTimeout()) { + return false, exception.NewEngineExecutionException(seataErrors.OperationDenied, + "StateMachineInstance [id:"+stateMachineInstance.ID()+"] is running, operation["+operation+ + "] denied", nil) + } + + if (denyStatus == nil || len(denyStatus) == 0) && (acceptStatus == nil || len(acceptStatus) == 0) { + return false, exception.NewEngineExecutionException(seataErrors.InvalidParameter, + "StateMachineInstance[id:"+stateMachineInstance.ID()+ + "], acceptable status and deny status must input at least one", nil) + } + + currentStatus := compenStatus + if status != "" { + currentStatus = status + } + + if denyStatus != nil && len(denyStatus) == 0 { + for _, tempDenyStatus := range denyStatus { + if tempDenyStatus == currentStatus { + message := p.buildExceptionMessage(stateMachineInstance, acceptStatus, denyStatus, status, + compenStatus, operation) + return false, exception.NewEngineExecutionException(seataErrors.OperationDenied, + message, nil) + } + } + } + + if acceptStatus == nil || len(acceptStatus) == 0 { + return true, nil + } else { + for _, tempStatus := range acceptStatus { + if tempStatus == currentStatus { + return true, nil + } + } + } + + message := p.buildExceptionMessage(stateMachineInstance, acceptStatus, denyStatus, status, compenStatus, + operation) + return false, exception.NewEngineExecutionException(seataErrors.OperationDenied, + message, nil) +} + +func (p ProcessCtrlStateMachineEngine) getStateMachineContextVariables(ctx context.Context, + stateMachineInstance statelang.StateMachineInstance) (map[string]any, error) { + contextVariables := stateMachineInstance.EndParams() + if contextVariables == nil || len(contextVariables) == 0 { + return p.replayContextVariables(ctx, stateMachineInstance) + } + return contextVariables, nil +} + +func (p ProcessCtrlStateMachineEngine) buildExceptionMessage(instance statelang.StateMachineInstance, + acceptStatus []statelang.ExecutionStatus, denyStatus []statelang.ExecutionStatus, status statelang.ExecutionStatus, + compenStatus statelang.ExecutionStatus, operation string) string { + message := fmt.Sprintf("StateMachineInstance[id:%s]", instance.ID()) + if len(acceptStatus) > 0 { + message += ",acceptable status :" + for _, tempStatus := range acceptStatus { + message += string(tempStatus) + " " + } + } + + if len(denyStatus) > 0 { + message += ",deny status:" + for _, tempStatus := range denyStatus { + message += string(tempStatus) + " " + } + } + + if status != "" { + message += ",current status:" + string(status) + } + + if compenStatus != "" { + message += ",current compensation status:" + string(compenStatus) + } + + message += fmt.Sprintf(",so operation [%s] denied", operation) + return message +} + +func (p ProcessCtrlStateMachineEngine) putBusinesskeyToContextariables(instance statelang.StateMachineInstance, variables map[string]any) { + if instance.BusinessKey() != "" && variables[constant.VarNameBusinesskey] == "" { + variables[constant.VarNameBusinesskey] = instance.BusinessKey() + } +} + +func (p ProcessCtrlStateMachineEngine) nullSafeCopy(srcMap map[string]any, destMap map[string]any) { + for key, value := range srcMap { + if value == nil { + destMap[key] = value + } + } +} diff --git a/pkg/saga/statemachine/engine/core/process_router.go b/pkg/saga/statemachine/engine/core/process_router.go new file mode 100644 index 00000000..5d7e5998 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/process_router.go @@ -0,0 +1,194 @@ +package core + +import ( + "context" + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/util/log" +) + +type RouterHandler interface { + Route(ctx context.Context, processContext ProcessContext) error +} + +type ProcessRouter interface { + Route(ctx context.Context, processContext ProcessContext) error +} + +type InterceptAbleStateRouter interface { + StateRouter + StateRouterInterceptor() []StateRouterInterceptor + RegistryStateRouterInterceptor(stateRouterInterceptor StateRouterInterceptor) +} + +type StateRouter interface { + Route(ctx context.Context, processContext ProcessContext, state statelang.State) (Instruction, error) +} + +type StateRouterInterceptor interface { + PreRoute(ctx context.Context, processContext ProcessContext, state statelang.State) error + PostRoute(ctx context.Context, processContext ProcessContext, instruction Instruction, err error) error + Match(stateType string) bool +} + +type DefaultRouterHandler struct { + eventPublisher EventPublisher + processRouters map[string]ProcessRouter +} + +func (d *DefaultRouterHandler) Route(ctx context.Context, processContext ProcessContext) error { + processType := d.matchProcessType(ctx, processContext) + if processType == "" { + log.Warnf("Process type not found, context= %s", processContext) + return errors.New("Process type not found") + } + + processRouter := d.processRouters[string(processType)] + if processRouter == nil { + log.Errorf("Cannot find process router by type %s, context = %s", processType, processContext) + return errors.New("Process router not found") + } + + instruction := processRouter.Route(ctx, processContext) + if instruction == nil { + log.Info("route instruction is null, process end") + } else { + processContext.SetInstruction(instruction) + _, err := d.eventPublisher.PushEvent(ctx, processContext) + if err != nil { + return err + } + } + + return nil +} + +func (d *DefaultRouterHandler) matchProcessType(ctx context.Context, processContext ProcessContext) process.ProcessType { + processType, ok := processContext.GetVariable(constant.VarNameProcessType).(process.ProcessType) + if !ok || processType == "" { + processType = process.StateLang + } + return processType +} + +func (d *DefaultRouterHandler) EventPublisher() EventPublisher { + return d.eventPublisher +} + +func (d *DefaultRouterHandler) SetEventPublisher(eventPublisher EventPublisher) { + d.eventPublisher = eventPublisher +} + +func (d *DefaultRouterHandler) ProcessRouters() map[string]ProcessRouter { + return d.processRouters +} + +func (d *DefaultRouterHandler) SetProcessRouters(processRouters map[string]ProcessRouter) { + d.processRouters = processRouters +} + +type StateMachineProcessRouter struct { + stateRouters map[string]StateRouter +} + +func (s *StateMachineProcessRouter) Route(ctx context.Context, processContext ProcessContext) (Instruction, error) { + stateInstruction, ok := processContext.GetInstruction().(StateInstruction) + if !ok { + return nil, errors.New("instruction is not a state instruction") + } + + var state statelang.State + if stateInstruction.TemporaryState() != nil { + state = stateInstruction.TemporaryState() + stateInstruction.SetTemporaryState(nil) + } else { + stateMachineConfig, ok := processContext.GetVariable(constant.VarNameStateMachineConfig).(StateMachineConfig) + if !ok { + return nil, errors.New("state machine config not found") + } + + stateMachine, err := stateMachineConfig.StateMachineRepository().GetStateMachineByNameAndTenantId(stateInstruction.StateMachineName(), + stateInstruction.TenantId()) + if err != nil { + return nil, err + } + + state = stateMachine.States()[stateInstruction.StateName()] + } + + stateType := state.Type() + router := s.stateRouters[stateType] + + var interceptors []StateRouterInterceptor + if interceptAbleStateRouter, ok := router.(InterceptAbleStateRouter); ok { + interceptors = interceptAbleStateRouter.StateRouterInterceptor() + } + + var executedInterceptors []StateRouterInterceptor + var exception error + instruction, exception := func() (Instruction, error) { + if interceptors == nil || len(executedInterceptors) == 0 { + executedInterceptors = make([]StateRouterInterceptor, 0, len(interceptors)) + for _, interceptor := range interceptors { + executedInterceptors = append(executedInterceptors, interceptor) + err := interceptor.PreRoute(ctx, processContext, state) + if err != nil { + return nil, err + } + } + } + + instruction, err := router.Route(ctx, processContext, state) + if err != nil { + return nil, err + } + return instruction, nil + }() + + if interceptors == nil || len(executedInterceptors) == 0 { + for i := len(executedInterceptors) - 1; i >= 0; i-- { + err := executedInterceptors[i].PostRoute(ctx, processContext, instruction, exception) + if err != nil { + return nil, err + } + } + + // if 'Succeed' or 'Fail' State did not configured, we must end the state machine + if instruction == nil && !stateInstruction.End() { + err := EndStateMachine(ctx, processContext) + if err != nil { + return nil, err + } + } + } + + return instruction, nil +} + +func (s *StateMachineProcessRouter) InitDefaultStateRouters() { + if s.stateRouters == nil || len(s.stateRouters) == 0 { + s.stateRouters = make(map[string]StateRouter) + taskStateRouter := &TaskStateRouter{} + s.stateRouters[constant.StateTypeServiceTask] = taskStateRouter + s.stateRouters[constant.StateTypeScriptTask] = taskStateRouter + s.stateRouters[constant.StateTypeChoice] = taskStateRouter + s.stateRouters[constant.StateTypeCompensationTrigger] = taskStateRouter + s.stateRouters[constant.StateTypeSubStateMachine] = taskStateRouter + s.stateRouters[constant.StateTypeCompensateSubMachine] = taskStateRouter + s.stateRouters[constant.StateTypeLoopStart] = taskStateRouter + + endStateRouter := &EndStateRouter{} + s.stateRouters[constant.StateTypeSucceed] = endStateRouter + s.stateRouters[constant.StateTypeFail] = endStateRouter + } +} + +func (s *StateMachineProcessRouter) StateRouters() map[string]StateRouter { + return s.stateRouters +} + +func (s *StateMachineProcessRouter) SetStateRouters(stateRouters map[string]StateRouter) { + s.stateRouters = stateRouters +} diff --git a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go b/pkg/saga/statemachine/engine/core/process_state.go similarity index 69% rename from pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go rename to pkg/saga/statemachine/engine/core/process_state.go index 66338fd8..3dc06920 100644 --- a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go +++ b/pkg/saga/statemachine/engine/core/process_state.go @@ -1,8 +1,8 @@ -package process_ctrl +package core import ( "context" - "github.com/pkg/errors" + "errors" "sync" ) @@ -11,20 +11,20 @@ type StateHandler interface { ProcessHandler } -type StateRouter interface { - State() string - RouterHandler -} - type InterceptAbleStateHandler interface { StateHandler StateHandlerInterceptorList() []StateHandlerInterceptor RegistryStateHandlerInterceptor(stateHandlerInterceptor StateHandlerInterceptor) } +type ProcessHandler interface { + Process(ctx context.Context, processContext ProcessContext) error +} + type StateHandlerInterceptor interface { PreProcess(ctx context.Context, processContext ProcessContext) error PostProcess(ctx context.Context, processContext ProcessContext) error + Match(stateType string) bool } type StateMachineProcessHandler struct { @@ -99,40 +99,3 @@ func (s *StateMachineProcessHandler) RegistryStateHandler(stateType string, stat } s.mp[stateType] = stateHandler } - -type StateMachineRouterHandler struct { - mu sync.RWMutex - mp map[string]StateRouter -} - -func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext ProcessContext) error { - stateInstruction, _ := processContext.GetInstruction().(StateInstruction) - - state, err := stateInstruction.GetState(processContext) - if err != nil { - return err - } - - stateType := state.Type() - stateRouter := s.GetStateRouter(stateType) - if stateRouter == nil { - return errors.New("Not support [" + stateType + "] state router") - } - - return stateRouter.Route(ctx, processContext) -} - -func (s *StateMachineRouterHandler) GetStateRouter(stateType string) StateRouter { - s.mu.RLock() - defer s.mu.RUnlock() - return s.mp[stateType] -} - -func (s *StateMachineRouterHandler) RegistryStateRouter(stateType string, stateRouter StateRouter) { - s.mu.Lock() - defer s.mu.Unlock() - if s.mp == nil { - s.mp = make(map[string]StateRouter) - } - s.mp[stateType] = stateRouter -} diff --git a/pkg/saga/statemachine/engine/core/state_router.go b/pkg/saga/statemachine/engine/core/state_router.go new file mode 100644 index 00000000..185984e4 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/state_router.go @@ -0,0 +1,151 @@ +package core + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + sagaState "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + seataErrors "github.com/seata/seata-go/pkg/util/errors" + "github.com/seata/seata-go/pkg/util/log" +) + +type EndStateRouter struct { +} + +func (e EndStateRouter) Route(ctx context.Context, processContext ProcessContext, state statelang.State) (Instruction, error) { + return nil, nil +} + +type TaskStateRouter struct { +} + +func (t TaskStateRouter) Route(ctx context.Context, processContext ProcessContext, state statelang.State) (Instruction, error) { + stateInstruction, _ := processContext.GetInstruction().(StateInstruction) + if stateInstruction.End() { + log.Infof("StateInstruction is ended, Stop the StateMachine executing. StateMachine[%s] Current State[%s]", + stateInstruction.StateMachineName(), stateInstruction.StateName()) + } + + // check if in loop async condition + isLoop, ok := processContext.GetVariable(constant.VarNameIsLoopState).(bool) + if ok && isLoop { + log.Infof("StateMachine[%s] Current State[%s] is in loop async condition, skip route processing.", + stateInstruction.StateMachineName(), stateInstruction.StateName()) + return nil, nil + } + + // The current CompensationTriggerState can mark the compensation process is started and perform compensation + // route processing. + compensationTriggerState, ok := processContext.GetVariable(constant.VarNameCurrentCompensateTriggerState).(statelang.State) + if ok { + return t.compensateRoute(ctx, processContext, compensationTriggerState) + } + + // There is an exception route, indicating that an exception is thrown, and the exception route is prioritized. + next := processContext.GetVariable(constant.VarNameCurrentExceptionRoute).(string) + + if next != "" { + processContext.RemoveVariable(constant.VarNameCurrentExceptionRoute) + } else { + next = state.Next() + } + + // If next is empty, the state selected by the Choice state was taken. + if next == "" && processContext.HasVariable(constant.VarNameCurrentChoice) { + next = processContext.GetVariable(constant.VarNameCurrentChoice).(string) + processContext.RemoveVariable(constant.VarNameCurrentChoice) + } + + if next == "" { + return nil, nil + } + + stateMachine := state.StateMachine() + nextState := stateMachine.State(next) + if nextState == nil { + return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists, + "Next state["+next+"] is not exits", nil) + } + + stateInstruction.SetStateName(next) + + if nil != GetLoopConfig(ctx, processContext, nextState) { + stateInstruction.SetTemporaryState(sagaState.NewLoopStartStateImpl()) + } + + return stateInstruction, nil +} + +func (t *TaskStateRouter) compensateRoute(ctx context.Context, processContext ProcessContext, + compensationTriggerState statelang.State) (Instruction, error) { + //If there is already a compensation state that has been executed, + // it is judged whether it is wrong or unsuccessful, + // and the compensation process is interrupted. + isFirstCompensationStateStart := processContext.GetVariable(constant.VarNameFirstCompensationStateStarted).(bool) + if isFirstCompensationStateStart { + exception := processContext.GetVariable(constant.VarNameCurrentException).(error) + if exception != nil { + return nil, EndStateMachine(ctx, processContext) + } + + stateInstance := processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance) + if stateInstance != nil && statelang.SU != stateInstance.Status() { + return nil, EndStateMachine(ctx, processContext) + } + } + + stateStackToBeCompensated := GetCurrentCompensationHolder(ctx, processContext, true).StateStackNeedCompensation() + if stateStackToBeCompensated != nil { + stateToBeCompensated := stateStackToBeCompensated.Pop().(statelang.StateInstance) + + stateMachine := processContext.GetVariable(constant.VarNameStateMachine).(statelang.StateMachine) + state := stateMachine.State(GetOriginStateName(stateToBeCompensated)) + if taskState, ok := state.(sagaState.AbstractTaskState); ok { + instruction := processContext.GetInstruction().(StateInstruction) + + var compensateState statelang.State + compensateStateName := taskState.CompensateState() + if len(compensateStateName) != 0 { + compensateState = stateMachine.State(compensateStateName) + } + + if subStateMachine, ok := state.(sagaState.SubStateMachine); compensateState == nil && ok { + compensateState = subStateMachine.CompensateStateImpl() + instruction.SetTemporaryState(compensateState) + } + + if compensateState == nil { + return nil, EndStateMachine(ctx, processContext) + } + + instruction.SetStateName(compensateState.Name()) + + GetCurrentCompensationHolder(ctx, processContext, true).AddToBeCompensatedState(compensateState.Name(), + stateToBeCompensated) + + hierarchicalProcessContext := processContext.(HierarchicalProcessContext) + hierarchicalProcessContext.SetVariableLocally(constant.VarNameFirstCompensationStateStarted, true) + + if _, ok := compensateState.(sagaState.CompensateSubStateMachineState); ok { + hierarchicalProcessContext = processContext.(HierarchicalProcessContext) + hierarchicalProcessContext.SetVariableLocally( + compensateState.Name()+constant.VarNameSubMachineParentId, + GenerateParentId(stateToBeCompensated)) + } + + return instruction, nil + } + } + + processContext.RemoveVariable(constant.VarNameCurrentCompensateTriggerState) + + compensationTriggerStateNext := compensationTriggerState.Next() + if compensationTriggerStateNext == "" { + return nil, EndStateMachine(ctx, processContext) + } + + instruction := processContext.GetInstruction().(StateInstruction) + instruction.SetStateName(compensationTriggerStateNext) + return instruction, nil +} diff --git a/pkg/saga/statemachine/engine/statemachine_config.go b/pkg/saga/statemachine/engine/core/statemachine_config.go similarity index 52% rename from pkg/saga/statemachine/engine/statemachine_config.go rename to pkg/saga/statemachine/engine/core/statemachine_config.go index 3f3c530b..9c77b73a 100644 --- a/pkg/saga/statemachine/engine/statemachine_config.go +++ b/pkg/saga/statemachine/engine/core/statemachine_config.go @@ -1,22 +1,20 @@ -package engine +package core import ( - "github.com/seata/seata-go/pkg/saga/statemachine/engine/events" "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr" "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker" "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/store" + "sync" ) type StateMachineConfig interface { - StateLogRepository() store.StateLogRepository + StateLogRepository() StateLogRepository - StateMachineRepository() store.StateMachineRepository + StateMachineRepository() StateMachineRepository - StateLogStore() store.StateLogStore + StateLogStore() StateLogStore - StateLangStore() store.StateLangStore + StateLangStore() StateLangStore ExpressionFactoryManager() expr.ExpressionFactoryManager @@ -24,11 +22,11 @@ type StateMachineConfig interface { SeqGenerator() sequence.SeqGenerator - StatusDecisionStrategy() status_decision.StatusDecisionStrategy + StatusDecisionStrategy() StatusDecisionStrategy - EventPublisher() events.EventPublisher + EventPublisher() EventPublisher - AsyncEventPublisher() events.EventPublisher + AsyncEventPublisher() EventPublisher ServiceInvokerManager() invoker.ServiceInvokerManager @@ -41,4 +39,6 @@ type StateMachineConfig interface { TransOperationTimeout() int ServiceInvokeTimeout() int + + ComponentLock() *sync.Mutex } diff --git a/pkg/saga/statemachine/engine/core/statemachine_engine.go b/pkg/saga/statemachine/engine/core/statemachine_engine.go new file mode 100644 index 00000000..c18d91c1 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/statemachine_engine.go @@ -0,0 +1,16 @@ +package core + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" +) + +type StateMachineEngine interface { + Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) + Compensate(ctx context.Context, stateMachineInstId string, replaceParams map[string]any) (statelang.StateMachineInstance, error) +} + +type CallBack interface { + OnFinished(ctx context.Context, context ProcessContext, stateMachineInstance statelang.StateMachineInstance) + OnError(ctx context.Context, context ProcessContext, stateMachineInstance statelang.StateMachineInstance, err error) +} diff --git a/pkg/saga/statemachine/engine/core/statemachine_engine_test.go b/pkg/saga/statemachine/engine/core/statemachine_engine_test.go new file mode 100644 index 00000000..e87eb9ca --- /dev/null +++ b/pkg/saga/statemachine/engine/core/statemachine_engine_test.go @@ -0,0 +1,15 @@ +package core + +import ( + "context" + "testing" +) + +func TestEngine(t *testing.T) { + +} + +func TestSimpleStateMachine(t *testing.T) { + engine := NewProcessCtrlStateMachineEngine() + engine.Start(context.Background(), "simpleStateMachine", "tenantId", nil) +} diff --git a/pkg/saga/statemachine/engine/store/statemachine_store.go b/pkg/saga/statemachine/engine/core/statemachine_store.go similarity index 83% rename from pkg/saga/statemachine/engine/store/statemachine_store.go rename to pkg/saga/statemachine/engine/core/statemachine_store.go index 2dac06d3..7a763335 100644 --- a/pkg/saga/statemachine/engine/store/statemachine_store.go +++ b/pkg/saga/statemachine/engine/core/statemachine_store.go @@ -1,8 +1,7 @@ -package store +package core import ( "context" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" "github.com/seata/seata-go/pkg/saga/statemachine/statelang" "io" ) @@ -20,15 +19,15 @@ type StateLogRepository interface { } type StateLogStore interface { - RecordStateMachineStarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error + RecordStateMachineStarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context ProcessContext) error - RecordStateMachineFinished(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error + RecordStateMachineFinished(ctx context.Context, machineInstance statelang.StateMachineInstance, context ProcessContext) error - RecordStateMachineRestarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error + RecordStateMachineRestarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context ProcessContext) error - RecordStateStarted(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error + RecordStateStarted(ctx context.Context, stateInstance statelang.StateInstance, context ProcessContext) error - RecordStateFinished(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error + RecordStateFinished(ctx context.Context, stateInstance statelang.StateInstance, context ProcessContext) error GetStateMachineInstance(stateMachineInstanceId string) (statelang.StateMachineInstance, error) @@ -44,6 +43,8 @@ type StateLogStore interface { type StateMachineRepository interface { GetStateMachineById(stateMachineId string) (statelang.StateMachine, error) + GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string) (statelang.StateMachine, error) + GetLastVersionStateMachine(stateMachineName string, tenantId string) (statelang.StateMachine, error) RegistryStateMachine(statelang.StateMachine) error diff --git a/pkg/saga/statemachine/engine/core/status_decision.go b/pkg/saga/statemachine/engine/core/status_decision.go new file mode 100644 index 00000000..e95a6316 --- /dev/null +++ b/pkg/saga/statemachine/engine/core/status_decision.go @@ -0,0 +1,188 @@ +package core + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/util/log" +) + +type StatusDecisionStrategy interface { + // DecideOnEndState Determine state machine execution status when executing to EndState + DecideOnEndState(ctx context.Context, processContext ProcessContext, + stateMachineInstance statelang.StateMachineInstance, exp error) error + // DecideOnTaskStateFail Determine state machine execution status when executing TaskState error + DecideOnTaskStateFail(ctx context.Context, processContext ProcessContext, + stateMachineInstance statelang.StateMachineInstance, exp error) error + // DecideMachineForwardExecutionStatus Determine the forward execution state of the state machine + DecideMachineForwardExecutionStatus(ctx context.Context, + stateMachineInstance statelang.StateMachineInstance, exp error, specialPolicy bool) error +} + +type DefaultStatusDecisionStrategy struct { +} + +func NewDefaultStatusDecisionStrategy() *DefaultStatusDecisionStrategy { + return &DefaultStatusDecisionStrategy{} +} + +func (d DefaultStatusDecisionStrategy) DecideOnEndState(ctx context.Context, processContext ProcessContext, + stateMachineInstance statelang.StateMachineInstance, exp error) error { + if statelang.RU == stateMachineInstance.CompensationStatus() { + compensationHolder := GetCurrentCompensationHolder(ctx, processContext, true) + if err := decideMachineCompensateStatus(ctx, stateMachineInstance, compensationHolder); err != nil { + return err + } + } else { + failEndStateFlag, ok := processContext.GetVariable(constant.VarNameFailEndStateFlag).(bool) + if !ok { + failEndStateFlag = false + } + if _, err := decideMachineForwardExecutionStatus(ctx, stateMachineInstance, exp, failEndStateFlag); err != nil { + return err + } + } + + if stateMachineInstance.CompensationStatus() != "" && constant.OperationNameForward == + processContext.GetVariable(constant.VarNameOperationName).(string) && statelang.SU == stateMachineInstance.Status() { + stateMachineInstance.SetCompensationStatus(statelang.FA) + } + + log.Debugf("StateMachine Instance[id:%s,name:%s] execute finish with status[%s], compensation status [%s].", + stateMachineInstance.ID(), stateMachineInstance.StateMachine().Name(), + stateMachineInstance.Status(), stateMachineInstance.CompensationStatus()) + + return nil +} + +func decideMachineCompensateStatus(ctx context.Context, stateMachineInstance statelang.StateMachineInstance, compensationHolder *CompensationHolder) error { + if stateMachineInstance.Status() == "" || statelang.RU == stateMachineInstance.Status() { + stateMachineInstance.SetStatus(statelang.UN) + } + if !compensationHolder.StateStackNeedCompensation().Empty() { + hasCompensateSUorUN := false + compensationHolder.StatesForCompensation().Range( + func(key, value any) bool { + stateInstance, ok := value.(statelang.StateInstance) + if !ok { + return false + } + if statelang.UN == stateInstance.Status() || statelang.SU == stateInstance.Status() { + hasCompensateSUorUN = true + return true + } + return false + }) + + if hasCompensateSUorUN { + stateMachineInstance.SetCompensationStatus(statelang.UN) + } else { + stateMachineInstance.SetCompensationStatus(statelang.FA) + } + } else { + hasCompensateError := false + compensationHolder.StatesForCompensation().Range( + func(key, value any) bool { + stateInstance, ok := value.(statelang.StateInstance) + if !ok { + return false + } + if statelang.SU != stateInstance.Status() { + hasCompensateError = true + return true + } + return false + }) + + if hasCompensateError { + stateMachineInstance.SetCompensationStatus(statelang.UN) + } else { + stateMachineInstance.SetCompensationStatus(statelang.SU) + } + } + return nil +} + +func decideMachineForwardExecutionStatus(ctx context.Context, stateMachineInstance statelang.StateMachineInstance, exp error, specialPolicy bool) (bool, error) { + result := false + + if stateMachineInstance.Status() == "" || statelang.RU == stateMachineInstance.Status() { + result = true + stateList := stateMachineInstance.StateList() + setMachineStatusBasedOnStateListAndException(stateMachineInstance, stateList, exp) + + if specialPolicy && statelang.SU == stateMachineInstance.Status() { + for _, stateInstance := range stateMachineInstance.StateList() { + if !stateInstance.IsIgnoreStatus() && (stateInstance.IsForUpdate() || stateInstance.IsForCompensation()) { + stateMachineInstance.SetStatus(statelang.UN) + break + } + } + if statelang.SU == stateMachineInstance.Status() { + stateMachineInstance.SetStatus(statelang.FA) + } + } + } + return result, nil +} + +func setMachineStatusBasedOnStateListAndException(stateMachineInstance statelang.StateMachineInstance, + stateList []statelang.StateInstance, exp error) { + hasSetStatus := false + hasSuccessUpdateService := false + if stateList != nil && len(stateList) > 0 { + hasUnsuccessService := false + + for i := len(stateList) - 1; i >= 0; i-- { + stateInstance := stateList[i] + + if stateInstance.IsIgnoreStatus() || stateInstance.IsForCompensation() { + continue + } + if statelang.UN == stateInstance.Status() { + stateMachineInstance.SetStatus(statelang.UN) + hasSetStatus = true + } else if statelang.SU == stateInstance.Status() { + if constant.StateTypeServiceTask == stateInstance.Type() { + if stateInstance.IsForUpdate() && !stateInstance.IsForCompensation() { + hasSuccessUpdateService = true + } + } + } else if statelang.SK == stateInstance.Status() { + // ignore + } else { + hasUnsuccessService = true + } + } + + if !hasSetStatus && hasUnsuccessService { + if hasSuccessUpdateService { + stateMachineInstance.SetStatus(statelang.UN) + } else { + stateMachineInstance.SetStatus(statelang.FA) + } + hasSetStatus = true + } + } + + if !hasSetStatus { + setMachineStatusBasedOnException(stateMachineInstance, exp, hasSuccessUpdateService) + } +} + +func setMachineStatusBasedOnException(stateMachineInstance statelang.StateMachineInstance, exp error, hasSuccessUpdateService bool) { + //TODO implement me + panic("implement me") +} + +func (d DefaultStatusDecisionStrategy) DecideOnTaskStateFail(ctx context.Context, processContext ProcessContext, + stateMachineInstance statelang.StateMachineInstance, exp error) error { + //TODO implement me + panic("implement me") +} + +func (d DefaultStatusDecisionStrategy) DecideMachineForwardExecutionStatus(ctx context.Context, + stateMachineInstance statelang.StateMachineInstance, exp error, specialPolicy bool) error { + //TODO implement me + panic("implement me") +} diff --git a/pkg/saga/statemachine/engine/utils.go b/pkg/saga/statemachine/engine/core/utils.go similarity index 84% rename from pkg/saga/statemachine/engine/utils.go rename to pkg/saga/statemachine/engine/core/utils.go index 92b7aa10..8300c0eb 100644 --- a/pkg/saga/statemachine/engine/utils.go +++ b/pkg/saga/statemachine/engine/core/utils.go @@ -1,22 +1,22 @@ -package engine +package core import ( "github.com/seata/seata-go/pkg/saga/statemachine/constant" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" + "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process" "github.com/seata/seata-go/pkg/saga/statemachine/statelang" ) // ProcessContextBuilder process_ctrl builder type ProcessContextBuilder struct { - processContext process_ctrl.ProcessContext + processContext ProcessContext } func NewProcessContextBuilder() *ProcessContextBuilder { - processContextImpl := process_ctrl.NewProcessContextImpl() + processContextImpl := NewProcessContextImpl() return &ProcessContextBuilder{processContextImpl} } -func (p *ProcessContextBuilder) WithProcessType(processType process_ctrl.ProcessType) *ProcessContextBuilder { +func (p *ProcessContextBuilder) WithProcessType(processType process.ProcessType) *ProcessContextBuilder { p.processContext.SetVariable(constant.VarNameProcessType, processType) return p } @@ -34,7 +34,7 @@ func (p *ProcessContextBuilder) WithAsyncCallback(callBack CallBack) *ProcessCon return p } -func (p *ProcessContextBuilder) WithInstruction(instruction process_ctrl.Instruction) *ProcessContextBuilder { +func (p *ProcessContextBuilder) WithInstruction(instruction Instruction) *ProcessContextBuilder { if instruction != nil { p.processContext.SetInstruction(instruction) } @@ -81,6 +81,6 @@ func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) *ProcessContext return p } -func (p *ProcessContextBuilder) Build() process_ctrl.ProcessContext { +func (p *ProcessContextBuilder) Build() ProcessContext { return p.processContext } diff --git a/pkg/saga/statemachine/engine/default_statemachine_config.go b/pkg/saga/statemachine/engine/default_statemachine_config.go deleted file mode 100644 index 2d7b1d8e..00000000 --- a/pkg/saga/statemachine/engine/default_statemachine_config.go +++ /dev/null @@ -1,123 +0,0 @@ -package engine - -import ( - "github.com/seata/seata-go/pkg/saga/statemachine/engine/events" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/store" -) - -const ( - DefaultTransOperTimeout = 60000 * 30 - DefaultServiceInvokeTimeout = 60000 * 5 -) - -type DefaultStateMachineConfig struct { - // Configuration - transOperationTimeout int - serviceInvokeTimeout int - charset string - defaultTenantId string - - // Components - - // Store related components - stateLogRepository store.StateLogRepository - stateLogStore store.StateLogStore - stateLangStore store.StateLangStore - stateMachineRepository store.StateMachineRepository - - // Expression related components - expressionFactoryManager expr.ExpressionFactoryManager - expressionResolver expr.ExpressionResolver - - // Invoker related components - serviceInvokerManager invoker.ServiceInvokerManager - scriptInvokerManager invoker.ScriptInvokerManager - - // Other components - statusDecisionStrategy status_decision.StatusDecisionStrategy - seqGenerator sequence.SeqGenerator -} - -func (c *DefaultStateMachineConfig) StateLogRepository() store.StateLogRepository { - return c.stateLogRepository -} - -func (c *DefaultStateMachineConfig) StateMachineRepository() store.StateMachineRepository { - return c.stateMachineRepository -} - -func (c *DefaultStateMachineConfig) StateLogStore() store.StateLogStore { - return c.stateLogStore -} - -func (c *DefaultStateMachineConfig) StateLangStore() store.StateLangStore { - return c.stateLangStore -} - -func (c *DefaultStateMachineConfig) ExpressionFactoryManager() expr.ExpressionFactoryManager { - return c.expressionFactoryManager -} - -func (c *DefaultStateMachineConfig) ExpressionResolver() expr.ExpressionResolver { - return c.expressionResolver -} - -func (c *DefaultStateMachineConfig) SeqGenerator() sequence.SeqGenerator { - return c.seqGenerator -} - -func (c *DefaultStateMachineConfig) StatusDecisionStrategy() status_decision.StatusDecisionStrategy { - return c.statusDecisionStrategy -} - -func (c *DefaultStateMachineConfig) EventPublisher() events.EventPublisher { - //TODO implement me - panic("implement me") -} - -func (c *DefaultStateMachineConfig) AsyncEventPublisher() events.EventPublisher { - //TODO implement me - panic("implement me") -} - -func (c *DefaultStateMachineConfig) ServiceInvokerManager() invoker.ServiceInvokerManager { - return c.serviceInvokerManager -} - -func (c *DefaultStateMachineConfig) ScriptInvokerManager() invoker.ScriptInvokerManager { - return c.scriptInvokerManager -} - -func (c *DefaultStateMachineConfig) CharSet() string { - return c.charset -} - -func (c *DefaultStateMachineConfig) SetCharSet(charset string) { - c.charset = charset -} - -func (c *DefaultStateMachineConfig) DefaultTenantId() string { - return c.defaultTenantId -} - -func (c *DefaultStateMachineConfig) TransOperationTimeout() int { - return c.transOperationTimeout -} - -func (c *DefaultStateMachineConfig) ServiceInvokeTimeout() int { - return c.serviceInvokeTimeout -} - -func NewDefaultStateMachineConfig() *DefaultStateMachineConfig { - c := &DefaultStateMachineConfig{ - transOperationTimeout: DefaultTransOperTimeout, - serviceInvokeTimeout: DefaultServiceInvokeTimeout, - charset: "UTF-8", - defaultTenantId: "000001", - } - return c -} diff --git a/pkg/saga/statemachine/engine/events/event_bus.go b/pkg/saga/statemachine/engine/events/event_bus.go deleted file mode 100644 index 77a1e616..00000000 --- a/pkg/saga/statemachine/engine/events/event_bus.go +++ /dev/null @@ -1,49 +0,0 @@ -package events - -import ( - "context" -) - -type EventBus interface { - Offer(ctx context.Context, event Event) (bool, error) - - RegisterEventConsumer(consumer EventConsumer) -} - -type BaseEventBus struct { - eventConsumerList []EventConsumer -} - -func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) { - if b.eventConsumerList == nil { - b.eventConsumerList = make([]EventConsumer, 0) - } - b.eventConsumerList = append(b.eventConsumerList, consumer) -} - -func (b *BaseEventBus) GetEventConsumerList(event Event) []EventConsumer { - var acceptedConsumerList = make([]EventConsumer, 0) - for i := range b.eventConsumerList { - eventConsumer := b.eventConsumerList[i] - if eventConsumer.Accept(event) { - acceptedConsumerList = append(acceptedConsumerList, eventConsumer) - } - } - return acceptedConsumerList -} - -type DirectEventBus struct { - BaseEventBus -} - -func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) { - eventConsumerList := d.GetEventConsumerList(event) - if len(eventConsumerList) == 0 { - //TODO logger - return false, nil - } - - //processContext, _ := event.(process_ctrl.ProcessContext) - - return true, nil -} diff --git a/pkg/saga/statemachine/engine/exception/exception.go b/pkg/saga/statemachine/engine/exception/exception.go new file mode 100644 index 00000000..f5c58239 --- /dev/null +++ b/pkg/saga/statemachine/engine/exception/exception.go @@ -0,0 +1,54 @@ +package exception + +import "github.com/seata/seata-go/pkg/util/errors" + +type EngineExecutionException struct { + errors.SeataError + stateName string + stateMachineName string + stateMachineInstanceId string + stateInstanceId string +} + +func NewEngineExecutionException(code errors.TransactionErrorCode, msg string, parent error) *EngineExecutionException { + seataError := errors.New(code, msg, parent) + return &EngineExecutionException{ + SeataError: *seataError, + } +} + +func (e *EngineExecutionException) StateName() string { + return e.stateName +} + +func (e *EngineExecutionException) SetStateName(stateName string) { + e.stateName = stateName +} + +func (e *EngineExecutionException) StateMachineName() string { + return e.stateMachineName +} + +func (e *EngineExecutionException) SetStateMachineName(stateMachineName string) { + e.stateMachineName = stateMachineName +} + +func (e *EngineExecutionException) StateMachineInstanceId() string { + return e.stateMachineInstanceId +} + +func (e *EngineExecutionException) SetStateMachineInstanceId(stateMachineInstanceId string) { + e.stateMachineInstanceId = stateMachineInstanceId +} + +func (e *EngineExecutionException) StateInstanceId() string { + return e.stateInstanceId +} + +func (e *EngineExecutionException) SetStateInstanceId(stateInstanceId string) { + e.stateInstanceId = stateInstanceId +} + +type ForwardInvalidException struct { + EngineExecutionException +} diff --git a/pkg/saga/statemachine/engine/expr/expression.go b/pkg/saga/statemachine/engine/expr/expression.go index 9706cda5..767c49da 100644 --- a/pkg/saga/statemachine/engine/expr/expression.go +++ b/pkg/saga/statemachine/engine/expr/expression.go @@ -1,10 +1,93 @@ package expr +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence" + "strings" +) + +const DefaultExpressionType string = "Default" + type ExpressionResolver interface { + Expression(expressionStr string) Expression + ExpressionFactoryManager() ExpressionFactoryManager + SetExpressionFactoryManager(expressionFactoryManager ExpressionFactoryManager) } type Expression interface { + Value(elContext any) any + SetValue(value any, elContext any) + ExpressionString() string +} + +type ExpressionFactory interface { + CreateExpression(expression string) Expression } type ExpressionFactoryManager struct { + expressionFactoryMap map[string]ExpressionFactory +} + +func NewExpressionFactoryManager() *ExpressionFactoryManager { + return &ExpressionFactoryManager{ + expressionFactoryMap: make(map[string]ExpressionFactory), + } +} + +func (e *ExpressionFactoryManager) GetExpressionFactory(expressionType string) ExpressionFactory { + if strings.TrimSpace(expressionType) == "" { + expressionType = DefaultExpressionType + } + return e.expressionFactoryMap[expressionType] +} + +func (e *ExpressionFactoryManager) SetExpressionFactoryMap(expressionFactoryMap map[string]ExpressionFactory) { + for k, v := range expressionFactoryMap { + e.expressionFactoryMap[k] = v + } +} + +func (e *ExpressionFactoryManager) PutExpressionFactory(expressionType string, factory ExpressionFactory) { + e.expressionFactoryMap[expressionType] = factory +} + +type SequenceExpression struct { + seqGenerator sequence.SeqGenerator + entity string + rule string +} + +func (s *SequenceExpression) SeqGenerator() sequence.SeqGenerator { + return s.seqGenerator +} + +func (s *SequenceExpression) SetSeqGenerator(seqGenerator sequence.SeqGenerator) { + s.seqGenerator = seqGenerator +} + +func (s *SequenceExpression) Entity() string { + return s.entity +} + +func (s *SequenceExpression) SetEntity(entity string) { + s.entity = entity +} + +func (s *SequenceExpression) Rule() string { + return s.rule +} + +func (s *SequenceExpression) SetRule(rule string) { + s.rule = rule +} + +func (s SequenceExpression) Value(elContext any) any { + return s.seqGenerator.GenerateId(s.entity, s.rule) +} + +func (s SequenceExpression) SetValue(value any, elContext any) { + +} + +func (s SequenceExpression) ExpressionString() string { + return s.entity + "|" + s.rule } diff --git a/pkg/saga/statemachine/engine/process_ctrl/instruction.go b/pkg/saga/statemachine/engine/process_ctrl/instruction.go deleted file mode 100644 index 2f7c0a54..00000000 --- a/pkg/saga/statemachine/engine/process_ctrl/instruction.go +++ /dev/null @@ -1,24 +0,0 @@ -package process_ctrl - -import ( - "github.com/seata/seata-go/pkg/saga/statemachine/statelang" -) - -type Instruction interface { -} - -type StateInstruction struct { - StateName string - StateMachineName string - TenantId string - End bool -} - -func (s StateInstruction) GetState(context ProcessContext) (statelang.State, error) { - //TODO implement me - panic("implement me") -} - -func NewStateInstruction(stateMachineName string, tenantId string) *StateInstruction { - return &StateInstruction{StateMachineName: stateMachineName, TenantId: tenantId} -} diff --git a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go deleted file mode 100644 index aae9fe89..00000000 --- a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go +++ /dev/null @@ -1,125 +0,0 @@ -package engine - -import ( - "context" - "time" - - "github.com/pkg/errors" - "github.com/seata/seata-go/pkg/saga/statemachine/constant" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/events" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" - "github.com/seata/seata-go/pkg/saga/statemachine/statelang" -) - -type ProcessCtrlStateMachineEngine struct { - StateMachineConfig StateMachineConfig -} - -func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) { - return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, false, nil) -} - -func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}, async bool, callback CallBack) (statelang.StateMachineInstance, error) { - if tenantId == "" { - tenantId = p.StateMachineConfig.DefaultTenantId() - } - - stateMachineInstance, err := p.createMachineInstance(stateMachineName, tenantId, businessKey, startParams) - if err != nil { - return nil, err - } - - // Build the process_ctrl context. - processContextBuilder := NewProcessContextBuilder(). - WithProcessType(process_ctrl.StateLang). - WithOperationName(constant.OperationNameStart). - WithAsyncCallback(callback). - WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, tenantId)). - WithStateMachineInstance(stateMachineInstance). - WithStateMachineConfig(p.StateMachineConfig). - WithStateMachineEngine(p). - WithIsAsyncExecution(async) - - contextMap := p.copyMap(startParams) - - stateMachineInstance.SetContext(contextMap) - - processContext := processContextBuilder.WithStateMachineContextVariables(contextMap).Build() - - if stateMachineInstance.StateMachine().IsPersist() && p.StateMachineConfig.StateLogStore() != nil { - err := p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, stateMachineInstance, processContext) - if err != nil { - return nil, err - } - } - - if stateMachineInstance.ID() == "" { - stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(constant.SeqEntityStateMachineInst, "")) - } - - var eventPublisher events.EventPublisher - if async { - eventPublisher = p.StateMachineConfig.AsyncEventPublisher() - } else { - eventPublisher = p.StateMachineConfig.EventPublisher() - } - - _, err = eventPublisher.PushEvent(ctx, processContext) - if err != nil { - return nil, err - } - - return stateMachineInstance, nil -} - -// copyMap not deep copy, so best practice: Don’t pass by reference -func (p ProcessCtrlStateMachineEngine) copyMap(startParams map[string]interface{}) map[string]interface{} { - copyMap := make(map[string]interface{}, len(startParams)) - for k, v := range startParams { - copyMap[k] = v - } - return copyMap -} - -func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) { - stateMachine, err := p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName, tenantId) - if err != nil { - return nil, err - } - - if stateMachine == nil { - return nil, errors.New("StateMachine [" + stateMachineName + "] is not exists") - } - - stateMachineInstance := statelang.NewStateMachineInstanceImpl() - stateMachineInstance.SetStateMachine(stateMachine) - stateMachineInstance.SetTenantID(tenantId) - stateMachineInstance.SetBusinessKey(businessKey) - stateMachineInstance.SetStartParams(startParams) - if startParams != nil { - if businessKey != "" { - startParams[constant.VarNameBusinesskey] = businessKey - } - - if startParams[constant.VarNameParentId] != nil { - parentId, ok := startParams[constant.VarNameParentId].(string) - if !ok { - - } - stateMachineInstance.SetParentID(parentId) - delete(startParams, constant.VarNameParentId) - } - } - - stateMachineInstance.SetStatus(statelang.RU) - stateMachineInstance.SetRunning(true) - - now := time.Now() - stateMachineInstance.SetStartedTime(now) - stateMachineInstance.SetUpdatedTime(now) - return stateMachineInstance, nil -} - -func NewProcessCtrlStateMachineEngine(stateMachineConfig StateMachineConfig) *ProcessCtrlStateMachineEngine { - return &ProcessCtrlStateMachineEngine{StateMachineConfig: stateMachineConfig} -} diff --git a/pkg/saga/statemachine/engine/statemachine_engine.go b/pkg/saga/statemachine/engine/statemachine_engine.go deleted file mode 100644 index 6b3954c1..00000000 --- a/pkg/saga/statemachine/engine/statemachine_engine.go +++ /dev/null @@ -1,16 +0,0 @@ -package engine - -import ( - "context" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" - "github.com/seata/seata-go/pkg/saga/statemachine/statelang" -) - -type StateMachineEngine interface { - Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) -} - -type CallBack interface { - OnFinished(ctx context.Context, context process_ctrl.ProcessContext, stateMachineInstance statelang.StateMachineInstance) - OnError(ctx context.Context, context process_ctrl.ProcessContext, stateMachineInstance statelang.StateMachineInstance, err error) -} diff --git a/pkg/saga/statemachine/engine/statemachine_engine_test.go b/pkg/saga/statemachine/engine/statemachine_engine_test.go deleted file mode 100644 index c9ac2518..00000000 --- a/pkg/saga/statemachine/engine/statemachine_engine_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package engine - -import "testing" - -func TestEngine(t *testing.T) { - -} diff --git a/pkg/saga/statemachine/engine/status_decision/status_decision.go b/pkg/saga/statemachine/engine/status_decision/status_decision.go deleted file mode 100644 index 6def093e..00000000 --- a/pkg/saga/statemachine/engine/status_decision/status_decision.go +++ /dev/null @@ -1,4 +0,0 @@ -package status_decision - -type StatusDecisionStrategy interface { -} diff --git a/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go b/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go new file mode 100644 index 00000000..28d60d0a --- /dev/null +++ b/pkg/saga/statemachine/process_ctrl/handlers/service_task_state_handler.go @@ -0,0 +1,174 @@ +package handlers + +import ( + "context" + "errors" + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/core" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/exception" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" + seataErrors "github.com/seata/seata-go/pkg/util/errors" + "github.com/seata/seata-go/pkg/util/log" +) + +type ServiceTaskStateHandler struct { + interceptors []core.StateHandlerInterceptor +} + +func NewServiceTaskStateHandler() *ServiceTaskStateHandler { + return &ServiceTaskStateHandler{} +} + +func (s *ServiceTaskStateHandler) State() string { + return constant.StateTypeServiceTask +} + +func (s *ServiceTaskStateHandler) Process(ctx context.Context, processContext core.ProcessContext) error { + stateInstruction, ok := processContext.GetInstruction().(core.StateInstruction) + if !ok { + return errors.New("invalid state instruction from processContext") + } + stateInterface, err := stateInstruction.GetState(processContext) + if err != nil { + return err + } + serviceTaskStateImpl, ok := stateInterface.(*state.ServiceTaskStateImpl) + + serviceName := serviceTaskStateImpl.ServiceName() + methodName := serviceTaskStateImpl.ServiceMethod() + stateInstance, ok := processContext.GetVariable(constant.VarNameStateInst).(statelang.StateInstance) + if !ok { + return errors.New("invalid state instance type from processContext") + } + + // invoke service task and record + var result any + var resultErr error + handleResultErr := func(err error) { + log.Error("<<<<<<<<<<<<<<<<<<<<<< State[%s], ServiceName[%s], Method[%s] Execute failed.", + serviceTaskStateImpl.Name(), serviceName, methodName, err) + + hierarchicalProcessContext, ok := processContext.(core.HierarchicalProcessContext) + if !ok { + return + } + hierarchicalProcessContext.SetVariable(constant.VarNameCurrentException, err) + core.HandleException(processContext, serviceTaskStateImpl.AbstractTaskState, err) + } + + input, ok := processContext.GetVariable(constant.VarNameInputParams).([]any) + if !ok { + handleResultErr(errors.New("invalid input params type from processContext")) + return nil + } + + stateInstance.SetStatus(statelang.RU) + log.Debugf(">>>>>>>>>>>>>>>>>>>>>> Start to execute State[%s], ServiceName[%s], Method[%s], Input:%s", + serviceTaskStateImpl.Name(), serviceName, methodName, input) + + if _, ok := stateInterface.(state.CompensateSubStateMachineState); ok { + // If it is the compensation of the subState machine, + // directly call the state machine's compensate method + stateMachineEngine, ok := processContext.GetVariable(constant.VarNameStateMachineEngine).(core.StateMachineEngine) + if !ok { + handleResultErr(errors.New("invalid stateMachineEngine type from processContext")) + return nil + } + + result, resultErr = s.compensateSubStateMachine(ctx, processContext, serviceTaskStateImpl, input, + stateInstance, stateMachineEngine) + if resultErr != nil { + handleResultErr(resultErr) + return nil + } + } else { + stateMachineConfig, ok := processContext.GetVariable(constant.VarNameStateMachineConfig).(core.StateMachineConfig) + if !ok { + handleResultErr(errors.New("invalid stateMachineConfig type from processContext")) + return nil + } + + serviceInvoker := stateMachineConfig.ServiceInvokerManager().ServiceInvoker(serviceTaskStateImpl.ServiceType()) + if serviceInvoker == nil { + resultErr = exception.NewEngineExecutionException(seataErrors.ObjectNotExists, + "No such ServiceInvoker["+serviceTaskStateImpl.ServiceType()+"]", nil) + handleResultErr(resultErr) + return nil + } + + result, resultErr = serviceInvoker.Invoke(ctx, input, serviceTaskStateImpl) + if resultErr != nil { + handleResultErr(resultErr) + return nil + } + } + + log.Debugf("<<<<<<<<<<<<<<<<<<<<<< State[%s], ServiceName[%s], Method[%s] Execute finish. result: %s", + serviceTaskStateImpl.Name(), serviceName, methodName, result) + + if result != nil { + stateInstance.SetOutputParams(result) + hierarchicalProcessContext, ok := processContext.(core.HierarchicalProcessContext) + if !ok { + handleResultErr(errors.New("invalid hierarchical process context type from processContext")) + return nil + } + + hierarchicalProcessContext.SetVariable(constant.VarNameOutputParams, result) + } + + return nil +} + +func (s *ServiceTaskStateHandler) StateHandlerInterceptorList() []core.StateHandlerInterceptor { + return s.interceptors +} + +func (s *ServiceTaskStateHandler) RegistryStateHandlerInterceptor(stateHandlerInterceptor core.StateHandlerInterceptor) { + s.interceptors = append(s.interceptors, stateHandlerInterceptor) +} + +func (s *ServiceTaskStateHandler) compensateSubStateMachine(ctx context.Context, processContext core.ProcessContext, + serviceTaskState state.ServiceTaskState, input any, instance statelang.StateInstance, + machineEngine core.StateMachineEngine) (any, error) { + subStateMachineParentId, ok := processContext.GetVariable(serviceTaskState.Name() + constant.VarNameSubMachineParentId).(string) + if !ok { + return nil, errors.New("invalid subStateMachineParentId type from processContext") + } + + if subStateMachineParentId == "" { + return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists, + "sub statemachine parentId is required", nil) + } + + stateMachineConfig := processContext.GetVariable(constant.VarNameStateMachineConfig).(core.StateMachineConfig) + subInst, err := stateMachineConfig.StateLogStore().GetStateMachineInstanceByParentId(subStateMachineParentId) + if err != nil { + return nil, err + } + + if subInst == nil || len(subInst) == 0 { + return nil, exception.NewEngineExecutionException(seataErrors.ObjectNotExists, + "cannot find sub statemachine instance by parentId:"+subStateMachineParentId, nil) + } + + subStateMachineInstId := subInst[0].ID() + log.Debugf(">>>>>>>>>>>>>>>>>>>>>> Start to compensate sub statemachine [id:%s]", subStateMachineInstId) + + startParams := make(map[string]any) + + if inputList, ok := input.([]any); ok { + if len(inputList) > 0 { + startParams = inputList[0].(map[string]any) + } + } else if inputMap, ok := input.(map[string]any); ok { + startParams = inputMap + } + + compensateInst, err := machineEngine.Compensate(ctx, subStateMachineInstId, startParams) + instance.SetStatus(compensateInst.CompensationStatus()) + log.Debugf("<<<<<<<<<<<<<<<<<<<<<< Compensate sub statemachine [id:%s] finished with status[%s], "+"compensateState[%s]", + subStateMachineInstId, compensateInst.Status(), compensateInst.CompensationStatus()) + return compensateInst.EndParams(), nil +} diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_type.go b/pkg/saga/statemachine/process_ctrl/process/process_type.go similarity index 82% rename from pkg/saga/statemachine/engine/process_ctrl/process_type.go rename to pkg/saga/statemachine/process_ctrl/process/process_type.go index 14027a28..b1a8e030 100644 --- a/pkg/saga/statemachine/engine/process_ctrl/process_type.go +++ b/pkg/saga/statemachine/process_ctrl/process/process_type.go @@ -1,4 +1,4 @@ -package process_ctrl +package process type ProcessType string diff --git a/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go b/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go index dc466da5..4de6e5de 100644 --- a/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go +++ b/pkg/saga/statemachine/statelang/parser/sub_state_machine_parser.go @@ -64,7 +64,7 @@ func NewCompensateSubStateMachineStateParser() *CompensateSubStateMachineStatePa } func (c CompensateSubStateMachineStateParser) StateType() string { - return constant.CompensateSubMachine + return constant.StateTypeCompensateSubMachine } func (c CompensateSubStateMachineStateParser) Parse(stateName string, stateMap map[string]interface{}) (statelang.State, error) { diff --git a/pkg/saga/statemachine/statelang/state/loop_start_state.go b/pkg/saga/statemachine/statelang/state/loop_start_state.go new file mode 100644 index 00000000..e059431d --- /dev/null +++ b/pkg/saga/statemachine/statelang/state/loop_start_state.go @@ -0,0 +1,22 @@ +package state + +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/constant" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" +) + +type LoopStartState interface { + statelang.State +} + +type LoopStartStateImpl struct { + *statelang.BaseState +} + +func NewLoopStartStateImpl() *LoopStartStateImpl { + baseState := statelang.NewBaseState() + baseState.SetType(constant.StateTypeLoopStart) + return &LoopStartStateImpl{ + BaseState: baseState, + } +} diff --git a/pkg/saga/statemachine/statelang/state/sub_state_machine.go b/pkg/saga/statemachine/statelang/state/sub_state_machine.go index bf0d96d5..78683a0a 100644 --- a/pkg/saga/statemachine/statelang/state/sub_state_machine.go +++ b/pkg/saga/statemachine/statelang/state/sub_state_machine.go @@ -56,7 +56,7 @@ func NewCompensateSubStateMachineStateImpl() *CompensateSubStateMachineStateImpl ServiceTaskStateImpl: NewServiceTaskStateImpl(), hashcode: uuid.String(), } - c.SetType(constant.CompensateSubMachine) + c.SetType(constant.StateTypeCompensateSubMachine) return c } diff --git a/pkg/saga/statemachine/statelang/state/task_state.go b/pkg/saga/statemachine/statelang/state/task_state.go index 734680ce..bc9880ac 100644 --- a/pkg/saga/statemachine/statelang/state/task_state.go +++ b/pkg/saga/statemachine/statelang/state/task_state.go @@ -38,7 +38,7 @@ type Loop interface { type ExceptionMatch interface { Exceptions() []string - + // TODO: go dose not support get reflect.Type by string, not use it now ExceptionTypes() []reflect.Type SetExceptionTypes(ExceptionTypes []reflect.Type) @@ -79,7 +79,9 @@ type AbstractTaskState struct { loop Loop catches []ExceptionMatch input []interface{} + inputExpressions []interface{} output map[string]interface{} + outputExpressions map[string]interface{} compensatePersistModeUpdate bool retryPersistModeUpdate bool forCompensation bool @@ -96,6 +98,22 @@ func NewAbstractTaskState() *AbstractTaskState { } } +func (a *AbstractTaskState) InputExpressions() []interface{} { + return a.inputExpressions +} + +func (a *AbstractTaskState) SetInputExpressions(inputExpressions []interface{}) { + a.inputExpressions = inputExpressions +} + +func (a *AbstractTaskState) OutputExpressions() map[string]interface{} { + return a.outputExpressions +} + +func (a *AbstractTaskState) SetOutputExpressions(outputExpressions map[string]interface{}) { + a.outputExpressions = outputExpressions +} + func (a *AbstractTaskState) Input() []interface{} { return a.input } diff --git a/pkg/saga/statemachine/statelang/statemachine_instance.go b/pkg/saga/statemachine/statelang/statemachine_instance.go index 399d222b..9da71ee2 100644 --- a/pkg/saga/statemachine/statelang/statemachine_instance.go +++ b/pkg/saga/statemachine/statelang/statemachine_instance.go @@ -71,9 +71,9 @@ type StateMachineInstance interface { SetBusinessKey(businessKey string) - Error() error + Exception() error - SetError(err error) + SetException(err error) StartParams() map[string]interface{} @@ -83,6 +83,8 @@ type StateMachineInstance interface { SetEndParams(endParams map[string]interface{}) + Context() map[string]interface{} + PutContext(key string, value interface{}) SetContext(context map[string]interface{}) @@ -115,7 +117,7 @@ type StateMachineInstanceImpl struct { startedTime time.Time endTime time.Time updatedTime time.Time - err error + exception error serializedError interface{} endParams map[string]interface{} serializedEndParams interface{} @@ -247,12 +249,12 @@ func (s *StateMachineInstanceImpl) SetBusinessKey(businessKey string) { s.businessKey = businessKey } -func (s *StateMachineInstanceImpl) Error() error { - return s.err +func (s *StateMachineInstanceImpl) Exception() error { + return s.exception } -func (s *StateMachineInstanceImpl) SetError(err error) { - s.err = err +func (s *StateMachineInstanceImpl) SetException(err error) { + s.exception = err } func (s *StateMachineInstanceImpl) StartParams() map[string]interface{} { @@ -271,6 +273,10 @@ func (s *StateMachineInstanceImpl) SetEndParams(endParams map[string]interface{} s.endParams = endParams } +func (s *StateMachineInstanceImpl) Context() map[string]interface{} { + return s.context +} + func (s *StateMachineInstanceImpl) PutContext(key string, value interface{}) { s.contextMutex.Lock() defer s.contextMutex.Unlock() diff --git a/pkg/saga/statemachine/engine/store/db/db.go b/pkg/saga/statemachine/store/db/db.go similarity index 100% rename from pkg/saga/statemachine/engine/store/db/db.go rename to pkg/saga/statemachine/store/db/db.go diff --git a/pkg/saga/statemachine/engine/store/db/db_test.go b/pkg/saga/statemachine/store/db/db_test.go similarity index 100% rename from pkg/saga/statemachine/engine/store/db/db_test.go rename to pkg/saga/statemachine/store/db/db_test.go diff --git a/pkg/saga/statemachine/engine/store/db/statelang.go b/pkg/saga/statemachine/store/db/statelang.go similarity index 100% rename from pkg/saga/statemachine/engine/store/db/statelang.go rename to pkg/saga/statemachine/store/db/statelang.go diff --git a/pkg/saga/statemachine/engine/store/db/statelang_test.go b/pkg/saga/statemachine/store/db/statelang_test.go similarity index 100% rename from pkg/saga/statemachine/engine/store/db/statelang_test.go rename to pkg/saga/statemachine/store/db/statelang_test.go diff --git a/pkg/saga/statemachine/engine/store/db/statelog.go b/pkg/saga/statemachine/store/db/statelog.go similarity index 98% rename from pkg/saga/statemachine/engine/store/db/statelog.go rename to pkg/saga/statemachine/store/db/statelog.go index 26b89ed6..23c3b551 100644 --- a/pkg/saga/statemachine/engine/store/db/statelog.go +++ b/pkg/saga/statemachine/store/db/statelog.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/pkg/errors" "github.com/seata/seata-go/pkg/saga/statemachine/constant" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/core" "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence" "github.com/seata/seata-go/pkg/saga/statemachine/engine/serializer" "github.com/seata/seata-go/pkg/saga/statemachine/statelang" @@ -89,7 +89,7 @@ func NewStateLogStore(db *sql.DB, tablePrefix string) *StateLogStore { } func (s *StateLogStore) RecordStateMachineStarted(ctx context.Context, machineInstance statelang.StateMachineInstance, - context process_ctrl.ProcessContext) error { + context core.ProcessContext) error { if machineInstance == nil { return nil } @@ -133,15 +133,15 @@ func (s *StateLogStore) RecordStateMachineStarted(ctx context.Context, machineIn } func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context, machineInstance statelang.StateMachineInstance, - context process_ctrl.ProcessContext) error { + context core.ProcessContext) error { if machineInstance == nil { return nil } endParams := machineInstance.EndParams() - if statelang.SU == machineInstance.Status() && machineInstance.Error() != nil { - machineInstance.SetError(nil) + if statelang.SU == machineInstance.Status() && machineInstance.Exception() != nil { + machineInstance.SetException(nil) } serializedEndParams, err := s.paramsSerializer.Serialize(endParams) @@ -149,7 +149,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context, machineI return err } machineInstance.SetSerializedEndParams(serializedEndParams) - serializedError, err := s.errorSerializer.Serialize(machineInstance.Error()) + serializedError, err := s.errorSerializer.Serialize(machineInstance.Exception()) if err != nil { return err } @@ -171,7 +171,7 @@ func (s *StateLogStore) RecordStateMachineFinished(ctx context.Context, machineI } func (s *StateLogStore) RecordStateMachineRestarted(ctx context.Context, machineInstance statelang.StateMachineInstance, - context process_ctrl.ProcessContext) error { + context core.ProcessContext) error { if machineInstance == nil { return nil } @@ -190,7 +190,7 @@ func (s *StateLogStore) RecordStateMachineRestarted(ctx context.Context, machine } func (s *StateLogStore) RecordStateStarted(ctx context.Context, stateInstance statelang.StateInstance, - context process_ctrl.ProcessContext) error { + context core.ProcessContext) error { if stateInstance == nil { return nil } @@ -235,7 +235,7 @@ func (s *StateLogStore) RecordStateStarted(ctx context.Context, stateInstance st return nil } -func (s *StateLogStore) isUpdateMode(instance statelang.StateInstance, context process_ctrl.ProcessContext) bool { +func (s *StateLogStore) isUpdateMode(instance statelang.StateInstance, context core.ProcessContext) bool { //TODO implement me, add forward logic return false } @@ -293,7 +293,7 @@ func (s *StateLogStore) getIdIndex(stateInstanceId string, separator string) int } func (s *StateLogStore) RecordStateFinished(ctx context.Context, stateInstance statelang.StateInstance, - context process_ctrl.ProcessContext) error { + context core.ProcessContext) error { if stateInstance == nil { return nil } @@ -372,7 +372,7 @@ func (s *StateLogStore) deserializeStateMachineParamsAndException(stateMachineIn if err != nil { return err } - stateMachineInstance.SetError(deserializedError) + stateMachineInstance.SetException(deserializedError) } serializedStartParams := stateMachineInstance.SerializedStartParams() diff --git a/pkg/saga/statemachine/engine/store/db/statelog_test.go b/pkg/saga/statemachine/store/db/statelog_test.go similarity index 94% rename from pkg/saga/statemachine/engine/store/db/statelog_test.go rename to pkg/saga/statemachine/store/db/statelog_test.go index 9044e6df..43767476 100644 --- a/pkg/saga/statemachine/engine/store/db/statelog_test.go +++ b/pkg/saga/statemachine/store/db/statelog_test.go @@ -5,19 +5,19 @@ import ( "fmt" "github.com/pkg/errors" "github.com/seata/seata-go/pkg/saga/statemachine/constant" - "github.com/seata/seata-go/pkg/saga/statemachine/engine" - "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/core" + "github.com/seata/seata-go/pkg/saga/statemachine/process_ctrl/process" "github.com/seata/seata-go/pkg/saga/statemachine/statelang" "github.com/stretchr/testify/assert" "testing" "time" ) -func mockProcessContext(stateMachineName string, stateMachineInstance statelang.StateMachineInstance) process_ctrl.ProcessContext { - ctx := engine.NewProcessContextBuilder(). - WithProcessType(process_ctrl.StateLang). +func mockProcessContext(stateMachineName string, stateMachineInstance statelang.StateMachineInstance) core.ProcessContext { + ctx := core.NewProcessContextBuilder(). + WithProcessType(process.StateLang). WithOperationName(constant.OperationNameStart). - WithInstruction(process_ctrl.NewStateInstruction(stateMachineName, "000001")). + WithInstruction(core.NewStateInstruction(stateMachineName, "000001")). WithStateMachineInstance(stateMachineInstance). Build() return ctx @@ -55,7 +55,7 @@ func TestStateLogStore_RecordStateMachineStarted(t *testing.T) { assert.Equal(t, expected.ID(), actual.ID()) assert.Equal(t, expected.MachineID(), actual.MachineID()) assert.Equal(t, fmt.Sprint(expected.StartParams()), fmt.Sprint(actual.StartParams())) - assert.Nil(t, actual.Error()) + assert.Nil(t, actual.Exception()) assert.Nil(t, actual.SerializedError()) assert.Equal(t, expected.Status(), actual.Status()) assert.Equal(t, expected.StartedTime().UnixNano(), actual.StartedTime().UnixNano()) @@ -73,7 +73,7 @@ func TestStateLogStore_RecordStateMachineFinished(t *testing.T) { err := stateLogStore.RecordStateMachineStarted(context.Background(), expected, ctx) assert.Nil(t, err) expected.SetEndParams(map[string]any{"end": 100}) - expected.SetError(errors.New("this is a test error")) + expected.SetException(errors.New("this is a test error")) expected.SetStatus(statelang.FA) expected.SetEndTime(time.Now()) expected.SetRunning(false) @@ -86,7 +86,7 @@ func TestStateLogStore_RecordStateMachineFinished(t *testing.T) { assert.Equal(t, expected.ID(), actual.ID()) assert.Equal(t, expected.MachineID(), actual.MachineID()) assert.Equal(t, fmt.Sprint(expected.StartParams()), fmt.Sprint(actual.StartParams())) - assert.Equal(t, "this is a test error", actual.Error().Error()) + assert.Equal(t, "this is a test error", actual.Exception().Error()) assert.Equal(t, expected.Status(), actual.Status()) assert.Equal(t, expected.IsRunning(), actual.IsRunning()) assert.Equal(t, expected.StartedTime().UnixNano(), actual.StartedTime().UnixNano()) @@ -240,7 +240,7 @@ func TestStateLogStore_GetStateMachineInstanceByBusinessKey(t *testing.T) { assert.Equal(t, expected.ID(), actual.ID()) assert.Equal(t, expected.MachineID(), actual.MachineID()) assert.Equal(t, fmt.Sprint(expected.StartParams()), fmt.Sprint(actual.StartParams())) - assert.Nil(t, actual.Error()) + assert.Nil(t, actual.Exception()) assert.Nil(t, actual.SerializedError()) assert.Equal(t, expected.Status(), actual.Status()) assert.Equal(t, expected.StartedTime().UnixNano(), actual.StartedTime().UnixNano()) @@ -269,9 +269,9 @@ func TestStateLogStore_GetStateMachineInstanceByParentId(t *testing.T) { actual := actualList[0] assert.Equal(t, expected.ID(), actual.ID()) assert.Equal(t, expected.MachineID(), actual.MachineID()) - // no startParams, endParams and Error + // no startParams, endParams and Exception assert.NotEqual(t, fmt.Sprint(expected.StartParams()), fmt.Sprint(actual.StartParams())) - assert.Nil(t, actual.Error()) + assert.Nil(t, actual.Exception()) assert.Nil(t, actual.SerializedError()) assert.Equal(t, expected.Status(), actual.Status()) assert.Equal(t, expected.StartedTime().UnixNano(), actual.StartedTime().UnixNano()) diff --git a/pkg/saga/statemachine/store/repository/state_machine_repository.go b/pkg/saga/statemachine/store/repository/state_machine_repository.go new file mode 100644 index 00000000..8a62cc49 --- /dev/null +++ b/pkg/saga/statemachine/store/repository/state_machine_repository.go @@ -0,0 +1,34 @@ +package repository + +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "io" +) + +type StateMachineRepositoryImpl struct { +} + +func (s StateMachineRepositoryImpl) GetStateMachineById(stateMachineId string) (statelang.StateMachine, error) { + //TODO implement me + panic("implement me") +} + +func (s StateMachineRepositoryImpl) GetStateMachineByNameAndTenantId(stateMachineName string, tenantId string) (statelang.StateMachine, error) { + //TODO implement me + panic("implement me") +} + +func (s StateMachineRepositoryImpl) GetLastVersionStateMachine(stateMachineName string, tenantId string) (statelang.StateMachine, error) { + //TODO implement me + panic("implement me") +} + +func (s StateMachineRepositoryImpl) RegistryStateMachine(machine statelang.StateMachine) error { + //TODO implement me + panic("implement me") +} + +func (s StateMachineRepositoryImpl) RegistryStateMachineByReader(reader io.Reader) error { + //TODO implement me + panic("implement me") +} diff --git a/pkg/tm/transaction_executor_test.go b/pkg/tm/transaction_executor_test.go index f0e513ad..c9bd1084 100644 --- a/pkg/tm/transaction_executor_test.go +++ b/pkg/tm/transaction_executor_test.go @@ -361,7 +361,7 @@ func TestBeginNewGtx(t *testing.T) { assert.Equal(t, message.GlobalStatusBegin, *GetTxStatus(ctx)) // case return error - err := errors.New("Mock Error") + err := errors.New("Mock Exception") gomonkey.ApplyMethod(reflect.TypeOf(GetGlobalTransactionManager()), "Begin", func(_ *GlobalTransactionManager, ctx context.Context, timeout time.Duration) error { return err diff --git a/pkg/util/collection/collection.go b/pkg/util/collection/collection.go index 5149e1a2..4dc04f7c 100644 --- a/pkg/util/collection/collection.go +++ b/pkg/util/collection/collection.go @@ -17,7 +17,10 @@ package collection -import "strings" +import ( + "container/list" + "strings" +) const ( KvSplit = "=" @@ -81,3 +84,42 @@ func DecodeMap(data []byte) map[string]string { return ctxMap } + +type Stack struct { + list *list.List +} + +func NewStack() *Stack { + list := list.New() + return &Stack{list} +} + +func (stack *Stack) Push(value interface{}) { + stack.list.PushBack(value) +} + +func (stack *Stack) Pop() interface{} { + e := stack.list.Back() + if e != nil { + stack.list.Remove(e) + return e.Value + } + return nil +} + +func (stack *Stack) Peak() interface{} { + e := stack.list.Back() + if e != nil { + return e.Value + } + + return nil +} + +func (stack *Stack) Len() int { + return stack.list.Len() +} + +func (stack *Stack) Empty() bool { + return stack.list.Len() == 0 +} diff --git a/pkg/util/collection/collection_test.go b/pkg/util/collection/collection_test.go index 74df03cd..10a9e20f 100644 --- a/pkg/util/collection/collection_test.go +++ b/pkg/util/collection/collection_test.go @@ -57,3 +57,26 @@ func TestEncodeDecodeMap(t *testing.T) { }) } } + +func TestStack(t *testing.T) { + stack := NewStack() + stack.Push(1) + stack.Push(2) + stack.Push(3) + stack.Push(4) + + len := stack.Len() + if len != 4 { + t.Errorf("stack.Len() failed. Got %d, expected 4.", len) + } + + value := stack.Peak().(int) + if value != 4 { + t.Errorf("stack.Peak() failed. Got %d, expected 4.", value) + } + + value = stack.Pop().(int) + if value != 4 { + t.Errorf("stack.Pop() failed. Got %d, expected 4.", value) + } +} diff --git a/pkg/util/errors/code.go b/pkg/util/errors/code.go index dfe50205..9b15e72a 100644 --- a/pkg/util/errors/code.go +++ b/pkg/util/errors/code.go @@ -100,4 +100,15 @@ const ( // FencePhaseError have fence phase but is not illegal value FencePhaseError + + // ObjectNotExists object not exists + ObjectNotExists + // StateMachineInstanceNotExists State machine instance not exists + StateMachineInstanceNotExists + // ContextVariableReplayFailed Context variable replay failed + ContextVariableReplayFailed + // InvalidParameter Context variable replay failed + InvalidParameter + // OperationDenied Operation denied + OperationDenied )