diff --git a/pkg/saga/readme.md b/pkg/saga/readme.md new file mode 100644 index 00000000..9aa593da --- /dev/null +++ b/pkg/saga/readme.md @@ -0,0 +1,31 @@ + +# seata saga + +未来计划有三种使用方式 + +- 基于状态机引擎的 json + link: statemachine_engine#Start +- stream builder + stateMachine.serviceTask().build().Start +- 二阶段方式saga,类似tcc使用 + +上面1、2是以来[statemachine](statemachine),状态机引擎实现的,3相对比较独立。 + + +状态机的实现在:saga-statemachine包中 +其中[statelang](statemachine%2Fstatelang)是状态机语言的解析,目前实现的是json解析方式,状态机语言可以参考: +https://seata.io/docs/user/mode/saga + +状态机json执行的入口类是:[statemachine_engine.go](statemachine%2Fengine%2Fstatemachine_engine.go) + +下面简单说下engine中各个包的作用: +events:saga的是基于事件处理的,其中是event、eventBus的实现 +expr:表达式声明、解析、执行 +invoker:声明了serviceInvoker、scriptInvoker等接口、task调用管理、执行都在这个包中,例如httpInvoker +process_ctrl:状态机处理流程:上下文、执行、事件流转 +sequence:分布式id +store:状态机存储接口、实现 +status_decision:状态机状态决策 + + + diff --git a/pkg/saga/statemachine/engine/contant.go b/pkg/saga/statemachine/engine/contant.go new file mode 100644 index 00000000..9527fc29 --- /dev/null +++ b/pkg/saga/statemachine/engine/contant.go @@ -0,0 +1,20 @@ +package engine + +const ( + VarNameProcessType string = "_ProcessType_" + VarNameOperationName string = "_operation_name_" + OperationNameStart string = "start" + VarNameAsyncCallback string = "_async_callback_" + VarNameStateMachineInst string = "_current_statemachine_instance_" + VarNameStateMachine string = "_current_statemachine_" + VarNameStateMachineEngine string = "_current_statemachine_engine_" + VarNameStateMachineConfig string = "_statemachine_config_" + VarNameStateMachineContext string = "context" + VarNameIsAsyncExecution string = "_is_async_execution_" + VarNameStateInst string = "_current_state_instance_" + SeqEntityStateMachineInst string = "STATE_MACHINE_INST" + VarNameBusinesskey string = "_business_key_" + VarNameParentId string = "_parent_id_" + StateTypeServiceTask string = "ServiceTask" + StateTypeChoice string = "Choice" +) diff --git a/pkg/saga/statemachine/engine/events/event.go b/pkg/saga/statemachine/engine/events/event.go new file mode 100644 index 00000000..8f142713 --- /dev/null +++ b/pkg/saga/statemachine/engine/events/event.go @@ -0,0 +1,4 @@ +package events + +type Event interface { +} diff --git a/pkg/saga/statemachine/engine/events/event_bus.go b/pkg/saga/statemachine/engine/events/event_bus.go new file mode 100644 index 00000000..77a1e616 --- /dev/null +++ b/pkg/saga/statemachine/engine/events/event_bus.go @@ -0,0 +1,49 @@ +package events + +import ( + "context" +) + +type EventBus interface { + Offer(ctx context.Context, event Event) (bool, error) + + RegisterEventConsumer(consumer EventConsumer) +} + +type BaseEventBus struct { + eventConsumerList []EventConsumer +} + +func (b *BaseEventBus) RegisterEventConsumer(consumer EventConsumer) { + if b.eventConsumerList == nil { + b.eventConsumerList = make([]EventConsumer, 0) + } + b.eventConsumerList = append(b.eventConsumerList, consumer) +} + +func (b *BaseEventBus) GetEventConsumerList(event Event) []EventConsumer { + var acceptedConsumerList = make([]EventConsumer, 0) + for i := range b.eventConsumerList { + eventConsumer := b.eventConsumerList[i] + if eventConsumer.Accept(event) { + acceptedConsumerList = append(acceptedConsumerList, eventConsumer) + } + } + return acceptedConsumerList +} + +type DirectEventBus struct { + BaseEventBus +} + +func (d DirectEventBus) Offer(ctx context.Context, event Event) (bool, error) { + eventConsumerList := d.GetEventConsumerList(event) + if len(eventConsumerList) == 0 { + //TODO logger + return false, nil + } + + //processContext, _ := event.(process_ctrl.ProcessContext) + + return true, nil +} diff --git a/pkg/saga/statemachine/engine/events/event_consumer.go b/pkg/saga/statemachine/engine/events/event_consumer.go new file mode 100644 index 00000000..426da15e --- /dev/null +++ b/pkg/saga/statemachine/engine/events/event_consumer.go @@ -0,0 +1,29 @@ +package events + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" +) + +type EventConsumer interface { + Accept(event Event) bool + + Process(ctx context.Context, event Event) error +} + +type ProcessCtrlEventConsumer struct { +} + +func (p ProcessCtrlEventConsumer) Accept(event Event) bool { + if event == nil { + return false + } + + _, ok := event.(process_ctrl.ProcessContext) + return ok +} + +func (p ProcessCtrlEventConsumer) Process(ctx context.Context, event Event) error { + //TODO implement me + panic("implement me") +} diff --git a/pkg/saga/statemachine/engine/events/event_publisher.go b/pkg/saga/statemachine/engine/events/event_publisher.go new file mode 100644 index 00000000..1fbfaec2 --- /dev/null +++ b/pkg/saga/statemachine/engine/events/event_publisher.go @@ -0,0 +1,19 @@ +package events + +import "context" + +type EventPublisher interface { + PushEvent(ctx context.Context, event Event) (bool, error) +} + +type ProcessCtrlEventPublisher struct { + eventBus EventBus +} + +func NewProcessCtrlEventPublisher(eventBus EventBus) *ProcessCtrlEventPublisher { + return &ProcessCtrlEventPublisher{eventBus: eventBus} +} + +func (p ProcessCtrlEventPublisher) PushEvent(ctx context.Context, event Event) (bool, error) { + return p.eventBus.Offer(ctx, event) +} diff --git a/pkg/saga/statemachine/engine/expr/expression.go b/pkg/saga/statemachine/engine/expr/expression.go new file mode 100644 index 00000000..9706cda5 --- /dev/null +++ b/pkg/saga/statemachine/engine/expr/expression.go @@ -0,0 +1,10 @@ +package expr + +type ExpressionResolver interface { +} + +type Expression interface { +} + +type ExpressionFactoryManager struct { +} diff --git a/pkg/saga/statemachine/engine/invoker/invoker.go b/pkg/saga/statemachine/engine/invoker/invoker.go new file mode 100644 index 00000000..8c2ec069 --- /dev/null +++ b/pkg/saga/statemachine/engine/invoker/invoker.go @@ -0,0 +1,14 @@ +package invoker + +type ScriptInvokerManager interface { +} + +type ScriptInvoker interface { +} + +type ServiceInvokerManager interface { +} + +type ServiceInvoker interface { + Invoke() +} diff --git a/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go b/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go new file mode 100644 index 00000000..198092a1 --- /dev/null +++ b/pkg/saga/statemachine/engine/process_ctrl/bussiness_processor.go @@ -0,0 +1,92 @@ +package process_ctrl + +import ( + "context" + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/engine" + "sync" +) + +type BusinessProcessor interface { + Process(ctx context.Context, processContext ProcessContext) error + + Route(ctx context.Context, processContext ProcessContext) error +} + +type DefaultBusinessProcessor struct { + processHandlers map[string]ProcessHandler + routerHandlers map[string]RouterHandler + mu sync.RWMutex +} + +func (d *DefaultBusinessProcessor) RegistryProcessHandler(processType ProcessType, processHandler ProcessHandler) { + d.mu.Lock() + defer d.mu.Unlock() + + d.processHandlers[string(processType)] = processHandler +} + +func (d *DefaultBusinessProcessor) RegistryRouterHandler(processType ProcessType, routerHandler RouterHandler) { + d.mu.Lock() + defer d.mu.Unlock() + + d.routerHandlers[string(processType)] = routerHandler +} + +func (d *DefaultBusinessProcessor) Process(ctx context.Context, processContext ProcessContext) error { + processType := d.matchProcessType(processContext) + + processHandler, err := d.getProcessHandler(processType) + if err != nil { + return err + } + + return processHandler.Process(ctx, processContext) +} + +func (d *DefaultBusinessProcessor) Route(ctx context.Context, processContext ProcessContext) error { + processType := d.matchProcessType(processContext) + + routerHandler, err := d.getRouterHandler(processType) + if err != nil { + return err + } + + return routerHandler.Route(ctx, processContext) +} + +func (d *DefaultBusinessProcessor) getProcessHandler(processType ProcessType) (ProcessHandler, error) { + d.mu.RLock() + defer d.mu.RUnlock() + processHandler, ok := d.processHandlers[string(processType)] + if !ok { + return nil, errors.New("Cannot find process handler by type " + string(processType)) + } + return processHandler, nil +} + +func (d *DefaultBusinessProcessor) getRouterHandler(processType ProcessType) (RouterHandler, error) { + d.mu.RLock() + defer d.mu.RUnlock() + routerHandler, ok := d.routerHandlers[string(processType)] + if !ok { + return nil, errors.New("Cannot find router handler by type " + string(processType)) + } + return routerHandler, nil +} + +func (d *DefaultBusinessProcessor) matchProcessType(processContext ProcessContext) ProcessType { + ok := processContext.HasVariable(engine.VarNameProcessType) + if ok { + return processContext.GetVariable(engine.VarNameProcessType).(ProcessType) + } + return StateLang +} + +type ProcessHandler interface { + Process(ctx context.Context, processContext ProcessContext) error +} + +type RouterHandler interface { + Route(ctx context.Context, processContext ProcessContext) error +} diff --git a/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go b/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go new file mode 100644 index 00000000..1537dc67 --- /dev/null +++ b/pkg/saga/statemachine/engine/process_ctrl/instruction/instruction.go @@ -0,0 +1,25 @@ +package instruction + +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" +) + +type Instruction interface { +} + +type StateInstruction struct { + StateName string + StateMachineName string + TenantId string + End bool +} + +func (s StateInstruction) GetState(context process_ctrl.ProcessContext) (statelang.State, error) { + //TODO implement me + panic("implement me") +} + +func NewStateInstruction(stateMachineName string, tenantId string) *StateInstruction { + return &StateInstruction{StateMachineName: stateMachineName, TenantId: tenantId} +} diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_context.go b/pkg/saga/statemachine/engine/process_ctrl/process_context.go new file mode 100644 index 00000000..6531b1af --- /dev/null +++ b/pkg/saga/statemachine/engine/process_ctrl/process_context.go @@ -0,0 +1,284 @@ +package process_ctrl + +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/engine" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "sync" +) + +type ProcessContext interface { + GetVariable(name string) interface{} + + SetVariable(name string, value interface{}) + + GetVariables() map[string]interface{} + + SetVariables(variables map[string]interface{}) + + RemoveVariable(name string) interface{} + + HasVariable(name string) bool + + GetInstruction() instruction.Instruction + + SetInstruction(instruction instruction.Instruction) +} + +type HierarchicalProcessContext interface { + ProcessContext + + GetVariableLocally(name string) interface{} + + SetVariableLocally(name string, value interface{}) + + GetVariablesLocally() map[string]interface{} + + SetVariablesLocally(variables map[string]interface{}) + + RemoveVariableLocally(name string) interface{} + + HasVariableLocally(name string) bool + + ClearLocally() +} + +type ProcessContextImpl struct { + parent ProcessContext + mu sync.RWMutex + mp map[string]interface{} + instruction instruction.Instruction +} + +func (p *ProcessContextImpl) GetVariable(name string) interface{} { + p.mu.RLock() + defer p.mu.RUnlock() + + value, ok := p.mp[name] + if ok { + return value + } + + if p.parent != nil { + return p.parent.GetVariable(name) + } + + return nil +} + +func (p *ProcessContextImpl) SetVariable(name string, value interface{}) { + p.mu.Lock() + defer p.mu.Unlock() + + _, ok := p.mp[name] + if ok { + p.mp[name] = value + } else { + if p.parent != nil { + p.parent.SetVariable(name, value) + } else { + p.mp[name] = value + } + } +} + +func (p *ProcessContextImpl) GetVariables() map[string]interface{} { + p.mu.RLock() + defer p.mu.RUnlock() + + newVariablesMap := make(map[string]interface{}) + if p.parent != nil { + variables := p.parent.GetVariables() + for k, v := range variables { + newVariablesMap[k] = v + } + } + + for k, v := range p.mp { + newVariablesMap[k] = v + } + + return newVariablesMap +} + +func (p *ProcessContextImpl) SetVariables(variables map[string]interface{}) { + for k, v := range variables { + p.SetVariable(k, v) + } +} + +func (p *ProcessContextImpl) RemoveVariable(name string) interface{} { + p.mu.Lock() + defer p.mu.Unlock() + + value, ok := p.mp[name] + if ok { + delete(p.mp, name) + return value + } + + if p.parent != nil { + return p.parent.RemoveVariable(name) + } + + return nil +} + +func (p *ProcessContextImpl) HasVariable(name string) bool { + p.mu.RLock() + defer p.mu.RUnlock() + + _, ok := p.mp[name] + if ok { + return true + } + + if p.parent != nil { + return p.parent.HasVariable(name) + } + + return false +} + +func (p *ProcessContextImpl) GetInstruction() instruction.Instruction { + return p.instruction +} + +func (p *ProcessContextImpl) SetInstruction(instruction instruction.Instruction) { + p.instruction = instruction +} + +func (p *ProcessContextImpl) GetVariableLocally(name string) interface{} { + p.mu.RLock() + defer p.mu.RUnlock() + + value, _ := p.mp[name] + return value +} + +func (p *ProcessContextImpl) SetVariableLocally(name string, value interface{}) { + p.mu.Lock() + defer p.mu.Unlock() + + p.mp[name] = value +} + +func (p *ProcessContextImpl) GetVariablesLocally() map[string]interface{} { + p.mu.RLock() + defer p.mu.RUnlock() + + newVariablesMap := make(map[string]interface{}, len(p.mp)) + for k, v := range p.mp { + newVariablesMap[k] = v + } + return newVariablesMap +} + +func (p *ProcessContextImpl) SetVariablesLocally(variables map[string]interface{}) { + for k, v := range variables { + p.SetVariableLocally(k, v) + } +} + +func (p *ProcessContextImpl) RemoveVariableLocally(name string) interface{} { + p.mu.Lock() + defer p.mu.Unlock() + + value, _ := p.mp[name] + delete(p.mp, name) + return value +} + +func (p *ProcessContextImpl) HasVariableLocally(name string) bool { + p.mu.RLock() + defer p.mu.RUnlock() + + _, ok := p.mp[name] + return ok +} + +func (p *ProcessContextImpl) ClearLocally() { + p.mu.Lock() + defer p.mu.Unlock() + + p.mp = map[string]interface{}{} +} + +// ProcessContextBuilder process_ctrl builder +type ProcessContextBuilder struct { + processContext ProcessContext +} + +func NewProcessContextBuilder() *ProcessContextBuilder { + processContextImpl := &ProcessContextImpl{} + return &ProcessContextBuilder{processContextImpl} +} + +func (p *ProcessContextBuilder) WithProcessType(processType ProcessType) *ProcessContextBuilder { + p.processContext.SetVariable(engine.VarNameProcessType, processType) + return p +} + +func (p *ProcessContextBuilder) WithOperationName(operationName string) *ProcessContextBuilder { + p.processContext.SetVariable(engine.VarNameOperationName, operationName) + return p +} + +func (p *ProcessContextBuilder) WithAsyncCallback(callBack engine.CallBack) *ProcessContextBuilder { + if callBack != nil { + p.processContext.SetVariable(engine.VarNameAsyncCallback, callBack) + } + + return p +} + +func (p *ProcessContextBuilder) WithInstruction(instruction instruction.Instruction) *ProcessContextBuilder { + if instruction != nil { + p.processContext.SetInstruction(instruction) + } + + return p +} + +func (p *ProcessContextBuilder) WithStateMachineInstance(stateMachineInstance statelang.StateMachineInstance) *ProcessContextBuilder { + if stateMachineInstance != nil { + p.processContext.SetVariable(engine.VarNameStateMachineInst, stateMachineInstance) + p.processContext.SetVariable(engine.VarNameStateMachine, stateMachineInstance.StateMachine()) + } + + return p +} + +func (p *ProcessContextBuilder) WithStateMachineEngine(stateMachineEngine engine.StateMachineEngine) *ProcessContextBuilder { + if stateMachineEngine != nil { + p.processContext.SetVariable(engine.VarNameStateMachineEngine, stateMachineEngine) + } + + return p +} + +func (p *ProcessContextBuilder) WithStateMachineConfig(stateMachineConfig engine.StateMachineConfig) *ProcessContextBuilder { + if stateMachineConfig != nil { + p.processContext.SetVariable(engine.VarNameStateMachineConfig, stateMachineConfig) + } + + return p +} + +func (p *ProcessContextBuilder) WithStateMachineContextVariables(contextMap map[string]interface{}) *ProcessContextBuilder { + if contextMap != nil { + p.processContext.SetVariable(engine.VarNameStateMachineContext, contextMap) + } + + return p +} + +func (p *ProcessContextBuilder) WithIsAsyncExecution(async bool) *ProcessContextBuilder { + p.processContext.SetVariable(engine.VarNameIsAsyncExecution, async) + + return p +} + +func (p *ProcessContextBuilder) Build() ProcessContext { + return p.processContext +} diff --git a/pkg/saga/statemachine/engine/process_ctrl/process_type.go b/pkg/saga/statemachine/engine/process_ctrl/process_type.go new file mode 100644 index 00000000..14027a28 --- /dev/null +++ b/pkg/saga/statemachine/engine/process_ctrl/process_type.go @@ -0,0 +1,7 @@ +package process_ctrl + +type ProcessType string + +const ( + StateLang ProcessType = "STATE_LANG" // SEATA State Language +) diff --git a/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go b/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go new file mode 100644 index 00000000..ce3479c3 --- /dev/null +++ b/pkg/saga/statemachine/engine/process_ctrl/statemachine_processor.go @@ -0,0 +1,139 @@ +package process_ctrl + +import ( + "context" + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction" + "sync" +) + +type StateHandler interface { + State() string + ProcessHandler +} + +type StateRouter interface { + State() string + RouterHandler +} + +type InterceptAbleStateHandler interface { + StateHandler + StateHandlerInterceptorList() []StateHandlerInterceptor + RegistryStateHandlerInterceptor(stateHandlerInterceptor StateHandlerInterceptor) +} + +type StateHandlerInterceptor interface { + PreProcess(ctx context.Context, processContext ProcessContext) error + PostProcess(ctx context.Context, processContext ProcessContext) error +} + +type StateMachineProcessHandler struct { + mp map[string]StateHandler + mu sync.RWMutex +} + +func NewStateMachineProcessHandler() *StateMachineProcessHandler { + return &StateMachineProcessHandler{ + mp: make(map[string]StateHandler), + } +} + +func (s *StateMachineProcessHandler) Process(ctx context.Context, processContext ProcessContext) error { + stateInstruction, _ := processContext.GetInstruction().(instruction.StateInstruction) + + state, err := stateInstruction.GetState(processContext) + if err != nil { + return err + } + + stateType := state.Type() + stateHandler := s.GetStateHandler(stateType) + if stateHandler == nil { + return errors.New("Not support [" + stateType + "] state handler") + } + + interceptAbleStateHandler, ok := stateHandler.(InterceptAbleStateHandler) + + var stateHandlerInterceptorList []StateHandlerInterceptor + if ok { + stateHandlerInterceptorList = interceptAbleStateHandler.StateHandlerInterceptorList() + } + + if stateHandlerInterceptorList != nil && len(stateHandlerInterceptorList) > 0 { + for _, stateHandlerInterceptor := range stateHandlerInterceptorList { + err = stateHandlerInterceptor.PreProcess(ctx, processContext) + if err != nil { + return err + } + } + } + + err = stateHandler.Process(ctx, processContext) + if err != nil { + return err + } + + if stateHandlerInterceptorList != nil && len(stateHandlerInterceptorList) > 0 { + for _, stateHandlerInterceptor := range stateHandlerInterceptorList { + err = stateHandlerInterceptor.PostProcess(ctx, processContext) + if err != nil { + return err + } + } + } + + return nil +} + +func (s *StateMachineProcessHandler) GetStateHandler(stateType string) StateHandler { + s.mu.RLock() + defer s.mu.RUnlock() + return s.mp[stateType] +} + +func (s *StateMachineProcessHandler) RegistryStateHandler(stateType string, stateHandler StateHandler) { + s.mu.Lock() + defer s.mu.Unlock() + if s.mp == nil { + s.mp = make(map[string]StateHandler) + } + s.mp[stateType] = stateHandler +} + +type StateMachineRouterHandler struct { + mu sync.RWMutex + mp map[string]StateRouter +} + +func (s *StateMachineRouterHandler) Route(ctx context.Context, processContext ProcessContext) error { + stateInstruction, _ := processContext.GetInstruction().(instruction.StateInstruction) + + state, err := stateInstruction.GetState(processContext) + if err != nil { + return err + } + + stateType := state.Type() + stateRouter := s.GetStateRouter(stateType) + if stateRouter == nil { + return errors.New("Not support [" + stateType + "] state router") + } + + return stateRouter.Route(ctx, processContext) +} + +func (s *StateMachineRouterHandler) GetStateRouter(stateType string) StateRouter { + s.mu.RLock() + defer s.mu.RUnlock() + return s.mp[stateType] +} + +func (s *StateMachineRouterHandler) RegistryStateRouter(stateType string, stateRouter StateRouter) { + s.mu.Lock() + defer s.mu.Unlock() + if s.mp == nil { + s.mp = make(map[string]StateRouter) + } + s.mp[stateType] = stateRouter +} diff --git a/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go new file mode 100644 index 00000000..4c115df1 --- /dev/null +++ b/pkg/saga/statemachine/engine/process_ctrl_statemachine_engine.go @@ -0,0 +1,124 @@ +package engine + +import ( + "context" + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/events" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl/instruction" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "time" +) + +type ProcessCtrlStateMachineEngine struct { + StateMachineConfig StateMachineConfig +} + +func (p ProcessCtrlStateMachineEngine) Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) { + return p.startInternal(ctx, stateMachineName, tenantId, "", startParams, false, nil) +} + +func (p ProcessCtrlStateMachineEngine) startInternal(ctx context.Context, stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}, async bool, callback CallBack) (statelang.StateMachineInstance, error) { + if tenantId == "" { + tenantId = p.StateMachineConfig.DefaultTenantId() + } + + stateMachineInstance, err := p.createMachineInstance(stateMachineName, tenantId, businessKey, startParams) + if err != nil { + return nil, err + } + + // Build the process_ctrl context. + processContextBuilder := process_ctrl.NewProcessContextBuilder(). + WithProcessType(process_ctrl.StateLang). + WithOperationName(OperationNameStart). + WithAsyncCallback(callback). + WithInstruction(instruction.NewStateInstruction(stateMachineName, tenantId)). + WithStateMachineInstance(stateMachineInstance). + WithStateMachineConfig(p.StateMachineConfig). + WithStateMachineEngine(p). + WithIsAsyncExecution(async) + + contextMap := p.copyMap(startParams) + + stateMachineInstance.SetContext(contextMap) + + processContext := processContextBuilder.WithStateMachineContextVariables(contextMap).Build() + + if stateMachineInstance.StateMachine().IsPersist() && p.StateMachineConfig.StateLogStore() != nil { + err := p.StateMachineConfig.StateLogStore().RecordStateMachineStarted(ctx, stateMachineInstance, processContext) + if err != nil { + return nil, err + } + } + + if stateMachineInstance.ID() == "" { + stateMachineInstance.SetID(p.StateMachineConfig.SeqGenerator().GenerateId(SeqEntityStateMachineInst, "")) + } + + var eventPublisher events.EventPublisher + if async { + eventPublisher = p.StateMachineConfig.AsyncEventPublisher() + } else { + eventPublisher = p.StateMachineConfig.EventPublisher() + } + + _, err = eventPublisher.PushEvent(ctx, processContext) + if err != nil { + return nil, err + } + + return stateMachineInstance, nil +} + +// copyMap not deep copy, so best practice: Don’t pass by reference +func (p ProcessCtrlStateMachineEngine) copyMap(startParams map[string]interface{}) map[string]interface{} { + copyMap := make(map[string]interface{}, len(startParams)) + for k, v := range startParams { + copyMap[k] = v + } + return copyMap +} + +func (p ProcessCtrlStateMachineEngine) createMachineInstance(stateMachineName string, tenantId string, businessKey string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) { + stateMachine, err := p.StateMachineConfig.StateMachineRepository().GetLastVersionStateMachine(stateMachineName, tenantId) + if err != nil { + return nil, err + } + + if stateMachine == nil { + return nil, errors.New("StateMachine [" + stateMachineName + "] is not exists") + } + + stateMachineInstance := statelang.NewStateMachineInstanceImpl() + stateMachineInstance.SetStateMachine(stateMachine) + stateMachineInstance.SetTenantID(tenantId) + stateMachineInstance.SetBusinessKey(businessKey) + stateMachineInstance.SetStartParams(startParams) + if startParams != nil { + if businessKey != "" { + startParams[VarNameBusinesskey] = businessKey + } + + if startParams[VarNameParentId] != nil { + parentId, ok := startParams[VarNameParentId].(string) + if !ok { + + } + stateMachineInstance.SetParentID(parentId) + delete(startParams, VarNameParentId) + } + } + + stateMachineInstance.SetStatus(statelang.RU) + stateMachineInstance.SetRunning(true) + + now := time.Now() + stateMachineInstance.SetStartedTime(now) + stateMachineInstance.SetUpdatedTime(now) + return stateMachineInstance, nil +} + +func NewProcessCtrlStateMachineEngine(stateMachineConfig StateMachineConfig) *ProcessCtrlStateMachineEngine { + return &ProcessCtrlStateMachineEngine{StateMachineConfig: stateMachineConfig} +} diff --git a/pkg/saga/statemachine/engine/sequence/sequence.go b/pkg/saga/statemachine/engine/sequence/sequence.go new file mode 100644 index 00000000..1a7fdae7 --- /dev/null +++ b/pkg/saga/statemachine/engine/sequence/sequence.go @@ -0,0 +1,5 @@ +package sequence + +type SeqGenerator interface { + GenerateId(entity string, ruleName string) string +} diff --git a/pkg/saga/statemachine/engine/sequence/uuid.go b/pkg/saga/statemachine/engine/sequence/uuid.go new file mode 100644 index 00000000..f3060dcf --- /dev/null +++ b/pkg/saga/statemachine/engine/sequence/uuid.go @@ -0,0 +1,14 @@ +package sequence + +import "github.com/google/uuid" + +type UUIDSeqGenerator struct { +} + +func NewUUIDSeqGenerator() *UUIDSeqGenerator { + return &UUIDSeqGenerator{} +} + +func (U UUIDSeqGenerator) GenerateId(entity string, ruleName string) string { + return uuid.New().String() +} diff --git a/pkg/saga/statemachine/engine/statemachine_config.go b/pkg/saga/statemachine/engine/statemachine_config.go new file mode 100644 index 00000000..3f3c530b --- /dev/null +++ b/pkg/saga/statemachine/engine/statemachine_config.go @@ -0,0 +1,44 @@ +package engine + +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/engine/events" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/expr" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/invoker" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/sequence" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/status_decision" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/store" +) + +type StateMachineConfig interface { + StateLogRepository() store.StateLogRepository + + StateMachineRepository() store.StateMachineRepository + + StateLogStore() store.StateLogStore + + StateLangStore() store.StateLangStore + + ExpressionFactoryManager() expr.ExpressionFactoryManager + + ExpressionResolver() expr.ExpressionResolver + + SeqGenerator() sequence.SeqGenerator + + StatusDecisionStrategy() status_decision.StatusDecisionStrategy + + EventPublisher() events.EventPublisher + + AsyncEventPublisher() events.EventPublisher + + ServiceInvokerManager() invoker.ServiceInvokerManager + + ScriptInvokerManager() invoker.ScriptInvokerManager + + CharSet() string + + DefaultTenantId() string + + TransOperationTimeout() int + + ServiceInvokeTimeout() int +} diff --git a/pkg/saga/statemachine/engine/statemachine_engine.go b/pkg/saga/statemachine/engine/statemachine_engine.go new file mode 100644 index 00000000..6b3954c1 --- /dev/null +++ b/pkg/saga/statemachine/engine/statemachine_engine.go @@ -0,0 +1,16 @@ +package engine + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" +) + +type StateMachineEngine interface { + Start(ctx context.Context, stateMachineName string, tenantId string, startParams map[string]interface{}) (statelang.StateMachineInstance, error) +} + +type CallBack interface { + OnFinished(ctx context.Context, context process_ctrl.ProcessContext, stateMachineInstance statelang.StateMachineInstance) + OnError(ctx context.Context, context process_ctrl.ProcessContext, stateMachineInstance statelang.StateMachineInstance, err error) +} diff --git a/pkg/saga/statemachine/engine/status_decision/status_decision.go b/pkg/saga/statemachine/engine/status_decision/status_decision.go new file mode 100644 index 00000000..6def093e --- /dev/null +++ b/pkg/saga/statemachine/engine/status_decision/status_decision.go @@ -0,0 +1,4 @@ +package status_decision + +type StatusDecisionStrategy interface { +} diff --git a/pkg/saga/statemachine/engine/store/statemachine_store.go b/pkg/saga/statemachine/engine/store/statemachine_store.go new file mode 100644 index 00000000..8959df92 --- /dev/null +++ b/pkg/saga/statemachine/engine/store/statemachine_store.go @@ -0,0 +1,60 @@ +package store + +import ( + "context" + "github.com/seata/seata-go/pkg/saga/statemachine/engine/process_ctrl" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "io" +) + +type StateLogRepository interface { + GetStateMachineInstance(stateMachineInstanceId string) (statelang.StateInstance, error) + + GetStateMachineInstanceByBusinessKey(businessKey string, tenantId string) (statelang.StateInstance, error) + + GetStateMachineInstanceByParentId(parentId string) ([]statelang.StateMachineInstance, error) + + GetStateInstance(stateInstanceId string, stateMachineInstanceId string) (statelang.StateInstance, error) + + GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string) ([]statelang.StateInstance, error) +} + +type StateLogStore interface { + RecordStateMachineStarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error + + RecordStateMachineFinished(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error + + RecordStateMachineRestarted(ctx context.Context, machineInstance statelang.StateMachineInstance, context process_ctrl.ProcessContext) error + + RecordStateStarted(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error + + RecordStateFinished(ctx context.Context, stateInstance statelang.StateInstance, context process_ctrl.ProcessContext) error + + GetStateMachineInstance(stateMachineInstanceId string) (statelang.StateInstance, error) + + GetStateMachineInstanceByBusinessKey(businessKey string, tenantId string) (statelang.StateInstance, error) + + GetStateMachineInstanceByParentId(parentId string) ([]statelang.StateMachineInstance, error) + + GetStateInstance(stateInstanceId string, stateMachineInstanceId string) (statelang.StateInstance, error) + + GetStateInstanceListByMachineInstanceId(stateMachineInstanceId string) ([]statelang.StateInstance, error) +} + +type StateMachineRepository interface { + GetStateMachineById(stateMachineId string) (statelang.StateMachine, error) + + GetLastVersionStateMachine(stateMachineName string, tenantId string) (statelang.StateMachine, error) + + RegistryStateMachine(statelang.StateMachine) error + + RegistryStateMachineByReader(reader io.Reader) error +} + +type StateLangStore interface { + GetStateMachineById(stateMachineId string) (statelang.StateMachine, error) + + GetLastVersionStateMachine(stateMachineName string, tenantId string) (statelang.StateMachine, error) + + StoreStateMachine(stateMachine statelang.StateMachine) error +} diff --git a/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go b/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go new file mode 100644 index 00000000..04729c57 --- /dev/null +++ b/pkg/saga/statemachine/statelang/parser/choice_state_json_parser.go @@ -0,0 +1,72 @@ +package parser + +import ( + "fmt" + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/engine" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang/state" +) + +type ChoiceStateParser struct { + BaseStateParser +} + +func NewChoiceStateParser() *ChoiceStateParser { + return &ChoiceStateParser{} +} + +func (c ChoiceStateParser) StateType() string { + return engine.StateTypeChoice +} + +func (c ChoiceStateParser) Parse(stateName string, stateMap map[string]interface{}) (statelang.State, error) { + choiceState := state.NewChoiceStateImpl() + choiceState.SetName(stateName) + + //parse Type + typeName, err := c.GetString(stateName, stateMap, "Type") + if err != nil { + return nil, err + } + choiceState.SetType(typeName) + + //parse Default + defaultChoice, err := c.GetString(stateName, stateMap, "Default") + if err != nil { + return nil, err + } + choiceState.SetDefault(defaultChoice) + + //parse Choices + slice, err := c.GetSlice(stateName, stateMap, "Choices") + if err != nil { + return nil, err + } + + var choices []state.Choice + for i := range slice { + choiceValMap, ok := slice[i].(map[string]interface{}) + if !ok { + return nil, errors.New(fmt.Sprintf("State [%s] Choices element required struct", stateName)) + } + + choice := state.NewChoiceImpl() + expression, err := c.GetString(stateName, choiceValMap, "Expression") + if err != nil { + return nil, err + } + choice.SetExpression(expression) + + next, err := c.GetString(stateName, choiceValMap, "Next") + if err != nil { + return nil, err + } + choice.SetNext(next) + + choices = append(choices, choice) + } + choiceState.SetChoices(choices) + + return choiceState, nil +} diff --git a/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go new file mode 100644 index 00000000..c0977034 --- /dev/null +++ b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser.go @@ -0,0 +1,100 @@ +package parser + +import ( + "encoding/json" + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" +) + +type JSONStateMachineParser struct { +} + +func NewJSONStateMachineParser() *JSONStateMachineParser { + return &JSONStateMachineParser{} +} + +func (stateMachineParser JSONStateMachineParser) GetType() string { + return "JSON" +} + +func (stateMachineParser JSONStateMachineParser) Parse(content string) (statelang.StateMachine, error) { + var stateMachineJsonObject StateMachineJsonObject + + err := json.Unmarshal([]byte(content), &stateMachineJsonObject) + if err != nil { + return nil, err + } + + stateMachine := statelang.NewStateMachineImpl() + stateMachine.SetName(stateMachineJsonObject.Name) + stateMachine.SetComment(stateMachineJsonObject.Comment) + stateMachine.SetVersion(stateMachineJsonObject.Version) + stateMachine.SetStartState(stateMachineJsonObject.StartState) + stateMachine.SetPersist(stateMachineJsonObject.Persist) + + if stateMachineJsonObject.Type != "" { + stateMachine.SetType(stateMachineJsonObject.Type) + } + + if stateMachineJsonObject.RecoverStrategy != "" { + recoverStrategy, ok := statelang.ValueOfRecoverStrategy(stateMachineJsonObject.RecoverStrategy) + if !ok { + return nil, errors.New("Not support " + stateMachineJsonObject.RecoverStrategy) + } + stateMachine.SetRecoverStrategy(recoverStrategy) + } + + stateParserFactory := NewDefaultStateParserFactory() + stateParserFactory.InitDefaultStateParser() + for stateName, v := range stateMachineJsonObject.States { + stateMap, ok := v.(map[string]interface{}) + if !ok { + return nil, errors.New("State [" + stateName + "] scheme illegal, required map") + } + + stateType, ok := stateMap["Type"].(string) + if !ok { + return nil, errors.New("State [" + stateName + "] Type illegal, required string") + } + + //stateMap + stateParser := stateParserFactory.GetStateParser(stateType) + if stateParser == nil { + return nil, errors.New("State Type [" + stateType + "] is not support") + } + + _, stateExist := stateMachine.States()[stateName] + if stateExist { + return nil, errors.New("State [name:" + stateName + "] already exists") + } + + state, err := stateParser.Parse(stateName, stateMap) + if err != nil { + return nil, err + } + + state.SetStateMachine(stateMachine) + stateMachine.States()[stateName] = state + } + + //TODO setCompensateState + //for stateName, state := range stateMachine.GetStates() { + // + //} + // + + return stateMachine, nil +} + +type StateMachineJsonObject struct { + Name string `json:"Name"` + Comment string `json:"Comment"` + Version string `json:"Version"` + StartState string `json:"StartState"` + RecoverStrategy string `json:"RecoverStrategy"` + Persist bool `json:"IsPersist"` + RetryPersistModeUpdate bool `json:"IsRetryPersistModeUpdate"` + CompensatePersistModeUpdate bool `json:"IsCompensatePersistModeUpdate"` + Type string `json:"Type"` + States map[string]interface{} `json:"States"` +} diff --git a/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go new file mode 100644 index 00000000..c6d00b75 --- /dev/null +++ b/pkg/saga/statemachine/statelang/parser/statemachine_json_parser_test.go @@ -0,0 +1,13 @@ +package parser + +import ( + "testing" +) + +func TestParseChoice(t *testing.T) { + var content = "{\n \"Name\":\"ChoiceTest\",\n \"Comment\":\"ChoiceTest\",\n \"StartState\":\"ChoiceState\",\n \"Version\":\"0.0.1\",\n \"States\":{\n \"ChoiceState\":{\n \"Type\":\"Choice\",\n \"Choices\":[\n {\n \"Expression\":\"[a] == 1\",\n \"Next\":\"SecondState\"\n },\n {\n \"Expression\":\"[a] == 2\",\n \"Next\":\"ThirdState\"\n }\n ],\n \"Default\":\"Fail\"\n }\n }\n}" + _, err := NewJSONStateMachineParser().Parse(content) + if err != nil { + t.Error("parse fail: " + err.Error()) + } +} diff --git a/pkg/saga/statemachine/statelang/parser/statemachine_parser.go b/pkg/saga/statemachine/statelang/parser/statemachine_parser.go new file mode 100644 index 00000000..99ee95e1 --- /dev/null +++ b/pkg/saga/statemachine/statelang/parser/statemachine_parser.go @@ -0,0 +1,104 @@ +package parser + +import ( + "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + "sync" +) + +type StateMachineParser interface { + GetType() string + Parse(content string) (statelang.StateMachine, error) +} + +type StateParser interface { + StateType() string + Parse(stateName string, stateMap map[string]interface{}) (statelang.State, error) +} + +type BaseStateParser struct { +} + +func (b BaseStateParser) ParseBaseAttributes(stateName string, state statelang.State, stateMap map[string]interface{}) error { + state.SetName(stateName) + + comment, err := b.GetString(stateName, stateMap, "Comment") + if err != nil { + return err + } + state.SetComment(comment) + + next, err := b.GetString(stateName, stateMap, "Next") + if err != nil { + return err + } + + state.SetNext(next) + return nil +} + +func (b BaseStateParser) GetString(stateName string, stateMap map[string]interface{}, key string) (string, error) { + value := stateMap[key] + if value == nil { + var result string + return result, errors.New("State [" + stateName + "] " + key + " not exist") + } + + valueAsString, ok := value.(string) + if !ok { + var s string + return s, errors.New("State [" + stateName + "] " + key + " illegal, required string") + } + return valueAsString, nil +} + +func (b BaseStateParser) GetSlice(stateName string, stateMap map[string]interface{}, key string) ([]interface{}, error) { + value := stateMap[key] + + if value == nil { + var result []interface{} + return result, errors.New("State [" + stateName + "] " + key + " not exist") + } + + valueAsSlice, ok := value.([]interface{}) + if !ok { + var result []interface{} + return result, errors.New("State [" + stateName + "] " + key + " illegal, required slice") + } + return valueAsSlice, nil +} + +type StateParserFactory interface { + RegistryStateParser(stateType string, stateParser StateParser) + + GetStateParser(stateType string) StateParser +} + +type DefaultStateParserFactory struct { + stateParserMap map[string]StateParser + mutex sync.Mutex +} + +func NewDefaultStateParserFactory() *DefaultStateParserFactory { + var stateParserMap map[string]StateParser = make(map[string]StateParser) + return &DefaultStateParserFactory{ + stateParserMap: stateParserMap, + } +} + +// InitDefaultStateParser init StateParser by default +func (d *DefaultStateParserFactory) InitDefaultStateParser() { + choiceStateParser := NewChoiceStateParser() + + d.RegistryStateParser(choiceStateParser.StateType(), choiceStateParser) +} + +func (d *DefaultStateParserFactory) RegistryStateParser(stateType string, stateParser StateParser) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.stateParserMap[stateType] = stateParser +} + +func (d *DefaultStateParserFactory) GetStateParser(stateType string) StateParser { + return d.stateParserMap[stateType] +} diff --git a/pkg/saga/statemachine/statelang/state.go b/pkg/saga/statemachine/statelang/state.go new file mode 100644 index 00000000..1e490ac0 --- /dev/null +++ b/pkg/saga/statemachine/statelang/state.go @@ -0,0 +1,71 @@ +package statelang + +type State interface { + Name() string + + SetName(name string) + + Comment() string + + SetComment(comment string) + + Type() string + + SetType(typeName string) + + Next() string + + SetNext(next string) + + StateMachine() StateMachine + + SetStateMachine(machine StateMachine) +} + +type BaseState struct { + name string `alias:"Name"` + comment string `alias:"Comment"` + typeName string `alias:"Type"` + next string `alias:"Next"` + stateMachine StateMachine +} + +func (b *BaseState) Name() string { + return b.name +} + +func (b *BaseState) SetName(name string) { + b.name = name +} + +func (b *BaseState) Comment() string { + return b.comment +} + +func (b *BaseState) SetComment(comment string) { + b.comment = comment +} + +func (b *BaseState) Type() string { + return b.typeName +} + +func (b *BaseState) SetType(typeName string) { + b.typeName = typeName +} + +func (b *BaseState) Next() string { + return b.next +} + +func (b *BaseState) SetNext(next string) { + b.next = next +} + +func (b *BaseState) StateMachine() StateMachine { + return b.stateMachine +} + +func (b *BaseState) SetStateMachine(machine StateMachine) { + b.stateMachine = machine +} diff --git a/pkg/saga/statemachine/statelang/state/choice_state.go b/pkg/saga/statemachine/statelang/state/choice_state.go new file mode 100644 index 00000000..e1cd973c --- /dev/null +++ b/pkg/saga/statemachine/statelang/state/choice_state.go @@ -0,0 +1,74 @@ +package state + +import "github.com/seata/seata-go/pkg/saga/statemachine/statelang" + +type ChoiceState interface { + statelang.State + + Choices() []Choice + + Default() string +} + +type Choice interface { + Expression() string + + SetExpression(expression string) + + Next() string + + SetNext(next string) +} + +type ChoiceStateImpl struct { + statelang.BaseState + defaultChoice string `alias:"Default"` + choices []Choice `alias:"Choices"` +} + +func NewChoiceStateImpl() *ChoiceStateImpl { + return &ChoiceStateImpl{ + choices: make([]Choice, 0), + } +} + +func (choiceState *ChoiceStateImpl) Default() string { + return choiceState.defaultChoice +} + +func (choiceState *ChoiceStateImpl) Choices() []Choice { + return choiceState.choices +} + +func (choiceState *ChoiceStateImpl) SetDefault(defaultChoice string) { + choiceState.defaultChoice = defaultChoice +} + +func (choiceState *ChoiceStateImpl) SetChoices(choices []Choice) { + choiceState.choices = choices +} + +type ChoiceImpl struct { + expression string + next string +} + +func NewChoiceImpl() *ChoiceImpl { + return &ChoiceImpl{} +} + +func (c *ChoiceImpl) Expression() string { + return c.expression +} + +func (c *ChoiceImpl) SetExpression(expression string) { + c.expression = expression +} + +func (c *ChoiceImpl) Next() string { + return c.next +} + +func (c *ChoiceImpl) SetNext(next string) { + c.next = next +} diff --git a/pkg/saga/statemachine/statelang/state/task_state.go b/pkg/saga/statemachine/statelang/state/task_state.go new file mode 100644 index 00000000..a9164d35 --- /dev/null +++ b/pkg/saga/statemachine/statelang/state/task_state.go @@ -0,0 +1,30 @@ +package state + +import ( + "github.com/seata/seata-go/pkg/saga/statemachine/statelang" +) + +type TaskState interface { + statelang.State + + CompensateState() string + + Status() map[string]string + + Retry() []Retry +} + +type Retry interface { + ErrorTypeNames() []string + + IntervalSecond() float64 + + MaxAttempt() int + + BackoffRate() float64 +} + +type ServiceTaskState interface { + TaskState + //TODO add serviceTask +} diff --git a/pkg/saga/statemachine/statelang/state_instance.go b/pkg/saga/statemachine/statelang/state_instance.go new file mode 100644 index 00000000..826faca6 --- /dev/null +++ b/pkg/saga/statemachine/statelang/state_instance.go @@ -0,0 +1,330 @@ +package statelang + +import "time" + +type StateInstance interface { + ID() string + + SetID(id string) + + Name() string + + SetName(name string) + + Type() string + + SetType(typeName string) + + ServiceName() string + + SetServiceName(serviceName string) + + ServiceMethod() string + + SetServiceMethod(serviceMethod string) + + ServiceType() string + + SetServiceType(serviceType string) + + BusinessKey() string + + SetBusinessKey(businessKey string) + + StartedTime() time.Time + + SetStartedTime(startedTime time.Time) + + UpdatedTime() time.Time + + SetUpdatedTime(updateTime time.Time) + + EndTime() time.Time + + SetEndTime(endTime time.Time) + + IsForUpdate() bool + + SetForUpdate(forUpdate bool) + + Error() error + + SetError(err error) + + InputParams() interface{} + + SetInputParams(inputParams interface{}) + + OutputParams() interface{} + + SetOutputParams(outputParams interface{}) + + Status() ExecutionStatus + + SetStatus(status ExecutionStatus) + + StateIDCompensatedFor() string + + SetStateIDCompensatedFor(stateIdCompensatedFor string) + + StateIDRetriedFor() string + + SetStateIDRetriedFor(stateIdRetriedFor string) + + CompensationState() StateInstance + + SetCompensationState(compensationState StateInstance) + + StateMachineInstance() StateMachineInstance + + SetStateMachineInstance(stateMachineInstance StateMachineInstance) + + IsIgnoreStatus() bool + + SetIgnoreStatus(ignoreStatus bool) + + IsForCompensation() bool + + SerializedInputParams() interface{} + + SetSerializedInputParams(serializedInputParams interface{}) + + SerializedOutputParams() interface{} + + SetSerializedOutputParams(serializedOutputParams interface{}) + + SerializedError() interface{} + + SetSerializedError(serializedErr interface{}) + + CompensationStatus() ExecutionStatus +} + +type StateInstanceImpl struct { + id string + machineInstanceId string + name string + typeName string + serviceName string + serviceMethod string + serviceType string + businessKey string + startedTime time.Time + updatedTime time.Time + endTime time.Time + isForUpdate bool + err error + serializedErr interface{} + inputParams interface{} + serializedInputParams interface{} + outputParams interface{} + serializedOutputParams interface{} + status ExecutionStatus + stateIdCompensatedFor string + stateIdRetriedFor string + compensationState StateInstance + stateMachineInstance StateMachineInstance + ignoreStatus bool +} + +func NewStateInstanceImpl() *StateInstanceImpl { + return &StateInstanceImpl{} +} + +func (s *StateInstanceImpl) ID() string { + return s.id +} + +func (s *StateInstanceImpl) SetID(id string) { + s.id = id +} + +func (s *StateInstanceImpl) Name() string { + return s.name +} + +func (s *StateInstanceImpl) SetName(name string) { + s.name = name +} + +func (s *StateInstanceImpl) Type() string { + return s.typeName +} + +func (s *StateInstanceImpl) SetType(typeName string) { + s.typeName = typeName +} + +func (s *StateInstanceImpl) ServiceName() string { + return s.serviceName +} + +func (s *StateInstanceImpl) SetServiceName(serviceName string) { + s.serviceName = serviceName +} + +func (s *StateInstanceImpl) ServiceMethod() string { + return s.serviceMethod +} + +func (s *StateInstanceImpl) SetServiceMethod(serviceMethod string) { + s.serviceMethod = serviceMethod +} + +func (s *StateInstanceImpl) ServiceType() string { + return s.serviceType +} + +func (s *StateInstanceImpl) SetServiceType(serviceType string) { + s.serviceType = serviceType +} + +func (s *StateInstanceImpl) BusinessKey() string { + return s.businessKey +} + +func (s *StateInstanceImpl) SetBusinessKey(businessKey string) { + s.businessKey = businessKey +} + +func (s *StateInstanceImpl) StartedTime() time.Time { + return s.startedTime +} + +func (s *StateInstanceImpl) SetStartedTime(startedTime time.Time) { + s.startedTime = startedTime +} + +func (s *StateInstanceImpl) UpdatedTime() time.Time { + return s.updatedTime +} + +func (s *StateInstanceImpl) SetUpdatedTime(updatedTime time.Time) { + s.updatedTime = updatedTime +} + +func (s *StateInstanceImpl) EndTime() time.Time { + return s.endTime +} + +func (s *StateInstanceImpl) SetEndTime(endTime time.Time) { + s.endTime = endTime +} + +func (s *StateInstanceImpl) IsForUpdate() bool { + return s.isForUpdate +} + +func (s *StateInstanceImpl) SetForUpdate(forUpdate bool) { + s.isForUpdate = forUpdate +} + +func (s *StateInstanceImpl) Error() error { + return s.err +} + +func (s *StateInstanceImpl) SetError(err error) { + s.err = err +} + +func (s *StateInstanceImpl) InputParams() interface{} { + return s.inputParams +} + +func (s *StateInstanceImpl) SetInputParams(inputParams interface{}) { + s.inputParams = inputParams +} + +func (s *StateInstanceImpl) OutputParams() interface{} { + return s.outputParams +} + +func (s *StateInstanceImpl) SetOutputParams(outputParams interface{}) { + s.outputParams = outputParams +} + +func (s *StateInstanceImpl) Status() ExecutionStatus { + return s.status +} + +func (s *StateInstanceImpl) SetStatus(status ExecutionStatus) { + s.status = status +} + +func (s *StateInstanceImpl) StateIDCompensatedFor() string { + return s.stateIdCompensatedFor +} + +func (s *StateInstanceImpl) SetStateIDCompensatedFor(stateIdCompensatedFor string) { + s.stateIdCompensatedFor = stateIdCompensatedFor +} + +func (s *StateInstanceImpl) StateIDRetriedFor() string { + return s.stateIdRetriedFor +} + +func (s *StateInstanceImpl) SetStateIDRetriedFor(stateIdRetriedFor string) { + s.stateIdRetriedFor = stateIdRetriedFor +} + +func (s *StateInstanceImpl) CompensationState() StateInstance { + return s.compensationState +} + +func (s *StateInstanceImpl) SetCompensationState(compensationState StateInstance) { + s.compensationState = compensationState +} + +func (s *StateInstanceImpl) StateMachineInstance() StateMachineInstance { + return s.stateMachineInstance +} + +func (s *StateInstanceImpl) SetStateMachineInstance(stateMachineInstance StateMachineInstance) { + s.stateMachineInstance = stateMachineInstance +} + +func (s *StateInstanceImpl) IsIgnoreStatus() bool { + return s.ignoreStatus +} + +func (s *StateInstanceImpl) SetIgnoreStatus(ignoreStatus bool) { + s.ignoreStatus = ignoreStatus +} + +func (s *StateInstanceImpl) IsForCompensation() bool { + return s.stateIdCompensatedFor == "" +} + +func (s *StateInstanceImpl) SerializedInputParams() interface{} { + return s.serializedInputParams +} + +func (s *StateInstanceImpl) SetSerializedInputParams(serializedInputParams interface{}) { + s.serializedInputParams = serializedInputParams +} + +func (s *StateInstanceImpl) SerializedOutputParams() interface{} { + return s.serializedOutputParams +} + +func (s *StateInstanceImpl) SetSerializedOutputParams(serializedOutputParams interface{}) { + s.serializedOutputParams = serializedOutputParams +} + +func (s *StateInstanceImpl) SerializedError() interface{} { + return s.serializedErr +} + +func (s *StateInstanceImpl) SetSerializedError(serializedErr interface{}) { + s.serializedErr = serializedErr +} + +func (s *StateInstanceImpl) CompensationStatus() ExecutionStatus { + if s.compensationState != nil { + return s.compensationState.Status() + } + + //return nil ExecutionStatus + var status ExecutionStatus + return status +} diff --git a/pkg/saga/statemachine/statelang/statemachine.go b/pkg/saga/statemachine/statelang/statemachine.go new file mode 100644 index 00000000..8b13f8d0 --- /dev/null +++ b/pkg/saga/statemachine/statelang/statemachine.go @@ -0,0 +1,261 @@ +package statelang + +import ( + "time" +) + +type StateMachineStatus string + +const ( + Active StateMachineStatus = "Active" + Inactive StateMachineStatus = "Inactive" +) + +// RecoverStrategy : Recover Strategy +type RecoverStrategy string + +const ( + //Compensate stateMachine + Compensate RecoverStrategy = "Compensate" + // Forward stateMachine + Forward RecoverStrategy = "Forward" +) + +func ValueOfRecoverStrategy(recoverStrategy string) (RecoverStrategy, bool) { + switch recoverStrategy { + case "Compensate": + return Compensate, true + case "Forward": + return Forward, true + default: + var recoverStrategy RecoverStrategy + return recoverStrategy, false + } +} + +type StateMachine interface { + ID() string + + SetID(id string) + + Name() string + + SetName(name string) + + Comment() string + + SetComment(comment string) + + StartState() string + + SetStartState(startState string) + + Version() string + + SetVersion(version string) + + States() map[string]State + + State(stateName string) State + + TenantId() string + + SetTenantId(tenantId string) + + AppName() string + + SetAppName(appName string) + + Type() string + + SetType(typeName string) + + Status() StateMachineStatus + + SetStatus(status StateMachineStatus) + + RecoverStrategy() RecoverStrategy + + SetRecoverStrategy(recoverStrategy RecoverStrategy) + + IsPersist() bool + + SetPersist(persist bool) + + IsRetryPersistModeUpdate() bool + + SetRetryPersistModeUpdate(retryPersistModeUpdate bool) + + IsCompensatePersistModeUpdate() bool + + SetCompensatePersistModeUpdate(compensatePersistModeUpdate bool) + + Content() string + + SetContent(content string) + + CreateTime() time.Time + + SetCreateTime(createTime time.Time) +} + +type StateMachineImpl struct { + id string + tenantId string + appName string + name string + comment string + version string + startState string + status StateMachineStatus + recoverStrategy RecoverStrategy + persist bool + retryPersistModeUpdate bool + compensatePersistModeUpdate bool + typeName string + content string + createTime time.Time + states map[string]State +} + +func NewStateMachineImpl() *StateMachineImpl { + stateMap := make(map[string]State) + return &StateMachineImpl{ + appName: "SEATA", + status: Active, + typeName: "STATE_LANG", + states: stateMap, + } +} + +func (s *StateMachineImpl) ID() string { + return s.id +} + +func (s *StateMachineImpl) SetID(id string) { + s.id = id +} + +func (s *StateMachineImpl) Name() string { + return s.name +} + +func (s *StateMachineImpl) SetName(name string) { + s.name = name +} + +func (s *StateMachineImpl) SetComment(comment string) { + s.comment = comment +} + +func (s *StateMachineImpl) Comment() string { + return s.comment +} + +func (s *StateMachineImpl) StartState() string { + return s.startState +} + +func (s *StateMachineImpl) SetStartState(startState string) { + s.startState = startState +} + +func (s *StateMachineImpl) Version() string { + return s.version +} + +func (s *StateMachineImpl) SetVersion(version string) { + s.version = version +} + +func (s *StateMachineImpl) States() map[string]State { + return s.states +} + +func (s *StateMachineImpl) State(stateName string) State { + if s.states == nil { + return nil + } + + return s.states[stateName] +} + +func (s *StateMachineImpl) TenantId() string { + return s.tenantId +} + +func (s *StateMachineImpl) SetTenantId(tenantId string) { + s.tenantId = tenantId +} + +func (s *StateMachineImpl) AppName() string { + return s.appName +} + +func (s *StateMachineImpl) SetAppName(appName string) { + s.appName = appName +} + +func (s *StateMachineImpl) Type() string { + return s.typeName +} + +func (s *StateMachineImpl) SetType(typeName string) { + s.typeName = typeName +} + +func (s *StateMachineImpl) Status() StateMachineStatus { + return s.status +} + +func (s *StateMachineImpl) SetStatus(status StateMachineStatus) { + s.status = status +} + +func (s *StateMachineImpl) RecoverStrategy() RecoverStrategy { + return s.recoverStrategy +} + +func (s *StateMachineImpl) SetRecoverStrategy(recoverStrategy RecoverStrategy) { + s.recoverStrategy = recoverStrategy +} + +func (s *StateMachineImpl) IsPersist() bool { + return s.persist +} + +func (s *StateMachineImpl) SetPersist(persist bool) { + s.persist = persist +} + +func (s *StateMachineImpl) IsRetryPersistModeUpdate() bool { + return s.retryPersistModeUpdate +} + +func (s *StateMachineImpl) SetRetryPersistModeUpdate(retryPersistModeUpdate bool) { + s.retryPersistModeUpdate = retryPersistModeUpdate +} + +func (s *StateMachineImpl) IsCompensatePersistModeUpdate() bool { + return s.compensatePersistModeUpdate +} + +func (s *StateMachineImpl) SetCompensatePersistModeUpdate(compensatePersistModeUpdate bool) { + s.compensatePersistModeUpdate = compensatePersistModeUpdate +} + +func (s *StateMachineImpl) Content() string { + return s.content +} + +func (s *StateMachineImpl) SetContent(content string) { + s.content = content +} + +func (s *StateMachineImpl) CreateTime() time.Time { + return s.createTime +} + +func (s *StateMachineImpl) SetCreateTime(createTime time.Time) { + s.createTime = createTime +} diff --git a/pkg/saga/statemachine/statelang/statemachine_instance.go b/pkg/saga/statemachine/statelang/statemachine_instance.go new file mode 100644 index 00000000..ff039024 --- /dev/null +++ b/pkg/saga/statemachine/statelang/statemachine_instance.go @@ -0,0 +1,316 @@ +package statelang + +import ( + "sync" + "time" +) + +type ExecutionStatus string + +const ( + // RU Running + RU ExecutionStatus = "RU" + // SU Succeed + SU ExecutionStatus = "SU" + // FA Failed + FA ExecutionStatus = "FA" + // UN Unknown + UN ExecutionStatus = "UN" + // SK Skipped + SK ExecutionStatus = "SK" +) + +type StateMachineInstance interface { + ID() string + + SetID(id string) + + MachineID() string + + SetMachineID(machineID string) + + TenantID() string + + SetTenantID(tenantID string) + + ParentID() string + + SetParentID(parentID string) + + StartedTime() time.Time + + SetStartedTime(startedTime time.Time) + + EndTime() time.Time + + SetEndTime(endTime time.Time) + + StateList() []StateInstance + + State(stateId string) StateInstance + + PutState(stateId string, stateInstance StateInstance) + + Status() ExecutionStatus + + SetStatus(status ExecutionStatus) + + CompensationStatus() ExecutionStatus + + SetCompensationStatus(compensationStatus ExecutionStatus) + + IsRunning() bool + + SetRunning(isRunning bool) + + UpdatedTime() time.Time + + SetUpdatedTime(updatedTime time.Time) + + BusinessKey() string + + SetBusinessKey(businessKey string) + + Error() error + + SetError(err error) + + StartParams() map[string]interface{} + + SetStartParams(startParams map[string]interface{}) + + EndParams() map[string]interface{} + + SetEndParams(endParams map[string]interface{}) + + PutContext(key string, value interface{}) + + SetContext(context map[string]interface{}) + + StateMachine() StateMachine + + SetStateMachine(stateMachine StateMachine) + + SerializedStartParams() interface{} + + SetSerializedStartParams(serializedStartParams interface{}) + + SerializedEndParams() interface{} + + SetSerializedEndParams(serializedEndParams interface{}) + + SerializedError() interface{} + + SetSerializedError(serializedError interface{}) +} + +type StateMachineInstanceImpl struct { + id string + machineId string + tenantId string + parentId string + businessKey string + startParams map[string]interface{} + serializedStartParams interface{} + startedTime time.Time + endTime time.Time + updatedTime time.Time + err error + serializedError interface{} + endParams map[string]interface{} + serializedEndParams interface{} + status ExecutionStatus + compensationStatus ExecutionStatus + isRunning bool + context map[string]interface{} + stateMachine StateMachine + stateList []StateInstance + stateMap map[string]StateInstance + + contextMutex sync.RWMutex // Mutex to protect concurrent access to context + stateMutex sync.RWMutex // Mutex to protect concurrent access to stateList and stateMap +} + +func NewStateMachineInstanceImpl() *StateMachineInstanceImpl { + return &StateMachineInstanceImpl{ + startParams: make(map[string]interface{}), + endParams: make(map[string]interface{}), + stateList: make([]StateInstance, 0), + stateMap: make(map[string]StateInstance)} +} + +func (s *StateMachineInstanceImpl) ID() string { + return s.id +} + +func (s *StateMachineInstanceImpl) SetID(id string) { + s.id = id +} + +func (s *StateMachineInstanceImpl) MachineID() string { + return s.machineId +} + +func (s *StateMachineInstanceImpl) SetMachineID(machineID string) { + s.machineId = machineID +} + +func (s *StateMachineInstanceImpl) TenantID() string { + return s.tenantId +} + +func (s *StateMachineInstanceImpl) SetTenantID(tenantID string) { + s.tenantId = tenantID +} + +func (s *StateMachineInstanceImpl) ParentID() string { + return s.parentId +} + +func (s *StateMachineInstanceImpl) SetParentID(parentID string) { + s.parentId = parentID +} + +func (s *StateMachineInstanceImpl) StartedTime() time.Time { + return s.startedTime +} + +func (s *StateMachineInstanceImpl) SetStartedTime(startedTime time.Time) { + s.startedTime = startedTime +} + +func (s *StateMachineInstanceImpl) EndTime() time.Time { + return s.endTime +} + +func (s *StateMachineInstanceImpl) SetEndTime(endTime time.Time) { + s.endTime = endTime +} + +func (s *StateMachineInstanceImpl) StateList() []StateInstance { + return s.stateList +} + +func (s *StateMachineInstanceImpl) State(stateId string) StateInstance { + s.stateMutex.RLock() + defer s.stateMutex.RUnlock() + + return s.stateMap[stateId] +} + +func (s *StateMachineInstanceImpl) PutState(stateId string, stateInstance StateInstance) { + s.stateMutex.Lock() + defer s.stateMutex.Unlock() + + stateInstance.SetStateMachineInstance(s) + s.stateMap[stateId] = stateInstance + s.stateList = append(s.stateList, stateInstance) +} + +func (s *StateMachineInstanceImpl) Status() ExecutionStatus { + return s.status +} + +func (s *StateMachineInstanceImpl) SetStatus(status ExecutionStatus) { + s.status = status +} + +func (s *StateMachineInstanceImpl) CompensationStatus() ExecutionStatus { + return s.compensationStatus +} + +func (s *StateMachineInstanceImpl) SetCompensationStatus(compensationStatus ExecutionStatus) { + s.compensationStatus = compensationStatus +} + +func (s *StateMachineInstanceImpl) IsRunning() bool { + return s.isRunning +} + +func (s *StateMachineInstanceImpl) SetRunning(isRunning bool) { + s.isRunning = isRunning +} + +func (s *StateMachineInstanceImpl) UpdatedTime() time.Time { + return s.updatedTime +} + +func (s *StateMachineInstanceImpl) SetUpdatedTime(updatedTime time.Time) { + s.updatedTime = updatedTime +} + +func (s *StateMachineInstanceImpl) BusinessKey() string { + return s.businessKey +} + +func (s *StateMachineInstanceImpl) SetBusinessKey(businessKey string) { + s.businessKey = businessKey +} + +func (s *StateMachineInstanceImpl) Error() error { + return s.err +} + +func (s *StateMachineInstanceImpl) SetError(err error) { + s.err = err +} + +func (s *StateMachineInstanceImpl) StartParams() map[string]interface{} { + return s.startParams +} + +func (s *StateMachineInstanceImpl) SetStartParams(startParams map[string]interface{}) { + s.startParams = startParams +} + +func (s *StateMachineInstanceImpl) EndParams() map[string]interface{} { + return s.endParams +} + +func (s *StateMachineInstanceImpl) SetEndParams(endParams map[string]interface{}) { + s.endParams = endParams +} + +func (s *StateMachineInstanceImpl) PutContext(key string, value interface{}) { + s.contextMutex.Lock() + defer s.contextMutex.Unlock() + + s.context[key] = value +} + +func (s *StateMachineInstanceImpl) SetContext(context map[string]interface{}) { + s.context = context +} + +func (s *StateMachineInstanceImpl) StateMachine() StateMachine { + return s.stateMachine +} + +func (s *StateMachineInstanceImpl) SetStateMachine(stateMachine StateMachine) { + s.stateMachine = stateMachine + s.machineId = stateMachine.ID() +} + +func (s *StateMachineInstanceImpl) SerializedStartParams() interface{} { + return s.serializedStartParams +} + +func (s *StateMachineInstanceImpl) SetSerializedStartParams(serializedStartParams interface{}) { + s.serializedStartParams = serializedStartParams +} + +func (s *StateMachineInstanceImpl) SerializedEndParams() interface{} { + return s.endParams +} + +func (s *StateMachineInstanceImpl) SetSerializedEndParams(serializedEndParams interface{}) { + s.serializedEndParams = serializedEndParams +} + +func (s *StateMachineInstanceImpl) SerializedError() interface{} { + return s.serializedError +} + +func (s *StateMachineInstanceImpl) SetSerializedError(serializedError interface{}) { + s.serializedError = serializedError +} diff --git a/pkg/util/reflectx/unmarkshaler.go b/pkg/util/reflectx/unmarkshaler.go new file mode 100644 index 00000000..73acb276 --- /dev/null +++ b/pkg/util/reflectx/unmarkshaler.go @@ -0,0 +1,147 @@ +package reflectx + +import ( + "fmt" + "github.com/pkg/errors" + "reflect" + "unicode" +) + +// MapToStruct some state can use this util to parse +// TODO 性能测试,性能差的话,直接去解析,不使用反射 +func MapToStruct(stateName string, obj interface{}, stateMap map[string]interface{}) error { + objVal := reflect.ValueOf(obj) + if objVal.Kind() != reflect.Pointer { + return errors.New(fmt.Sprintf("State [%s] value required a pointer", stateName)) + } + + structValue := objVal.Elem() + if structValue.Kind() != reflect.Struct { + return errors.New(fmt.Sprintf("State [%s] value elem required a struct", stateName)) + } + + structType := structValue.Type() + for key, value := range stateMap { + //Get field, get alias first + field, found := getField(structType, key) + if !found { + continue + } + + fieldVal := structValue.FieldByName(field.Name) + if !fieldVal.IsValid() { + return errors.New(fmt.Sprintf("State [%s] not support [%s] filed", stateName, key)) + } + + //Get setMethod + var setMethod reflect.Value + if !fieldVal.CanSet() { + setMethod = getFiledSetMethod(field.Name, objVal) + + if !setMethod.IsValid() { + fieldAliasName := field.Tag.Get("alias") + setMethod = getFiledSetMethod(fieldAliasName, objVal) + } + + if !setMethod.IsValid() { + return errors.New(fmt.Sprintf("State [%s] [%s] field not support setMethod", stateName, key)) + } + setMethodType := setMethod.Type() + if !(setMethodType.NumIn() == 1 && setMethodType.In(0) == fieldVal.Type()) { + return errors.New(fmt.Sprintf("State [%s] [%s] field setMethod illegal", stateName, key)) + } + } + + val := reflect.ValueOf(value) + if fieldVal.Kind() == reflect.Struct { + //map[string]interface{} + if val.Kind() != reflect.Map { + return errors.New(fmt.Sprintf("State [%s] [%s] field type required map", stateName, key)) + } + + err := MapToStruct(stateName, fieldVal.Addr().Interface(), value.(map[string]interface{})) + if err != nil { + return err + } + } else if fieldVal.Kind() == reflect.Slice { + if val.Kind() != reflect.Slice { + return errors.New(fmt.Sprintf("State [%s] [%s] field type required slice", stateName, key)) + } + + sliceType := fieldVal.Type().Elem() + newSlice := reflect.MakeSlice(fieldVal.Type(), 0, val.Len()) + + for i := 0; i < val.Len(); i++ { + newElem := reflect.New(sliceType.Elem()) + elemMap := val.Index(i).Interface().(map[string]interface{}) + err := MapToStruct(stateName, newElem.Interface(), elemMap) + if err != nil { + return err + } + reflect.Append(newSlice, newElem.Elem()) + } + setFiled(fieldVal, setMethod, newSlice) + } else if fieldVal.Kind() == reflect.Map { + if val.Kind() != reflect.Map { + return errors.New(fmt.Sprintf("State [%s] [%s] field type required map", stateName, key)) + } + + mapType := field.Type + newMap := reflect.MakeMap(mapType) + + for _, key := range val.MapKeys() { + newVal := reflect.New(mapType.Elem().Elem()) + elemMap := val.MapIndex(key).Interface().(map[string]interface{}) + err := MapToStruct(stateName, newVal.Interface(), elemMap) + if err != nil { + return err + } + newMap.SetMapIndex(key, newVal.Elem()) + } + setFiled(fieldVal, setMethod, newMap) + } else { + setFiled(fieldVal, setMethod, val) + } + } + return nil +} + +func getField(t reflect.Type, name string) (reflect.StructField, bool) { + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + tag, hasAliasTag := field.Tag.Lookup("alias") + + if (hasAliasTag && tag == name) || (!hasAliasTag && field.Name == name) { + return field, true + } + + if field.Anonymous { + embeddedField, ok := getField(field.Type, name) + if ok { + return embeddedField, true + } + } + } + + return reflect.StructField{}, false +} + +func getFiledSetMethod(name string, structValue reflect.Value) reflect.Value { + fieldNameSlice := []rune(name) + fieldNameSlice[0] = unicode.ToUpper(fieldNameSlice[0]) + + setMethodName := "Set" + string(fieldNameSlice) + + setMethod := structValue.MethodByName(setMethodName) + return setMethod +} + +func setFiled(fieldVal reflect.Value, setMethod reflect.Value, val reflect.Value) { + if !fieldVal.CanSet() { + setMethod.Call([]reflect.Value{ + val, + }) + } else { + fieldVal.Set(val) + } +}