Browse Source

feature: init saga framework (#635)

init saga framework
pull/647/head
wt_better GitHub 1 year ago
parent
commit
58fc3e13df
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 2508 additions and 0 deletions
  1. +31
    -0
      pkg/saga/readme.md
  2. +20
    -0
      pkg/saga/statemachine/engine/contant.go
  3. +4
    -0
      pkg/saga/statemachine/engine/events/event.go
  4. +49
    -0
      pkg/saga/statemachine/engine/events/event_bus.go
  5. +29
    -0
      pkg/saga/statemachine/engine/events/event_consumer.go
  6. +19
    -0
      pkg/saga/statemachine/engine/events/event_publisher.go
  7. +10
    -0
      pkg/saga/statemachine/engine/expr/expression.go
  8. +14
    -0
      pkg/saga/statemachine/engine/invoker/invoker.go
  9. +92
    -0
      pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go
  10. +25
    -0
      pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go
  11. +284
    -0
      pkg/saga/statemachine/engine/process_ctrl/process_context.go
  12. +7
    -0
      pkg/saga/statemachine/engine/process_ctrl/process_type.go
  13. +139
    -0
      pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go
  14. +124
    -0
      pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go
  15. +5
    -0
      pkg/saga/statemachine/engine/sequence/sequence.go
  16. +14
    -0
      pkg/saga/statemachine/engine/sequence/uuid.go
  17. +44
    -0
      pkg/saga/statemachine/engine/statemachine_config.go
  18. +16
    -0
      pkg/saga/statemachine/engine/statemachine_engine.go
  19. +4
    -0
      pkg/saga/statemachine/engine/status_decision/status_decision.go
  20. +60
    -0
      pkg/saga/statemachine/engine/store/statemachine_store.go
  21. +72
    -0
      pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go
  22. +100
    -0
      pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go
  23. +13
    -0
      pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go
  24. +104
    -0
      pkg/saga/statemachine/statelang/parser/statemachine_parser.go
  25. +71
    -0
      pkg/saga/statemachine/statelang/state.go
  26. +74
    -0
      pkg/saga/statemachine/statelang/state/choice_state.go
  27. +30
    -0
      pkg/saga/statemachine/statelang/state/task_state.go
  28. +330
    -0
      pkg/saga/statemachine/statelang/state_instance.go
  29. +261
    -0
      pkg/saga/statemachine/statelang/statemachine.go
  30. +316
    -0
      pkg/saga/statemachine/statelang/statemachine_instance.go
  31. +147
    -0
      pkg/util/reflectx/unmarkshaler.go

+ 31
- 0
pkg/saga/readme.md View File

@@ -0,0 +1,31 @@

# seata saga

未来计划有三种使用方式

- 基于状态机引擎的 json
link: statemachine_engine#Start
- stream builder
stateMachine.serviceTask().build().Start
- 二阶段方式saga,类似tcc使用

上面1、2是以来[statemachine](statemachine),状态机引擎实现的,3相对比较独立。


状态机的实现在:saga-statemachine包中
其中[statelang](statemachine%2Fstatelang)是状态机语言的解析,目前实现的是json解析方式,状态机语言可以参考:
https://seata.io/docs/user/mode/saga

状态机json执行的入口类是:[statemachine_engine.go](statemachine%2Fengine%2Fstatemachine_engine.go)

下面简单说下engine中各个包的作用:
events:saga的是基于事件处理的,其中是event、eventBus的实现
expr:表达式声明、解析、执行
invoker:声明了serviceInvoker、scriptInvoker等接口、task调用管理、执行都在这个包中,例如httpInvoker
process_ctrl:状态机处理流程:上下文、执行、事件流转
sequence:分布式id
store:状态机存储接口、实现
status_decision:状态机状态决策




+ 20
- 0
pkg/saga/statemachine/engine/contant.go View File

@@ -0,0 +1,20 @@
package engine

const (
VarNameProcessType string = "_ProcessType_"
VarNameOperationName string = "_operation_name_"
OperationNameStart string = "start"
VarNameAsyncCallback string = "_async_callback_"
VarNameStateMachineInst string = "_current_statemachine_instance_"
VarNameStateMachine string = "_current_statemachine_"
VarNameStateMachineEngine string = "_current_statemachine_engine_"
VarNameStateMachineConfig string = "_statemachine_config_"
VarNameStateMachineContext string = "context"
VarNameIsAsyncExecution string = "_is_async_execution_"
VarNameStateInst string = "_current_state_instance_"
SeqEntityStateMachineInst string = "STATE_MACHINE_INST"
VarNameBusinesskey string = "_business_key_"
VarNameParentId string = "_parent_id_"
StateTypeServiceTask string = "ServiceTask"
StateTypeChoice string = "Choice"
)

+ 4
- 0
pkg/saga/statemachine/engine/events/event.go View File

@@ -0,0 +1,4 @@
package events

type Event interface {
}

+ 49
- 0
pkg/saga/statemachine/engine/events/event_bus.go View File

@@ -0,0 +1,49 @@
package events

import (
"context"
)

type EventBus interface {
Offer(ctx context.Context, event Event) (bool, error)

RegisterEventConsumer(consumer EventConsumer)
}

type BaseEventBus struct {
eventConsumerList []EventConsumer
}

func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) {
if b.eventConsumerList == nil {
b.eventConsumerList = make([]EventConsumer, 0)
}
b.eventConsumerList = append(b.eventConsumerList, consumer)
}

func (b *BaseEventBus) GetEventConsumerList(event Event) []EventConsumer {
var acceptedConsumerList = make([]EventConsumer, 0)
for i := range b.eventConsumerList {
eventConsumer := b.eventConsumerList[i]
if eventConsumer.Accept(event) {
acceptedConsumerList = append(acceptedConsumerList, eventConsumer)
}
}
return acceptedConsumerList
}

type DirectEventBus struct {
BaseEventBus
}

func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) {
eventConsumerList := d.GetEventConsumerList(event)
if len(eventConsumerList) == 0 {
//TODO logger
return false, nil
}

//processContext, _ := event.(process_ctrl.ProcessContext)

return true, nil
}

+ 29
- 0
pkg/saga/statemachine/engine/events/event_consumer.go View File

@@ -0,0 +1,29 @@
package events

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
)

type EventConsumer interface {
Accept(event Event) bool

Process(ctx context.Context, event Event) error
}

type ProcessCtrlEventConsumer struct {
}

func (p ProcessCtrlEventConsumer) Accept(event Event) bool {
if event == nil {
return false
}

_, ok := event.(process_ctrl.ProcessContext)
return ok
}

func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event) error {
//TODO implement me
panic("implement me")
}

+ 19
- 0
pkg/saga/statemachine/engine/events/event_publisher.go View File

@@ -0,0 +1,19 @@
package events

import "context"

type EventPublisher interface {
PushEvent(ctx context.Context, event Event) (bool, error)
}

type ProcessCtrlEventPublisher struct {
eventBus EventBus
}

func NewProcessCtrlEventPublisher(eventBus EventBus) *ProcessCtrlEventPublisher {
return &ProcessCtrlEventPublisher{eventBus: eventBus}
}

func (p ProcessCtrlEventPublisher) PushEvent(ctx context.Context, event Event) (bool, error) {
return p.eventBus.Offer(ctx, event)
}

+ 10
- 0
pkg/saga/statemachine/engine/expr/expression.go View File

@@ -0,0 +1,10 @@
package expr

type ExpressionResolver interface {
}

type Expression interface {
}

type ExpressionFactoryManager struct {
}

+ 14
- 0
pkg/saga/statemachine/engine/invoker/invoker.go View File

@@ -0,0 +1,14 @@
package invoker

type ScriptInvokerManager interface {
}

type ScriptInvoker interface {
}

type ServiceInvokerManager interface {
}

type ServiceInvoker interface {
Invoke()
}

+ 92
- 0
pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go View File

@@ -0,0 +1,92 @@
package process_ctrl

import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"sync"
)

type BusinessProcessor interface {
Process(ctx context.Context, processContext ProcessContext) error

Route(ctx context.Context, processContext ProcessContext) error
}

type DefaultBusinessProcessor struct {
processHandlers map[string]ProcessHandler
routerHandlers map[string]RouterHandler
mu sync.RWMutex
}

func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType ProcessType, processHandler ProcessHandler) {
d.mu.Lock()
defer d.mu.Unlock()

d.processHandlers[string(processType)] = processHandler
}

func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType ProcessType, routerHandler RouterHandler) {
d.mu.Lock()
defer d.mu.Unlock()

d.routerHandlers[string(processType)] = routerHandler
}

func (d *DefaultBusinessProcessor) Process(ctx context.Context, processContext ProcessContext) error {
processType := d.matchProcessType(processContext)

processHandler, err := d.getProcessHandler(processType)
if err != nil {
return err
}

return processHandler.Process(ctx, processContext)
}

func (d *DefaultBusinessProcessor) Route(ctx context.Context, processContext ProcessContext) error {
processType := d.matchProcessType(processContext)

routerHandler, err := d.getRouterHandler(processType)
if err != nil {
return err
}

return routerHandler.Route(ctx, processContext)
}

func (d *DefaultBusinessProcessor) getProcessHandler(processType ProcessType) (ProcessHandler, error) {
d.mu.RLock()
defer d.mu.RUnlock()
processHandler, ok := d.processHandlers[string(processType)]
if !ok {
return nil, errors.New("Cannot find process handler by type " + string(processType))
}
return processHandler, nil
}

func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) (RouterHandler, error) {
d.mu.RLock()
defer d.mu.RUnlock()
routerHandler, ok := d.routerHandlers[string(processType)]
if !ok {
return nil, errors.New("Cannot find router handler by type " + string(processType))
}
return routerHandler, nil
}

func (d *DefaultBusinessProcessor) matchProcessType(processContext ProcessContext) ProcessType {
ok := processContext.HasVariable(engine.VarNameProcessType)
if ok {
return processContext.GetVariable(engine.VarNameProcessType).(ProcessType)
}
return StateLang
}

type ProcessHandler interface {
Process(ctx context.Context, processContext ProcessContext) error
}

type RouterHandler interface {
Route(ctx context.Context, processContext ProcessContext) error
}

+ 25
- 0
pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go View File

@@ -0,0 +1,25 @@
package instruction

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type Instruction interface {
}

type StateInstruction struct {
StateName string
StateMachineName string
TenantId string
End bool
}

func (s StateInstruction) GetState(context process_ctrl.ProcessContext) (statelang.State, error) {
//TODO implement me
panic("implement me")
}

func NewStateInstruction(stateMachineName string, tenantId string) *StateInstruction {
return &StateInstruction{StateMachineName: stateMachineName, TenantId: tenantId}
}

+ 284
- 0
pkg/saga/statemachine/engine/process_ctrl/process_context.go View File

@@ -0,0 +1,284 @@
package process_ctrl

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"sync"
)

type ProcessContext interface {
GetVariable(name string) interface{}

SetVariable(name string, value interface{})

GetVariables() map[string]interface{}

SetVariables(variables map[string]interface{})

RemoveVariable(name string) interface{}

HasVariable(name string) bool

GetInstruction() instruction.Instruction

SetInstruction(instruction instruction.Instruction)
}

type HierarchicalProcessContext interface {
ProcessContext

GetVariableLocally(name string) interface{}

SetVariableLocally(name string, value interface{})

GetVariablesLocally() map[string]interface{}

SetVariablesLocally(variables map[string]interface{})

RemoveVariableLocally(name string) interface{}

HasVariableLocally(name string) bool

ClearLocally()
}

type ProcessContextImpl struct {
parent ProcessContext
mu sync.RWMutex
mp map[string]interface{}
instruction instruction.Instruction
}

func (p *ProcessContextImpl) GetVariable(name string) interface{} {
p.mu.RLock()
defer p.mu.RUnlock()

value, ok := p.mp[name]
if ok {
return value
}

if p.parent != nil {
return p.parent.GetVariable(name)
}

return nil
}

func (p *ProcessContextImpl) SetVariable(name string, value interface{}) {
p.mu.Lock()
defer p.mu.Unlock()

_, ok := p.mp[name]
if ok {
p.mp[name] = value
} else {
if p.parent != nil {
p.parent.SetVariable(name, value)
} else {
p.mp[name] = value
}
}
}

func (p *ProcessContextImpl) GetVariables() map[string]interface{} {
p.mu.RLock()
defer p.mu.RUnlock()

newVariablesMap := make(map[string]interface{})
if p.parent != nil {
variables := p.parent.GetVariables()
for k, v := range variables {
newVariablesMap[k] = v
}
}

for k, v := range p.mp {
newVariablesMap[k] = v
}

return newVariablesMap
}

func (p *ProcessContextImpl) SetVariables(variables map[string]interface{}) {
for k, v := range variables {
p.SetVariable(k, v)
}
}

func (p *ProcessContextImpl) RemoveVariable(name string) interface{} {
p.mu.Lock()
defer p.mu.Unlock()

value, ok := p.mp[name]
if ok {
delete(p.mp, name)
return value
}

if p.parent != nil {
return p.parent.RemoveVariable(name)
}

return nil
}

func (p *ProcessContextImpl) HasVariable(name string) bool {
p.mu.RLock()
defer p.mu.RUnlock()

_, ok := p.mp[name]
if ok {
return true
}

if p.parent != nil {
return p.parent.HasVariable(name)
}

return false
}

func (p *ProcessContextImpl) GetInstruction() instruction.Instruction {
return p.instruction
}

func (p *ProcessContextImpl) SetInstruction(instruction instruction.Instruction) {
p.instruction = instruction
}

func (p *ProcessContextImpl) GetVariableLocally(name string) interface{} {
p.mu.RLock()
defer p.mu.RUnlock()

value, _ := p.mp[name]
return value
}

func (p *ProcessContextImpl) SetVariableLocally(name string, value interface{}) {
p.mu.Lock()
defer p.mu.Unlock()

p.mp[name] = value
}

func (p *ProcessContextImpl) GetVariablesLocally() map[string]interface{} {
p.mu.RLock()
defer p.mu.RUnlock()

newVariablesMap := make(map[string]interface{}, len(p.mp))
for k, v := range p.mp {
newVariablesMap[k] = v
}
return newVariablesMap
}

func (p *ProcessContextImpl) SetVariablesLocally(variables map[string]interface{}) {
for k, v := range variables {
p.SetVariableLocally(k, v)
}
}

func (p *ProcessContextImpl) RemoveVariableLocally(name string) interface{} {
p.mu.Lock()
defer p.mu.Unlock()

value, _ := p.mp[name]
delete(p.mp, name)
return value
}

func (p *ProcessContextImpl) HasVariableLocally(name string) bool {
p.mu.RLock()
defer p.mu.RUnlock()

_, ok := p.mp[name]
return ok
}

func (p *ProcessContextImpl) ClearLocally() {
p.mu.Lock()
defer p.mu.Unlock()

p.mp = map[string]interface{}{}
}

// ProcessContextBuilder process_ctrl builder
type ProcessContextBuilder struct {
processContext ProcessContext
}

func NewProcessContextBuilder() *ProcessContextBuilder {
processContextImpl := &ProcessContextImpl{}
return &ProcessContextBuilder{processContextImpl}
}

func (p *ProcessContextBuilder) WithProcessType(processType ProcessType) *ProcessContextBuilder {
p.processContext.SetVariable(engine.VarNameProcessType, processType)
return p
}

func (p *ProcessContextBuilder) WithOperationName(operationName string) *ProcessContextBuilder {
p.processContext.SetVariable(engine.VarNameOperationName, operationName)
return p
}

func (p *ProcessContextBuilder) WithAsyncCallback(callBack engine.CallBack) *ProcessContextBuilder {
if callBack != nil {
p.processContext.SetVariable(engine.VarNameAsyncCallback, callBack)
}

return p
}

func (p *ProcessContextBuilder) WithInstruction(instruction instruction.Instruction) *ProcessContextBuilder {
if instruction != nil {
p.processContext.SetInstruction(instruction)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineInstance(stateMachineInstance statelang.StateMachineInstance) *ProcessContextBuilder {
if stateMachineInstance != nil {
p.processContext.SetVariable(engine.VarNameStateMachineInst, stateMachineInstance)
p.processContext.SetVariable(engine.VarNameStateMachine, stateMachineInstance.StateMachine())
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineEngine(stateMachineEngine engine.StateMachineEngine) *ProcessContextBuilder {
if stateMachineEngine != nil {
p.processContext.SetVariable(engine.VarNameStateMachineEngine, stateMachineEngine)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineConfig(stateMachineConfig engine.StateMachineConfig) *ProcessContextBuilder {
if stateMachineConfig != nil {
p.processContext.SetVariable(engine.VarNameStateMachineConfig, stateMachineConfig)
}

return p
}

func (p *ProcessContextBuilder) WithStateMachineContextVariables(contextMap map[string]interface{}) *ProcessContextBuilder {
if contextMap != nil {
p.processContext.SetVariable(engine.VarNameStateMachineContext, contextMap)
}

return p
}

func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) *ProcessContextBuilder {
p.processContext.SetVariable(engine.VarNameIsAsyncExecution, async)

return p
}

func (p *ProcessContextBuilder) Build() ProcessContext {
return p.processContext
}

+ 7
- 0
pkg/saga/statemachine/engine/process_ctrl/process_type.go View File

@@ -0,0 +1,7 @@
package process_ctrl

type ProcessType string

const (
StateLang ProcessType = "STATE_LANG" // SEATA State Language
)

+ 139
- 0
pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go View File

@@ -0,0 +1,139 @@
package process_ctrl

import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
"sync"
)

type StateHandler interface {
State() string
ProcessHandler
}

type StateRouter interface {
State() string
RouterHandler
}

type InterceptAbleStateHandler interface {
StateHandler
StateHandlerInterceptorList() []StateHandlerInterceptor
RegistryStateHandlerInterceptor(stateHandlerInterceptor StateHandlerInterceptor)
}

type StateHandlerInterceptor interface {
PreProcess(ctx context.Context, processContext ProcessContext) error
PostProcess(ctx context.Context, processContext ProcessContext) error
}

type StateMachineProcessHandler struct {
mp map[string]StateHandler
mu sync.RWMutex
}

func NewStateMachineProcessHandler() *StateMachineProcessHandler {
return &StateMachineProcessHandler{
mp: make(map[string]StateHandler),
}
}

func (s *StateMachineProcessHandler) Process(ctx context.Context, processContext ProcessContext) error {
stateInstruction, _ := processContext.GetInstruction().(instruction.StateInstruction)

state, err := stateInstruction.GetState(processContext)
if err != nil {
return err
}

stateType := state.Type()
stateHandler := s.GetStateHandler(stateType)
if stateHandler == nil {
return errors.New("Not support [" + stateType + "] state handler")
}

interceptAbleStateHandler, ok := stateHandler.(InterceptAbleStateHandler)

var stateHandlerInterceptorList []StateHandlerInterceptor
if ok {
stateHandlerInterceptorList = interceptAbleStateHandler.StateHandlerInterceptorList()
}

if stateHandlerInterceptorList != nil && len(stateHandlerInterceptorList) > 0 {
for _, stateHandlerInterceptor := range stateHandlerInterceptorList {
err = stateHandlerInterceptor.PreProcess(ctx, processContext)
if err != nil {
return err
}
}
}

err = stateHandler.Process(ctx, processContext)
if err != nil {
return err
}

if stateHandlerInterceptorList != nil && len(stateHandlerInterceptorList) > 0 {
for _, stateHandlerInterceptor := range stateHandlerInterceptorList {
err = stateHandlerInterceptor.PostProcess(ctx, processContext)
if err != nil {
return err
}
}
}

return nil
}

func (s *StateMachineProcessHandler) GetStateHandler(stateType string) StateHandler {
s.mu.RLock()
defer s.mu.RUnlock()
return s.mp[stateType]
}

func (s *StateMachineProcessHandler) RegistryStateHandler(stateType string, stateHandler StateHandler) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mp == nil {
s.mp = make(map[string]StateHandler)
}
s.mp[stateType] = stateHandler
}

type StateMachineRouterHandler struct {
mu sync.RWMutex
mp map[string]StateRouter
}

func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext ProcessContext) error {
stateInstruction, _ := processContext.GetInstruction().(instruction.StateInstruction)

state, err := stateInstruction.GetState(processContext)
if err != nil {
return err
}

stateType := state.Type()
stateRouter := s.GetStateRouter(stateType)
if stateRouter == nil {
return errors.New("Not support [" + stateType + "] state router")
}

return stateRouter.Route(ctx, processContext)
}

func (s *StateMachineRouterHandler) GetStateRouter(stateType string) StateRouter {
s.mu.RLock()
defer s.mu.RUnlock()
return s.mp[stateType]
}

func (s *StateMachineRouterHandler) RegistryStateRouter(stateType string, stateRouter StateRouter) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mp == nil {
s.mp = make(map[string]StateRouter)
}
s.mp[stateType] = stateRouter
}

+ 124
- 0
pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go View File

@@ -0,0 +1,124 @@
package engine

import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"time"
)

type ProcessCtrlStateMachineEngine struct {
StateMachineConfig StateMachineConfig
}

func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, false, nil)
}

func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}, async bool, callback CallBack) (statelang.StateMachineInstance, error) {
if tenantId == "" {
tenantId = p.StateMachineConfig.DefaultTenantId()
}

stateMachineInstance, err := p.createMachineInstance(stateMachineName, tenantId, businessKey, startParams)
if err != nil {
return nil, err
}

// Build the process_ctrl context.
processContextBuilder := process_ctrl.NewProcessContextBuilder().
WithProcessType(process_ctrl.StateLang).
WithOperationName(OperationNameStart).
WithAsyncCallback(callback).
WithInstruction(instruction.NewStateInstruction(stateMachineName, tenantId)).
WithStateMachineInstance(stateMachineInstance).
WithStateMachineConfig(p.StateMachineConfig).
WithStateMachineEngine(p).
WithIsAsyncExecution(async)

contextMap := p.copyMap(startParams)

stateMachineInstance.SetContext(contextMap)

processContext := processContextBuilder.WithStateMachineContextVariables(contextMap).Build()

if stateMachineInstance.StateMachine().IsPersist() && p.StateMachineConfig.StateLogStore() != nil {
err := p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, stateMachineInstance, processContext)
if err != nil {
return nil, err
}
}

if stateMachineInstance.ID() == "" {
stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(SeqEntityStateMachineInst, ""))
}

var eventPublisher events.EventPublisher
if async {
eventPublisher = p.StateMachineConfig.AsyncEventPublisher()
} else {
eventPublisher = p.StateMachineConfig.EventPublisher()
}

_, err = eventPublisher.PushEvent(ctx, processContext)
if err != nil {
return nil, err
}

return stateMachineInstance, nil
}

// copyMap not deep copy, so best practice: Don’t pass by reference
func (p ProcessCtrlStateMachineEngine) copyMap(startParams map[string]interface{}) map[string]interface{} {
copyMap := make(map[string]interface{}, len(startParams))
for k, v := range startParams {
copyMap[k] = v
}
return copyMap
}

func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) {
stateMachine, err := p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName, tenantId)
if err != nil {
return nil, err
}

if stateMachine == nil {
return nil, errors.New("StateMachine [" + stateMachineName + "] is not exists")
}

stateMachineInstance := statelang.NewStateMachineInstanceImpl()
stateMachineInstance.SetStateMachine(stateMachine)
stateMachineInstance.SetTenantID(tenantId)
stateMachineInstance.SetBusinessKey(businessKey)
stateMachineInstance.SetStartParams(startParams)
if startParams != nil {
if businessKey != "" {
startParams[VarNameBusinesskey] = businessKey
}

if startParams[VarNameParentId] != nil {
parentId, ok := startParams[VarNameParentId].(string)
if !ok {

}
stateMachineInstance.SetParentID(parentId)
delete(startParams, VarNameParentId)
}
}

stateMachineInstance.SetStatus(statelang.RU)
stateMachineInstance.SetRunning(true)

now := time.Now()
stateMachineInstance.SetStartedTime(now)
stateMachineInstance.SetUpdatedTime(now)
return stateMachineInstance, nil
}

func NewProcessCtrlStateMachineEngine(stateMachineConfig StateMachineConfig) *ProcessCtrlStateMachineEngine {
return &ProcessCtrlStateMachineEngine{StateMachineConfig: stateMachineConfig}
}

+ 5
- 0
pkg/saga/statemachine/engine/sequence/sequence.go View File

@@ -0,0 +1,5 @@
package sequence

type SeqGenerator interface {
GenerateId(entity string, ruleName string) string
}

+ 14
- 0
pkg/saga/statemachine/engine/sequence/uuid.go View File

@@ -0,0 +1,14 @@
package sequence

import "github.com/google/uuid"

type UUIDSeqGenerator struct {
}

func NewUUIDSeqGenerator() *UUIDSeqGenerator {
return &UUIDSeqGenerator{}
}

func (U UUIDSeqGenerator) GenerateId(entity string, ruleName string) string {
return uuid.New().String()
}

+ 44
- 0
pkg/saga/statemachine/engine/statemachine_config.go View File

@@ -0,0 +1,44 @@
package engine

import (
"github.com/seata/seata-go/pkg/saga/statemachine/engine/events"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/expr"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/store"
)

type StateMachineConfig interface {
StateLogRepository() store.StateLogRepository

StateMachineRepository() store.StateMachineRepository

StateLogStore() store.StateLogStore

StateLangStore() store.StateLangStore

ExpressionFactoryManager() expr.ExpressionFactoryManager

ExpressionResolver() expr.ExpressionResolver

SeqGenerator() sequence.SeqGenerator

StatusDecisionStrategy() status_decision.StatusDecisionStrategy

EventPublisher() events.EventPublisher

AsyncEventPublisher() events.EventPublisher

ServiceInvokerManager() invoker.ServiceInvokerManager

ScriptInvokerManager() invoker.ScriptInvokerManager

CharSet() string

DefaultTenantId() string

TransOperationTimeout() int

ServiceInvokeTimeout() int
}

+ 16
- 0
pkg/saga/statemachine/engine/statemachine_engine.go View File

@@ -0,0 +1,16 @@
package engine

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type StateMachineEngine interface {
Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error)
}

type CallBack interface {
OnFinished(ctx context.Context, context process_ctrl.ProcessContext, stateMachineInstance statelang.StateMachineInstance)
OnError(ctx context.Context, context process_ctrl.ProcessContext, stateMachineInstance statelang.StateMachineInstance, err error)
}

+ 4
- 0
pkg/saga/statemachine/engine/status_decision/status_decision.go View File

@@ -0,0 +1,4 @@
package status_decision

type StatusDecisionStrategy interface {
}

+ 60
- 0
pkg/saga/statemachine/engine/store/statemachine_store.go View File

@@ -0,0 +1,60 @@
package store

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"io"
)

type StateLogRepository interface {
GetStateMachineInstance(stateMachineInstanceId string) (statelang.StateInstance, error)

GetStateMachineInstanceByBusinessKey(businessKey string, tenantId string) (statelang.StateInstance, error)

GetStateMachineInstanceByParentId(parentId string) ([]statelang.StateMachineInstance, error)

GetStateInstance(stateInstanceId string, stateMachineInstanceId string) (statelang.StateInstance, error)

GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string) ([]statelang.StateInstance, error)
}

type StateLogStore interface {
RecordStateMachineStarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error

RecordStateMachineFinished(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error

RecordStateMachineRestarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error

RecordStateStarted(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error

RecordStateFinished(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error

GetStateMachineInstance(stateMachineInstanceId string) (statelang.StateInstance, error)

GetStateMachineInstanceByBusinessKey(businessKey string, tenantId string) (statelang.StateInstance, error)

GetStateMachineInstanceByParentId(parentId string) ([]statelang.StateMachineInstance, error)

GetStateInstance(stateInstanceId string, stateMachineInstanceId string) (statelang.StateInstance, error)

GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string) ([]statelang.StateInstance, error)
}

type StateMachineRepository interface {
GetStateMachineById(stateMachineId string) (statelang.StateMachine, error)

GetLastVersionStateMachine(stateMachineName string, tenantId string) (statelang.StateMachine, error)

RegistryStateMachine(statelang.StateMachine) error

RegistryStateMachineByReader(reader io.Reader) error
}

type StateLangStore interface {
GetStateMachineById(stateMachineId string) (statelang.StateMachine, error)

GetLastVersionStateMachine(stateMachineName string, tenantId string) (statelang.StateMachine, error)

StoreStateMachine(stateMachine statelang.StateMachine) error
}

+ 72
- 0
pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go View File

@@ -0,0 +1,72 @@
package parser

import (
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/engine"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
)

type ChoiceStateParser struct {
BaseStateParser
}

func NewChoiceStateParser() *ChoiceStateParser {
return &ChoiceStateParser{}
}

func (c ChoiceStateParser) StateType() string {
return engine.StateTypeChoice
}

func (c ChoiceStateParser) Parse(stateName string, stateMap map[string]interface{}) (statelang.State, error) {
choiceState := state.NewChoiceStateImpl()
choiceState.SetName(stateName)

//parse Type
typeName, err := c.GetString(stateName, stateMap, "Type")
if err != nil {
return nil, err
}
choiceState.SetType(typeName)

//parse Default
defaultChoice, err := c.GetString(stateName, stateMap, "Default")
if err != nil {
return nil, err
}
choiceState.SetDefault(defaultChoice)

//parse Choices
slice, err := c.GetSlice(stateName, stateMap, "Choices")
if err != nil {
return nil, err
}

var choices []state.Choice
for i := range slice {
choiceValMap, ok := slice[i].(map[string]interface{})
if !ok {
return nil, errors.New(fmt.Sprintf("State [%s] Choices element required struct", stateName))
}

choice := state.NewChoiceImpl()
expression, err := c.GetString(stateName, choiceValMap, "Expression")
if err != nil {
return nil, err
}
choice.SetExpression(expression)

next, err := c.GetString(stateName, choiceValMap, "Next")
if err != nil {
return nil, err
}
choice.SetNext(next)

choices = append(choices, choice)
}
choiceState.SetChoices(choices)

return choiceState, nil
}

+ 100
- 0
pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go View File

@@ -0,0 +1,100 @@
package parser

import (
"encoding/json"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type JSONStateMachineParser struct {
}

func NewJSONStateMachineParser() *JSONStateMachineParser {
return &JSONStateMachineParser{}
}

func (stateMachineParser JSONStateMachineParser) GetType() string {
return "JSON"
}

func (stateMachineParser JSONStateMachineParser) Parse(content string) (statelang.StateMachine, error) {
var stateMachineJsonObject StateMachineJsonObject

err := json.Unmarshal([]byte(content), &stateMachineJsonObject)
if err != nil {
return nil, err
}

stateMachine := statelang.NewStateMachineImpl()
stateMachine.SetName(stateMachineJsonObject.Name)
stateMachine.SetComment(stateMachineJsonObject.Comment)
stateMachine.SetVersion(stateMachineJsonObject.Version)
stateMachine.SetStartState(stateMachineJsonObject.StartState)
stateMachine.SetPersist(stateMachineJsonObject.Persist)

if stateMachineJsonObject.Type != "" {
stateMachine.SetType(stateMachineJsonObject.Type)
}

if stateMachineJsonObject.RecoverStrategy != "" {
recoverStrategy, ok := statelang.ValueOfRecoverStrategy(stateMachineJsonObject.RecoverStrategy)
if !ok {
return nil, errors.New("Not support " + stateMachineJsonObject.RecoverStrategy)
}
stateMachine.SetRecoverStrategy(recoverStrategy)
}

stateParserFactory := NewDefaultStateParserFactory()
stateParserFactory.InitDefaultStateParser()
for stateName, v := range stateMachineJsonObject.States {
stateMap, ok := v.(map[string]interface{})
if !ok {
return nil, errors.New("State [" + stateName + "] scheme illegal, required map")
}

stateType, ok := stateMap["Type"].(string)
if !ok {
return nil, errors.New("State [" + stateName + "] Type illegal, required string")
}

//stateMap
stateParser := stateParserFactory.GetStateParser(stateType)
if stateParser == nil {
return nil, errors.New("State Type [" + stateType + "] is not support")
}

_, stateExist := stateMachine.States()[stateName]
if stateExist {
return nil, errors.New("State [name:" + stateName + "] already exists")
}

state, err := stateParser.Parse(stateName, stateMap)
if err != nil {
return nil, err
}

state.SetStateMachine(stateMachine)
stateMachine.States()[stateName] = state
}

//TODO setCompensateState
//for stateName, state := range stateMachine.GetStates() {
//
//}
//

return stateMachine, nil
}

type StateMachineJsonObject struct {
Name string `json:"Name"`
Comment string `json:"Comment"`
Version string `json:"Version"`
StartState string `json:"StartState"`
RecoverStrategy string `json:"RecoverStrategy"`
Persist bool `json:"IsPersist"`
RetryPersistModeUpdate bool `json:"IsRetryPersistModeUpdate"`
CompensatePersistModeUpdate bool `json:"IsCompensatePersistModeUpdate"`
Type string `json:"Type"`
States map[string]interface{} `json:"States"`
}

+ 13
- 0
pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go View File

@@ -0,0 +1,13 @@
package parser

import (
"testing"
)

func TestParseChoice(t *testing.T) {
var content = "{\n \"Name\":\"ChoiceTest\",\n \"Comment\":\"ChoiceTest\",\n \"StartState\":\"ChoiceState\",\n \"Version\":\"0.0.1\",\n \"States\":{\n \"ChoiceState\":{\n \"Type\":\"Choice\",\n \"Choices\":[\n {\n \"Expression\":\"[a] == 1\",\n \"Next\":\"SecondState\"\n },\n {\n \"Expression\":\"[a] == 2\",\n \"Next\":\"ThirdState\"\n }\n ],\n \"Default\":\"Fail\"\n }\n }\n}"
_, err := NewJSONStateMachineParser().Parse(content)
if err != nil {
t.Error("parse fail: " + err.Error())
}
}

+ 104
- 0
pkg/saga/statemachine/statelang/parser/statemachine_parser.go View File

@@ -0,0 +1,104 @@
package parser

import (
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
"sync"
)

type StateMachineParser interface {
GetType() string
Parse(content string) (statelang.StateMachine, error)
}

type StateParser interface {
StateType() string
Parse(stateName string, stateMap map[string]interface{}) (statelang.State, error)
}

type BaseStateParser struct {
}

func (b BaseStateParser) ParseBaseAttributes(stateName string, state statelang.State, stateMap map[string]interface{}) error {
state.SetName(stateName)

comment, err := b.GetString(stateName, stateMap, "Comment")
if err != nil {
return err
}
state.SetComment(comment)

next, err := b.GetString(stateName, stateMap, "Next")
if err != nil {
return err
}

state.SetNext(next)
return nil
}

func (b BaseStateParser) GetString(stateName string, stateMap map[string]interface{}, key string) (string, error) {
value := stateMap[key]
if value == nil {
var result string
return result, errors.New("State [" + stateName + "] " + key + " not exist")
}

valueAsString, ok := value.(string)
if !ok {
var s string
return s, errors.New("State [" + stateName + "] " + key + " illegal, required string")
}
return valueAsString, nil
}

func (b BaseStateParser) GetSlice(stateName string, stateMap map[string]interface{}, key string) ([]interface{}, error) {
value := stateMap[key]

if value == nil {
var result []interface{}
return result, errors.New("State [" + stateName + "] " + key + " not exist")
}

valueAsSlice, ok := value.([]interface{})
if !ok {
var result []interface{}
return result, errors.New("State [" + stateName + "] " + key + " illegal, required slice")
}
return valueAsSlice, nil
}

type StateParserFactory interface {
RegistryStateParser(stateType string, stateParser StateParser)

GetStateParser(stateType string) StateParser
}

type DefaultStateParserFactory struct {
stateParserMap map[string]StateParser
mutex sync.Mutex
}

func NewDefaultStateParserFactory() *DefaultStateParserFactory {
var stateParserMap map[string]StateParser = make(map[string]StateParser)
return &DefaultStateParserFactory{
stateParserMap: stateParserMap,
}
}

// InitDefaultStateParser init StateParser by default
func (d *DefaultStateParserFactory) InitDefaultStateParser() {
choiceStateParser := NewChoiceStateParser()

d.RegistryStateParser(choiceStateParser.StateType(), choiceStateParser)
}

func (d *DefaultStateParserFactory) RegistryStateParser(stateType string, stateParser StateParser) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.stateParserMap[stateType] = stateParser
}

func (d *DefaultStateParserFactory) GetStateParser(stateType string) StateParser {
return d.stateParserMap[stateType]
}

+ 71
- 0
pkg/saga/statemachine/statelang/state.go View File

@@ -0,0 +1,71 @@
package statelang

type State interface {
Name() string

SetName(name string)

Comment() string

SetComment(comment string)

Type() string

SetType(typeName string)

Next() string

SetNext(next string)

StateMachine() StateMachine

SetStateMachine(machine StateMachine)
}

type BaseState struct {
name string `alias:"Name"`
comment string `alias:"Comment"`
typeName string `alias:"Type"`
next string `alias:"Next"`
stateMachine StateMachine
}

func (b *BaseState) Name() string {
return b.name
}

func (b *BaseState) SetName(name string) {
b.name = name
}

func (b *BaseState) Comment() string {
return b.comment
}

func (b *BaseState) SetComment(comment string) {
b.comment = comment
}

func (b *BaseState) Type() string {
return b.typeName
}

func (b *BaseState) SetType(typeName string) {
b.typeName = typeName
}

func (b *BaseState) Next() string {
return b.next
}

func (b *BaseState) SetNext(next string) {
b.next = next
}

func (b *BaseState) StateMachine() StateMachine {
return b.stateMachine
}

func (b *BaseState) SetStateMachine(machine StateMachine) {
b.stateMachine = machine
}

+ 74
- 0
pkg/saga/statemachine/statelang/state/choice_state.go View File

@@ -0,0 +1,74 @@
package state

import "github.com/seata/seata-go/pkg/saga/statemachine/statelang"

type ChoiceState interface {
statelang.State

Choices() []Choice

Default() string
}

type Choice interface {
Expression() string

SetExpression(expression string)

Next() string

SetNext(next string)
}

type ChoiceStateImpl struct {
statelang.BaseState
defaultChoice string `alias:"Default"`
choices []Choice `alias:"Choices"`
}

func NewChoiceStateImpl() *ChoiceStateImpl {
return &ChoiceStateImpl{
choices: make([]Choice, 0),
}
}

func (choiceState *ChoiceStateImpl) Default() string {
return choiceState.defaultChoice
}

func (choiceState *ChoiceStateImpl) Choices() []Choice {
return choiceState.choices
}

func (choiceState *ChoiceStateImpl) SetDefault(defaultChoice string) {
choiceState.defaultChoice = defaultChoice
}

func (choiceState *ChoiceStateImpl) SetChoices(choices []Choice) {
choiceState.choices = choices
}

type ChoiceImpl struct {
expression string
next string
}

func NewChoiceImpl() *ChoiceImpl {
return &ChoiceImpl{}
}

func (c *ChoiceImpl) Expression() string {
return c.expression
}

func (c *ChoiceImpl) SetExpression(expression string) {
c.expression = expression
}

func (c *ChoiceImpl) Next() string {
return c.next
}

func (c *ChoiceImpl) SetNext(next string) {
c.next = next
}

+ 30
- 0
pkg/saga/statemachine/statelang/state/task_state.go View File

@@ -0,0 +1,30 @@
package state

import (
"github.com/seata/seata-go/pkg/saga/statemachine/statelang"
)

type TaskState interface {
statelang.State

CompensateState() string

Status() map[string]string

Retry() []Retry
}

type Retry interface {
ErrorTypeNames() []string

IntervalSecond() float64

MaxAttempt() int

BackoffRate() float64
}

type ServiceTaskState interface {
TaskState
//TODO add serviceTask
}

+ 330
- 0
pkg/saga/statemachine/statelang/state_instance.go View File

@@ -0,0 +1,330 @@
package statelang

import "time"

type StateInstance interface {
ID() string

SetID(id string)

Name() string

SetName(name string)

Type() string

SetType(typeName string)

ServiceName() string

SetServiceName(serviceName string)

ServiceMethod() string

SetServiceMethod(serviceMethod string)

ServiceType() string

SetServiceType(serviceType string)

BusinessKey() string

SetBusinessKey(businessKey string)

StartedTime() time.Time

SetStartedTime(startedTime time.Time)

UpdatedTime() time.Time

SetUpdatedTime(updateTime time.Time)

EndTime() time.Time

SetEndTime(endTime time.Time)

IsForUpdate() bool

SetForUpdate(forUpdate bool)

Error() error

SetError(err error)

InputParams() interface{}

SetInputParams(inputParams interface{})

OutputParams() interface{}

SetOutputParams(outputParams interface{})

Status() ExecutionStatus

SetStatus(status ExecutionStatus)

StateIDCompensatedFor() string

SetStateIDCompensatedFor(stateIdCompensatedFor string)

StateIDRetriedFor() string

SetStateIDRetriedFor(stateIdRetriedFor string)

CompensationState() StateInstance

SetCompensationState(compensationState StateInstance)

StateMachineInstance() StateMachineInstance

SetStateMachineInstance(stateMachineInstance StateMachineInstance)

IsIgnoreStatus() bool

SetIgnoreStatus(ignoreStatus bool)

IsForCompensation() bool

SerializedInputParams() interface{}

SetSerializedInputParams(serializedInputParams interface{})

SerializedOutputParams() interface{}

SetSerializedOutputParams(serializedOutputParams interface{})

SerializedError() interface{}

SetSerializedError(serializedErr interface{})

CompensationStatus() ExecutionStatus
}

type StateInstanceImpl struct {
id string
machineInstanceId string
name string
typeName string
serviceName string
serviceMethod string
serviceType string
businessKey string
startedTime time.Time
updatedTime time.Time
endTime time.Time
isForUpdate bool
err error
serializedErr interface{}
inputParams interface{}
serializedInputParams interface{}
outputParams interface{}
serializedOutputParams interface{}
status ExecutionStatus
stateIdCompensatedFor string
stateIdRetriedFor string
compensationState StateInstance
stateMachineInstance StateMachineInstance
ignoreStatus bool
}

func NewStateInstanceImpl() *StateInstanceImpl {
return &StateInstanceImpl{}
}

func (s *StateInstanceImpl) ID() string {
return s.id
}

func (s *StateInstanceImpl) SetID(id string) {
s.id = id
}

func (s *StateInstanceImpl) Name() string {
return s.name
}

func (s *StateInstanceImpl) SetName(name string) {
s.name = name
}

func (s *StateInstanceImpl) Type() string {
return s.typeName
}

func (s *StateInstanceImpl) SetType(typeName string) {
s.typeName = typeName
}

func (s *StateInstanceImpl) ServiceName() string {
return s.serviceName
}

func (s *StateInstanceImpl) SetServiceName(serviceName string) {
s.serviceName = serviceName
}

func (s *StateInstanceImpl) ServiceMethod() string {
return s.serviceMethod
}

func (s *StateInstanceImpl) SetServiceMethod(serviceMethod string) {
s.serviceMethod = serviceMethod
}

func (s *StateInstanceImpl) ServiceType() string {
return s.serviceType
}

func (s *StateInstanceImpl) SetServiceType(serviceType string) {
s.serviceType = serviceType
}

func (s *StateInstanceImpl) BusinessKey() string {
return s.businessKey
}

func (s *StateInstanceImpl) SetBusinessKey(businessKey string) {
s.businessKey = businessKey
}

func (s *StateInstanceImpl) StartedTime() time.Time {
return s.startedTime
}

func (s *StateInstanceImpl) SetStartedTime(startedTime time.Time) {
s.startedTime = startedTime
}

func (s *StateInstanceImpl) UpdatedTime() time.Time {
return s.updatedTime
}

func (s *StateInstanceImpl) SetUpdatedTime(updatedTime time.Time) {
s.updatedTime = updatedTime
}

func (s *StateInstanceImpl) EndTime() time.Time {
return s.endTime
}

func (s *StateInstanceImpl) SetEndTime(endTime time.Time) {
s.endTime = endTime
}

func (s *StateInstanceImpl) IsForUpdate() bool {
return s.isForUpdate
}

func (s *StateInstanceImpl) SetForUpdate(forUpdate bool) {
s.isForUpdate = forUpdate
}

func (s *StateInstanceImpl) Error() error {
return s.err
}

func (s *StateInstanceImpl) SetError(err error) {
s.err = err
}

func (s *StateInstanceImpl) InputParams() interface{} {
return s.inputParams
}

func (s *StateInstanceImpl) SetInputParams(inputParams interface{}) {
s.inputParams = inputParams
}

func (s *StateInstanceImpl) OutputParams() interface{} {
return s.outputParams
}

func (s *StateInstanceImpl) SetOutputParams(outputParams interface{}) {
s.outputParams = outputParams
}

func (s *StateInstanceImpl) Status() ExecutionStatus {
return s.status
}

func (s *StateInstanceImpl) SetStatus(status ExecutionStatus) {
s.status = status
}

func (s *StateInstanceImpl) StateIDCompensatedFor() string {
return s.stateIdCompensatedFor
}

func (s *StateInstanceImpl) SetStateIDCompensatedFor(stateIdCompensatedFor string) {
s.stateIdCompensatedFor = stateIdCompensatedFor
}

func (s *StateInstanceImpl) StateIDRetriedFor() string {
return s.stateIdRetriedFor
}

func (s *StateInstanceImpl) SetStateIDRetriedFor(stateIdRetriedFor string) {
s.stateIdRetriedFor = stateIdRetriedFor
}

func (s *StateInstanceImpl) CompensationState() StateInstance {
return s.compensationState
}

func (s *StateInstanceImpl) SetCompensationState(compensationState StateInstance) {
s.compensationState = compensationState
}

func (s *StateInstanceImpl) StateMachineInstance() StateMachineInstance {
return s.stateMachineInstance
}

func (s *StateInstanceImpl) SetStateMachineInstance(stateMachineInstance StateMachineInstance) {
s.stateMachineInstance = stateMachineInstance
}

func (s *StateInstanceImpl) IsIgnoreStatus() bool {
return s.ignoreStatus
}

func (s *StateInstanceImpl) SetIgnoreStatus(ignoreStatus bool) {
s.ignoreStatus = ignoreStatus
}

func (s *StateInstanceImpl) IsForCompensation() bool {
return s.stateIdCompensatedFor == ""
}

func (s *StateInstanceImpl) SerializedInputParams() interface{} {
return s.serializedInputParams
}

func (s *StateInstanceImpl) SetSerializedInputParams(serializedInputParams interface{}) {
s.serializedInputParams = serializedInputParams
}

func (s *StateInstanceImpl) SerializedOutputParams() interface{} {
return s.serializedOutputParams
}

func (s *StateInstanceImpl) SetSerializedOutputParams(serializedOutputParams interface{}) {
s.serializedOutputParams = serializedOutputParams
}

func (s *StateInstanceImpl) SerializedError() interface{} {
return s.serializedErr
}

func (s *StateInstanceImpl) SetSerializedError(serializedErr interface{}) {
s.serializedErr = serializedErr
}

func (s *StateInstanceImpl) CompensationStatus() ExecutionStatus {
if s.compensationState != nil {
return s.compensationState.Status()
}

//return nil ExecutionStatus
var status ExecutionStatus
return status
}

+ 261
- 0
pkg/saga/statemachine/statelang/statemachine.go View File

@@ -0,0 +1,261 @@
package statelang

import (
"time"
)

type StateMachineStatus string

const (
Active StateMachineStatus = "Active"
Inactive StateMachineStatus = "Inactive"
)

// RecoverStrategy : Recover Strategy
type RecoverStrategy string

const (
//Compensate stateMachine
Compensate RecoverStrategy = "Compensate"
// Forward stateMachine
Forward RecoverStrategy = "Forward"
)

func ValueOfRecoverStrategy(recoverStrategy string) (RecoverStrategy, bool) {
switch recoverStrategy {
case "Compensate":
return Compensate, true
case "Forward":
return Forward, true
default:
var recoverStrategy RecoverStrategy
return recoverStrategy, false
}
}

type StateMachine interface {
ID() string

SetID(id string)

Name() string

SetName(name string)

Comment() string

SetComment(comment string)

StartState() string

SetStartState(startState string)

Version() string

SetVersion(version string)

States() map[string]State

State(stateName string) State

TenantId() string

SetTenantId(tenantId string)

AppName() string

SetAppName(appName string)

Type() string

SetType(typeName string)

Status() StateMachineStatus

SetStatus(status StateMachineStatus)

RecoverStrategy() RecoverStrategy

SetRecoverStrategy(recoverStrategy RecoverStrategy)

IsPersist() bool

SetPersist(persist bool)

IsRetryPersistModeUpdate() bool

SetRetryPersistModeUpdate(retryPersistModeUpdate bool)

IsCompensatePersistModeUpdate() bool

SetCompensatePersistModeUpdate(compensatePersistModeUpdate bool)

Content() string

SetContent(content string)

CreateTime() time.Time

SetCreateTime(createTime time.Time)
}

type StateMachineImpl struct {
id string
tenantId string
appName string
name string
comment string
version string
startState string
status StateMachineStatus
recoverStrategy RecoverStrategy
persist bool
retryPersistModeUpdate bool
compensatePersistModeUpdate bool
typeName string
content string
createTime time.Time
states map[string]State
}

func NewStateMachineImpl() *StateMachineImpl {
stateMap := make(map[string]State)
return &StateMachineImpl{
appName: "SEATA",
status: Active,
typeName: "STATE_LANG",
states: stateMap,
}
}

func (s *StateMachineImpl) ID() string {
return s.id
}

func (s *StateMachineImpl) SetID(id string) {
s.id = id
}

func (s *StateMachineImpl) Name() string {
return s.name
}

func (s *StateMachineImpl) SetName(name string) {
s.name = name
}

func (s *StateMachineImpl) SetComment(comment string) {
s.comment = comment
}

func (s *StateMachineImpl) Comment() string {
return s.comment
}

func (s *StateMachineImpl) StartState() string {
return s.startState
}

func (s *StateMachineImpl) SetStartState(startState string) {
s.startState = startState
}

func (s *StateMachineImpl) Version() string {
return s.version
}

func (s *StateMachineImpl) SetVersion(version string) {
s.version = version
}

func (s *StateMachineImpl) States() map[string]State {
return s.states
}

func (s *StateMachineImpl) State(stateName string) State {
if s.states == nil {
return nil
}

return s.states[stateName]
}

func (s *StateMachineImpl) TenantId() string {
return s.tenantId
}

func (s *StateMachineImpl) SetTenantId(tenantId string) {
s.tenantId = tenantId
}

func (s *StateMachineImpl) AppName() string {
return s.appName
}

func (s *StateMachineImpl) SetAppName(appName string) {
s.appName = appName
}

func (s *StateMachineImpl) Type() string {
return s.typeName
}

func (s *StateMachineImpl) SetType(typeName string) {
s.typeName = typeName
}

func (s *StateMachineImpl) Status() StateMachineStatus {
return s.status
}

func (s *StateMachineImpl) SetStatus(status StateMachineStatus) {
s.status = status
}

func (s *StateMachineImpl) RecoverStrategy() RecoverStrategy {
return s.recoverStrategy
}

func (s *StateMachineImpl) SetRecoverStrategy(recoverStrategy RecoverStrategy) {
s.recoverStrategy = recoverStrategy
}

func (s *StateMachineImpl) IsPersist() bool {
return s.persist
}

func (s *StateMachineImpl) SetPersist(persist bool) {
s.persist = persist
}

func (s *StateMachineImpl) IsRetryPersistModeUpdate() bool {
return s.retryPersistModeUpdate
}

func (s *StateMachineImpl) SetRetryPersistModeUpdate(retryPersistModeUpdate bool) {
s.retryPersistModeUpdate = retryPersistModeUpdate
}

func (s *StateMachineImpl) IsCompensatePersistModeUpdate() bool {
return s.compensatePersistModeUpdate
}

func (s *StateMachineImpl) SetCompensatePersistModeUpdate(compensatePersistModeUpdate bool) {
s.compensatePersistModeUpdate = compensatePersistModeUpdate
}

func (s *StateMachineImpl) Content() string {
return s.content
}

func (s *StateMachineImpl) SetContent(content string) {
s.content = content
}

func (s *StateMachineImpl) CreateTime() time.Time {
return s.createTime
}

func (s *StateMachineImpl) SetCreateTime(createTime time.Time) {
s.createTime = createTime
}

+ 316
- 0
pkg/saga/statemachine/statelang/statemachine_instance.go View File

@@ -0,0 +1,316 @@
package statelang

import (
"sync"
"time"
)

type ExecutionStatus string

const (
// RU Running
RU ExecutionStatus = "RU"
// SU Succeed
SU ExecutionStatus = "SU"
// FA Failed
FA ExecutionStatus = "FA"
// UN Unknown
UN ExecutionStatus = "UN"
// SK Skipped
SK ExecutionStatus = "SK"
)

type StateMachineInstance interface {
ID() string

SetID(id string)

MachineID() string

SetMachineID(machineID string)

TenantID() string

SetTenantID(tenantID string)

ParentID() string

SetParentID(parentID string)

StartedTime() time.Time

SetStartedTime(startedTime time.Time)

EndTime() time.Time

SetEndTime(endTime time.Time)

StateList() []StateInstance

State(stateId string) StateInstance

PutState(stateId string, stateInstance StateInstance)

Status() ExecutionStatus

SetStatus(status ExecutionStatus)

CompensationStatus() ExecutionStatus

SetCompensationStatus(compensationStatus ExecutionStatus)

IsRunning() bool

SetRunning(isRunning bool)

UpdatedTime() time.Time

SetUpdatedTime(updatedTime time.Time)

BusinessKey() string

SetBusinessKey(businessKey string)

Error() error

SetError(err error)

StartParams() map[string]interface{}

SetStartParams(startParams map[string]interface{})

EndParams() map[string]interface{}

SetEndParams(endParams map[string]interface{})

PutContext(key string, value interface{})

SetContext(context map[string]interface{})

StateMachine() StateMachine

SetStateMachine(stateMachine StateMachine)

SerializedStartParams() interface{}

SetSerializedStartParams(serializedStartParams interface{})

SerializedEndParams() interface{}

SetSerializedEndParams(serializedEndParams interface{})

SerializedError() interface{}

SetSerializedError(serializedError interface{})
}

type StateMachineInstanceImpl struct {
id string
machineId string
tenantId string
parentId string
businessKey string
startParams map[string]interface{}
serializedStartParams interface{}
startedTime time.Time
endTime time.Time
updatedTime time.Time
err error
serializedError interface{}
endParams map[string]interface{}
serializedEndParams interface{}
status ExecutionStatus
compensationStatus ExecutionStatus
isRunning bool
context map[string]interface{}
stateMachine StateMachine
stateList []StateInstance
stateMap map[string]StateInstance

contextMutex sync.RWMutex // Mutex to protect concurrent access to context
stateMutex sync.RWMutex // Mutex to protect concurrent access to stateList and stateMap
}

func NewStateMachineInstanceImpl() *StateMachineInstanceImpl {
return &StateMachineInstanceImpl{
startParams: make(map[string]interface{}),
endParams: make(map[string]interface{}),
stateList: make([]StateInstance, 0),
stateMap: make(map[string]StateInstance)}
}

func (s *StateMachineInstanceImpl) ID() string {
return s.id
}

func (s *StateMachineInstanceImpl) SetID(id string) {
s.id = id
}

func (s *StateMachineInstanceImpl) MachineID() string {
return s.machineId
}

func (s *StateMachineInstanceImpl) SetMachineID(machineID string) {
s.machineId = machineID
}

func (s *StateMachineInstanceImpl) TenantID() string {
return s.tenantId
}

func (s *StateMachineInstanceImpl) SetTenantID(tenantID string) {
s.tenantId = tenantID
}

func (s *StateMachineInstanceImpl) ParentID() string {
return s.parentId
}

func (s *StateMachineInstanceImpl) SetParentID(parentID string) {
s.parentId = parentID
}

func (s *StateMachineInstanceImpl) StartedTime() time.Time {
return s.startedTime
}

func (s *StateMachineInstanceImpl) SetStartedTime(startedTime time.Time) {
s.startedTime = startedTime
}

func (s *StateMachineInstanceImpl) EndTime() time.Time {
return s.endTime
}

func (s *StateMachineInstanceImpl) SetEndTime(endTime time.Time) {
s.endTime = endTime
}

func (s *StateMachineInstanceImpl) StateList() []StateInstance {
return s.stateList
}

func (s *StateMachineInstanceImpl) State(stateId string) StateInstance {
s.stateMutex.RLock()
defer s.stateMutex.RUnlock()

return s.stateMap[stateId]
}

func (s *StateMachineInstanceImpl) PutState(stateId string, stateInstance StateInstance) {
s.stateMutex.Lock()
defer s.stateMutex.Unlock()

stateInstance.SetStateMachineInstance(s)
s.stateMap[stateId] = stateInstance
s.stateList = append(s.stateList, stateInstance)
}

func (s *StateMachineInstanceImpl) Status() ExecutionStatus {
return s.status
}

func (s *StateMachineInstanceImpl) SetStatus(status ExecutionStatus) {
s.status = status
}

func (s *StateMachineInstanceImpl) CompensationStatus() ExecutionStatus {
return s.compensationStatus
}

func (s *StateMachineInstanceImpl) SetCompensationStatus(compensationStatus ExecutionStatus) {
s.compensationStatus = compensationStatus
}

func (s *StateMachineInstanceImpl) IsRunning() bool {
return s.isRunning
}

func (s *StateMachineInstanceImpl) SetRunning(isRunning bool) {
s.isRunning = isRunning
}

func (s *StateMachineInstanceImpl) UpdatedTime() time.Time {
return s.updatedTime
}

func (s *StateMachineInstanceImpl) SetUpdatedTime(updatedTime time.Time) {
s.updatedTime = updatedTime
}

func (s *StateMachineInstanceImpl) BusinessKey() string {
return s.businessKey
}

func (s *StateMachineInstanceImpl) SetBusinessKey(businessKey string) {
s.businessKey = businessKey
}

func (s *StateMachineInstanceImpl) Error() error {
return s.err
}

func (s *StateMachineInstanceImpl) SetError(err error) {
s.err = err
}

func (s *StateMachineInstanceImpl) StartParams() map[string]interface{} {
return s.startParams
}

func (s *StateMachineInstanceImpl) SetStartParams(startParams map[string]interface{}) {
s.startParams = startParams
}

func (s *StateMachineInstanceImpl) EndParams() map[string]interface{} {
return s.endParams
}

func (s *StateMachineInstanceImpl) SetEndParams(endParams map[string]interface{}) {
s.endParams = endParams
}

func (s *StateMachineInstanceImpl) PutContext(key string, value interface{}) {
s.contextMutex.Lock()
defer s.contextMutex.Unlock()

s.context[key] = value
}

func (s *StateMachineInstanceImpl) SetContext(context map[string]interface{}) {
s.context = context
}

func (s *StateMachineInstanceImpl) StateMachine() StateMachine {
return s.stateMachine
}

func (s *StateMachineInstanceImpl) SetStateMachine(stateMachine StateMachine) {
s.stateMachine = stateMachine
s.machineId = stateMachine.ID()
}

func (s *StateMachineInstanceImpl) SerializedStartParams() interface{} {
return s.serializedStartParams
}

func (s *StateMachineInstanceImpl) SetSerializedStartParams(serializedStartParams interface{}) {
s.serializedStartParams = serializedStartParams
}

func (s *StateMachineInstanceImpl) SerializedEndParams() interface{} {
return s.endParams
}

func (s *StateMachineInstanceImpl) SetSerializedEndParams(serializedEndParams interface{}) {
s.serializedEndParams = serializedEndParams
}

func (s *StateMachineInstanceImpl) SerializedError() interface{} {
return s.serializedError
}

func (s *StateMachineInstanceImpl) SetSerializedError(serializedError interface{}) {
s.serializedError = serializedError
}

+ 147
- 0
pkg/util/reflectx/unmarkshaler.go View File

@@ -0,0 +1,147 @@
package reflectx

import (
"fmt"
"github.com/pkg/errors"
"reflect"
"unicode"
)

// MapToStruct some state can use this util to parse
// TODO 性能测试,性能差的话,直接去解析,不使用反射
func MapToStruct(stateName string, obj interface{}, stateMap map[string]interface{}) error {
objVal := reflect.ValueOf(obj)
if objVal.Kind() != reflect.Pointer {
return errors.New(fmt.Sprintf("State [%s] value required a pointer", stateName))
}

structValue := objVal.Elem()
if structValue.Kind() != reflect.Struct {
return errors.New(fmt.Sprintf("State [%s] value elem required a struct", stateName))
}

structType := structValue.Type()
for key, value := range stateMap {
//Get field, get alias first
field, found := getField(structType, key)
if !found {
continue
}

fieldVal := structValue.FieldByName(field.Name)
if !fieldVal.IsValid() {
return errors.New(fmt.Sprintf("State [%s] not support [%s] filed", stateName, key))
}

//Get setMethod
var setMethod reflect.Value
if !fieldVal.CanSet() {
setMethod = getFiledSetMethod(field.Name, objVal)

if !setMethod.IsValid() {
fieldAliasName := field.Tag.Get("alias")
setMethod = getFiledSetMethod(fieldAliasName, objVal)
}

if !setMethod.IsValid() {
return errors.New(fmt.Sprintf("State [%s] [%s] field not support setMethod", stateName, key))
}
setMethodType := setMethod.Type()
if !(setMethodType.NumIn() == 1 && setMethodType.In(0) == fieldVal.Type()) {
return errors.New(fmt.Sprintf("State [%s] [%s] field setMethod illegal", stateName, key))
}
}

val := reflect.ValueOf(value)
if fieldVal.Kind() == reflect.Struct {
//map[string]interface{}
if val.Kind() != reflect.Map {
return errors.New(fmt.Sprintf("State [%s] [%s] field type required map", stateName, key))
}

err := MapToStruct(stateName, fieldVal.Addr().Interface(), value.(map[string]interface{}))
if err != nil {
return err
}
} else if fieldVal.Kind() == reflect.Slice {
if val.Kind() != reflect.Slice {
return errors.New(fmt.Sprintf("State [%s] [%s] field type required slice", stateName, key))
}

sliceType := fieldVal.Type().Elem()
newSlice := reflect.MakeSlice(fieldVal.Type(), 0, val.Len())

for i := 0; i < val.Len(); i++ {
newElem := reflect.New(sliceType.Elem())
elemMap := val.Index(i).Interface().(map[string]interface{})
err := MapToStruct(stateName, newElem.Interface(), elemMap)
if err != nil {
return err
}
reflect.Append(newSlice, newElem.Elem())
}
setFiled(fieldVal, setMethod, newSlice)
} else if fieldVal.Kind() == reflect.Map {
if val.Kind() != reflect.Map {
return errors.New(fmt.Sprintf("State [%s] [%s] field type required map", stateName, key))
}

mapType := field.Type
newMap := reflect.MakeMap(mapType)

for _, key := range val.MapKeys() {
newVal := reflect.New(mapType.Elem().Elem())
elemMap := val.MapIndex(key).Interface().(map[string]interface{})
err := MapToStruct(stateName, newVal.Interface(), elemMap)
if err != nil {
return err
}
newMap.SetMapIndex(key, newVal.Elem())
}
setFiled(fieldVal, setMethod, newMap)
} else {
setFiled(fieldVal, setMethod, val)
}
}
return nil
}

func getField(t reflect.Type, name string) (reflect.StructField, bool) {
for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
tag, hasAliasTag := field.Tag.Lookup("alias")

if (hasAliasTag && tag == name) || (!hasAliasTag && field.Name == name) {
return field, true
}

if field.Anonymous {
embeddedField, ok := getField(field.Type, name)
if ok {
return embeddedField, true
}
}
}

return reflect.StructField{}, false
}

func getFiledSetMethod(name string, structValue reflect.Value) reflect.Value {
fieldNameSlice := []rune(name)
fieldNameSlice[0] = unicode.ToUpper(fieldNameSlice[0])

setMethodName := "Set" + string(fieldNameSlice)

setMethod := structValue.MethodByName(setMethodName)
return setMethod
}

func setFiled(fieldVal reflect.Value, setMethod reflect.Value, val reflect.Value) {
if !fieldVal.CanSet() {
setMethod.Call([]reflect.Value{
val,
})
} else {
fieldVal.Set(val)
}
}

Loading…
Cancel
Save