Browse Source

optimize ResourceManagerInbound function's parameters into one struct-rm_api.go (#198)

optimiz: optimize function's parameters into one struct-rm_api.go
tags/1.0.2-RC1
liiibpm GitHub 3 years ago
parent
commit
28157d4d5f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 13 deletions
  1. +7
    -1
      pkg/remoting/processor/client/rm_branch_commit_processor.go
  2. +8
    -2
      pkg/remoting/processor/client/rm_branch_rollback_processor.go
  3. +11
    -2
      pkg/rm/rm_api.go
  4. +8
    -8
      pkg/rm/tcc/tcc_resource.go

+ 7
- 1
pkg/remoting/processor/client/rm_branch_commit_processor.go View File

@@ -43,8 +43,14 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)
branchResource := rm.BranchResource{
ResourceId: resourceID,
BranchId: branchID,
ApplicationData: applicationData,
Xid: xid,
}

status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, branchResource)
if err != nil {
log.Infof("branch commit error: %s", err.Error())
return err


+ 8
- 2
pkg/remoting/processor/client/rm_branch_rollback_processor.go View File

@@ -43,8 +43,14 @@ func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage mess
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch rollback request: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
branchResource := rm.BranchResource{
BranchType: request.BranchType,
Xid: xid,
BranchId: branchID,
ResourceId: resourceID,
ApplicationData: applicationData,
}
status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, branchResource)
if err != nil {
log.Infof("branch rollback error: %s", err.Error())
return err


+ 11
- 2
pkg/rm/rm_api.go View File

@@ -31,12 +31,21 @@ type Resource interface {
GetBranchType() branch.BranchType
}

// branch resource which contains branch to commit or rollback
type BranchResource struct {
BranchType branch.BranchType
Xid string
BranchId int64
ResourceId string
ApplicationData []byte
}

// ResourceManagerInbound Control a branch transaction commit or rollback
type ResourceManagerInbound interface {
// Commit a branch transaction
BranchCommit(ctx context.Context, branchType branch.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (branch.BranchStatus, error)
BranchCommit(ctx context.Context, resource BranchResource) (branch.BranchStatus, error)
// Rollback a branch transaction
BranchRollback(ctx context.Context, ranchType branch.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (branch.BranchStatus, error)
BranchRollback(ctx context.Context, resource BranchResource) (branch.BranchStatus, error)
}

// BranchRegisterParam Branch register function param for ResourceManager


+ 8
- 8
pkg/rm/tcc/tcc_resource.go View File

@@ -123,16 +123,16 @@ func (t *TCCResourceManager) GetCachedResources() *sync.Map {
}

// Commit a branch transaction
func (t *TCCResourceManager) BranchCommit(ctx context.Context, ranchType branch.BranchType, xid string, branchID int64, resourceID string, applicationData []byte) (branch.BranchStatus, error) {
func (t *TCCResourceManager) BranchCommit(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) {
var tccResource *TCCResource
if resource, ok := t.resourceManagerMap.Load(resourceID); !ok {
err := fmt.Errorf("TCC resource is not exist, resourceId: %s", resourceID)
if resource, ok := t.resourceManagerMap.Load(branchResource.ResourceId); !ok {
err := fmt.Errorf("TCC resource is not exist, resourceId: %s", branchResource.ResourceId)
return 0, err
} else {
tccResource, _ = resource.(*TCCResource)
}

_, err := tccResource.TwoPhaseAction.Commit(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
_, err := tccResource.TwoPhaseAction.Commit(ctx, t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData))
if err != nil {
return branch.BranchStatusPhasetwoCommitFailedRetryable, err
}
@@ -160,16 +160,16 @@ func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64
}

// Rollback a branch transaction
func (t *TCCResourceManager) BranchRollback(ctx context.Context, ranchType branch.BranchType, xid string, branchID int64, resourceID string, applicationData []byte) (branch.BranchStatus, error) {
func (t *TCCResourceManager) BranchRollback(ctx context.Context, branchResource rm.BranchResource) (branch.BranchStatus, error) {
var tccResource *TCCResource
if resource, ok := t.resourceManagerMap.Load(resourceID); !ok {
err := fmt.Errorf("CC resource is not exist, resourceId: %s", resourceID)
if resource, ok := t.resourceManagerMap.Load(branchResource.ResourceId); !ok {
err := fmt.Errorf("CC resource is not exist, resourceId: %s", branchResource.ResourceId)
return 0, err
} else {
tccResource, _ = resource.(*TCCResource)
}

_, err := tccResource.TwoPhaseAction.Rollback(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
_, err := tccResource.TwoPhaseAction.Rollback(ctx, t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData))
if err != nil {
return branch.BranchStatusPhasetwoRollbackFailedRetryable, err
}


Loading…
Cancel
Save