Browse Source

tcc 模式事务分支执行失败也需要报告分支状态

tags/v1.0.1
liu.xiaomin 4 years ago
parent
commit
0860014d32
1 changed files with 59 additions and 40 deletions
  1. +59
    -40
      pkg/client/tcc/proxy.go

+ 59
- 40
pkg/client/tcc/proxy.go View File

@@ -20,36 +20,48 @@ import (
)

var (
TCC_ACTION_NAME = "TccActionName"

TRY_METHOD = "Try"
CONFIRM_METHOD = "Confirm"
CANCEL_METHOD = "Cancel"

ACTION_START_TIME = "action-start-time"
ACTION_NAME = "actionName"
PREPARE_METHOD = "sys::prepare"
COMMIT_METHOD = "sys::commit"
ROLLBACK_METHOD = "sys::rollback"
HOST_NAME = "host-name"

TCC_METHOD_ARGUMENTS = "arguments"
TCC_METHOD_RESULT = "result"
// TCCActionName
TCCActionName = "TCCActionName"

// TryMethod
TryMethod = "Try"
// ConfirmMethod
ConfirmMethod = "Confirm"
// CancelMethod
CancelMethod = "Cancel"

// ActionStartTime
ActionStartTime = "action-start-time"
// ActionName
ActionName = "actionName"
// PrepareMethod
PrepareMethod = "sys::prepare"
// CommitMethod
CommitMethod = "sys::commit"
// RollbackMethod
RollbackMethod = "sys::rollback"
// HostName
HostName = "host-name"

// TCCMethodArguments
TCCMethodArguments = "arguments"
// TCCMethodResult
TCCMethodResult = "result"

businessActionContextType = reflect.TypeOf(&context.BusinessActionContext{})
)

type TccService interface {
type TCCService interface {
Try(ctx *context.BusinessActionContext) (bool, error)
Confirm(ctx *context.BusinessActionContext) bool
Cancel(ctx *context.BusinessActionContext) bool
}

type TccProxyService interface {
GetTccService() TccService
type TCCProxyService interface {
GetTCCService() TCCService
}

func ImplementTCC(v TccProxyService) {
func ImplementTCC(v TCCProxyService) {
valueOf := reflect.ValueOf(v)
log.Debugf("[Implement] reflect.TypeOf: %s", valueOf.String())

@@ -61,7 +73,7 @@ func ImplementTCC(v TccProxyService) {
log.Errorf("%s must be a struct ptr", valueOf.String())
return
}
proxyService := v.GetTccService()
proxyService := v.GetTCCService()
makeCallProxy := func(methodDesc *proxy.MethodDescriptor, resource *TCCResource) func(in []reflect.Value) []reflect.Value {
return func(in []reflect.Value) []reflect.Value {
businessContextValue := in[0]
@@ -85,28 +97,28 @@ func ImplementTCC(v TccProxyService) {
t := typeOf.Field(i)
methodName := t.Name
f := valueOfElem.Field(i)
if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() && methodName == TRY_METHOD {
if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() && methodName == TryMethod {
if t.Type.NumIn() != 1 && t.Type.In(0) != businessActionContextType {
panic("prepare method argument is not BusinessActionContext")
}

actionName := t.Tag.Get(TCC_ACTION_NAME)
actionName := t.Tag.Get(TCCActionName)
if actionName == "" {
panic("must tag TccActionName")
panic("must tag TCCActionName")
}

commitMethodDesc := proxy.Register(proxyService, CONFIRM_METHOD)
cancelMethodDesc := proxy.Register(proxyService, CANCEL_METHOD)
commitMethodDesc := proxy.Register(proxyService, ConfirmMethod)
cancelMethodDesc := proxy.Register(proxyService, CancelMethod)
tryMethodDesc := proxy.Register(proxyService, methodName)

tccResource := &TCCResource{
ResourceGroupID: "",
AppName: "",
ActionName: actionName,
PrepareMethodName: TRY_METHOD,
CommitMethodName: COMMIT_METHOD,
PrepareMethodName: TryMethod,
CommitMethodName: CommitMethod,
CommitMethod: commitMethodDesc,
RollbackMethodName: CANCEL_METHOD,
RollbackMethodName: CancelMethod,
RollbackMethod: cancelMethodDesc,
}

@@ -124,27 +136,34 @@ func proceed(methodDesc *proxy.MethodDescriptor, ctx *context.BusinessActionCont
args = make([]interface{}, 0)
)

branchID, err := doTccActionLogStore(ctx, resource)
branchID, err := doTCCActionLogStore(ctx, resource)
if err != nil {
return nil, errors.WithStack(err)
}
ctx.BranchID = branchID
ctx.BranchID = strconv.FormatInt(branchID, 10)

args = append(args, ctx)
returnValues := proxy.Invoke(methodDesc, nil, args)
errValue := returnValues[len(returnValues)-1]
if errValue.IsValid() && !errValue.IsNil() {
err := tccResourceManager.BranchReport(meta.BranchTypeTCC, ctx.XID, branchID, meta.BranchStatusPhaseOneFailed, nil)
if err != nil {
log.Errorf("branch report err: %v", err)
}
}

return returnValues, nil
}

func doTccActionLogStore(ctx *context.BusinessActionContext, resource *TCCResource) (string, error) {
ctx.ActionContext[ACTION_START_TIME] = time.CurrentTimeMillis()
ctx.ActionContext[PREPARE_METHOD] = resource.PrepareMethodName
ctx.ActionContext[COMMIT_METHOD] = resource.CommitMethodName
ctx.ActionContext[ROLLBACK_METHOD] = resource.RollbackMethodName
ctx.ActionContext[ACTION_NAME] = ctx.ActionName
func doTCCActionLogStore(ctx *context.BusinessActionContext, resource *TCCResource) (int64, error) {
ctx.ActionContext[ActionStartTime] = time.CurrentTimeMillis()
ctx.ActionContext[PrepareMethod] = resource.PrepareMethodName
ctx.ActionContext[CommitMethod] = resource.CommitMethodName
ctx.ActionContext[RollbackMethod] = resource.RollbackMethodName
ctx.ActionContext[ActionName] = ctx.ActionName
ip, err := gxnet.GetLocalIP()
if err == nil {
ctx.ActionContext[HOST_NAME] = ip
ctx.ActionContext[HostName] = ip
} else {
log.Warn("getLocalIP error")
}
@@ -155,13 +174,13 @@ func doTccActionLogStore(ctx *context.BusinessActionContext, resource *TCCResour
applicationData, err := json.Marshal(applicationContext)
if err != nil {
log.Errorf("marshal applicationContext failed:%v", applicationContext)
return "", err
return 0, err
}

branchID, err := tccResourceManager.BranchRegister(meta.BranchTypeTCC, ctx.ActionName, "", ctx.XID, applicationData, "")
if err != nil {
log.Errorf("TCC branch Register error, xid: %s", ctx.XID)
return "", errors.WithStack(err)
return 0, errors.WithStack(err)
}
return strconv.FormatInt(branchID, 10), nil
return branchID, nil
}

Loading…
Cancel
Save