Browse Source

add tcc process

tags/v0.1.0-rc1
luky116_Liuyuecai 3 years ago
parent
commit
5cfb55a838
49 changed files with 1135 additions and 891 deletions
  1. +0
    -8
      .idea/.gitignore
  2. +0
    -15
      .idea/git_toolbox_prj.xml
  3. +0
    -8
      .idea/modules.xml
  4. +0
    -9
      .idea/seata-go.iml
  5. +0
    -6
      .idea/vcs.xml
  6. +0
    -1
      go.mod
  7. +0
    -548
      go.sum
  8. +0
    -5
      pkg/common/constant.go
  9. +10
    -0
      pkg/common/constants.go
  10. +78
    -0
      pkg/common/model/context.go
  11. +6
    -5
      pkg/common/model/resource_manager.go
  12. +7
    -0
      pkg/common/model/transaction_manager.go
  13. +12
    -4
      pkg/config/tm_config.go
  14. +1
    -0
      pkg/imports/imports.go
  15. +1
    -0
      pkg/protocol/codec/codec.go
  16. +1
    -0
      pkg/protocol/codec/seata_decoder.go
  17. +1
    -0
      pkg/protocol/codec/seata_encoder.go
  18. +5
    -5
      pkg/protocol/constant.go
  19. +5
    -11
      pkg/protocol/identify.go
  20. +4
    -3
      pkg/protocol/transaction/rm_handler.go
  21. +163
    -0
      pkg/rm/api/transaction.go
  22. +8
    -0
      pkg/rm/api/transactional_executor.go
  23. +92
    -0
      pkg/rm/api/transactional_template.go
  24. +0
    -28
      pkg/rm/common_resource_manager.go
  25. +7
    -4
      pkg/rm/common_rm_handler.go
  26. +12
    -11
      pkg/rm/resource_manager_facade.go
  27. +7
    -6
      pkg/rm/rm_handler_facade.go
  28. +100
    -0
      pkg/rm/rm_remoting.go
  29. +27
    -0
      pkg/rm/rm_remoting_test.go
  30. +1
    -1
      pkg/rm/tcc/api/business_action_context.go
  31. +10
    -13
      pkg/rm/tcc/tcc_resource.go
  32. +106
    -13
      pkg/rm/tcc/tcc_rm.go
  33. +101
    -2
      pkg/rm/tcc/tcc_service.go
  34. +11
    -77
      pkg/rpc/getty/getty_client.go
  35. +18
    -6
      pkg/rpc/getty/getty_remoting.go
  36. +20
    -25
      pkg/rpc/getty/listener.go
  37. +1
    -0
      pkg/rpc/getty/readwriter.go
  38. +1
    -14
      pkg/rpc/getty/rpc_client.go
  39. +0
    -10
      pkg/rpc/getty/rpc_rm_message.go
  40. +3
    -1
      pkg/rpc/processor/client/client_on_response_processor.go
  41. +48
    -0
      pkg/rpc/processor/client/rm_branch_commit_processor.go
  42. +48
    -0
      pkg/rpc/processor/client/rm_branch_rollback_processor.go
  43. +0
    -45
      pkg/tm/api/global_transaction.go
  44. +154
    -0
      pkg/tm/global_transaction.go
  45. +7
    -0
      pkg/utils/error/error.go
  46. +6
    -0
      pkg/utils/net_utils.go
  47. +3
    -6
      pkg/utils/xid/xid_utils.go
  48. +49
    -0
      test/tcc_service_test.go
  49. +1
    -1
      testdata/mock/mock_tcc_service.go

+ 0
- 8
.idea/.gitignore View File

@@ -1,8 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

+ 0
- 15
.idea/git_toolbox_prj.xml View File

@@ -1,15 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="GitToolBoxProjectSettings">
<option name="commitMessageIssueKeyValidationOverride">
<BoolValueOverride>
<option name="enabled" value="true" />
</BoolValueOverride>
</option>
<option name="commitMessageValidationConfigOverride">
<CommitMessageValidationOverride>
<option name="enabled" value="true" />
</CommitMessageValidationOverride>
</option>
</component>
</project>

+ 0
- 8
.idea/modules.xml View File

@@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/seata-go.iml" filepath="$PROJECT_DIR$/.idea/seata-go.iml" />
</modules>
</component>
</project>

+ 0
- 9
.idea/seata-go.iml View File

@@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

+ 0
- 6
.idea/vcs.xml View File

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

+ 0
- 1
go.mod View File

@@ -6,7 +6,6 @@ require (
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.11.23
github.com/dubbogo/tools v1.0.9 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1


+ 0
- 548
go.sum
File diff suppressed because it is too large
View File


+ 0
- 5
pkg/common/constant.go View File

@@ -1,5 +0,0 @@
package common

const (
XID = "xid"
)

+ 10
- 0
pkg/common/constants.go View File

@@ -0,0 +1,10 @@
package common

const (
StartTime = "action-start-time"
HostName = "host-name"
TccBusinessActionContext = "tcc-business-action-context"

CONTEXT_VARIABLE = "contextVariable"
XID = "xid"
)

+ 78
- 0
pkg/common/model/context.go View File

@@ -0,0 +1,78 @@
package model

import (
"context"
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/rm/tcc/api"
)

type ContextVariable struct {
Xid string
Status *GlobalStatus
Role *GlobalTransactionRole
BusinessActionContext *api.BusinessActionContext
}

func InitSeataContext(ctx context.Context) context.Context {
return context.WithValue(ctx, common.CONTEXT_VARIABLE, &ContextVariable{})
}

func IsSeataContext(ctx context.Context) bool {
return ctx.Value(common.CONTEXT_VARIABLE) != nil
}

func GetBusinessActionContext(ctx context.Context) *api.BusinessActionContext {
variable := ctx.Value(common.TccBusinessActionContext)
if variable == nil {
return nil
}
return variable.(*api.BusinessActionContext)
}

func SetBusinessActionContext(ctx context.Context, businessActionContext *api.BusinessActionContext) {
variable := ctx.Value(common.TccBusinessActionContext)
if variable != nil {
variable.(*ContextVariable).BusinessActionContext = businessActionContext
}
}

func GetTransactionRole(ctx context.Context) *GlobalTransactionRole {
variable := ctx.Value(common.CONTEXT_VARIABLE)
if variable == nil {
return nil
}
return variable.(*ContextVariable).Role
}

func SetTransactionRole(ctx context.Context, role GlobalTransactionRole) {
variable := ctx.Value(common.CONTEXT_VARIABLE)
if variable != nil {
variable.(*ContextVariable).Role = &role
}
}

func GetXID(ctx context.Context) string {
variable := ctx.Value(common.CONTEXT_VARIABLE)
if variable == nil {
return ""
}
return variable.(*ContextVariable).Xid
}

func HasXID(ctx context.Context) bool {
return GetXID(ctx) != ""
}

func SetXID(ctx context.Context, xid string) {
variable := ctx.Value(common.CONTEXT_VARIABLE)
if variable != nil {
variable.(*ContextVariable).Xid = xid
}
}

func UnbindXid(ctx context.Context) {
variable := ctx.Value(common.CONTEXT_VARIABLE)
if variable != nil {
variable.(*ContextVariable).Xid = ""
}
}

+ 6
- 5
pkg/common/model/resource_manager.go View File

@@ -1,25 +1,26 @@
package model

import (
"context"
"sync"
)

// Control a branch transaction commit or rollback
type ResourceManagerInbound interface {
// Commit a branch transaction
BranchCommit(branchType BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (BranchStatus, error)
BranchCommit(ctx context.Context, branchType BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (BranchStatus, error)
// Rollback a branch transaction
BranchRollback(branchType BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (BranchStatus, error)
BranchRollback(ctx context.Context, ranchType BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (BranchStatus, error)
}

// Resource Manager: send outbound request to TC
type ResourceManagerOutbound interface {
// Branch register long
BranchRegister(branchType BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error)
BranchRegister(ctx context.Context, ranchType BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error)
// Branch report
BranchReport(branchType BranchType, xid string, branchId int64, status BranchStatus, applicationData string) error
BranchReport(ctx context.Context, ranchType BranchType, xid string, branchId int64, status BranchStatus, applicationData string) error
// Lock query boolean
LockQuery(branchType BranchType, resourceId, xid, lockKeys string) (bool, error)
LockQuery(ctx context.Context, ranchType BranchType, resourceId, xid, lockKeys string) (bool, error)
}

// Resource Manager: common behaviors


+ 7
- 0
pkg/common/model/transaction_manager.go View File

@@ -1,5 +1,12 @@
package model

type GlobalTransactionRole int8

const (
LAUNCHER GlobalTransactionRole = 0
PARTICIPANT GlobalTransactionRole = 1
)

type TransactionManager interface {
// Begin a new global transaction.
Begin(applicationId, transactionServiceGroup, name string, timeout int64) (string, error)


+ 12
- 4
pkg/config/tm_config.go View File

@@ -1,13 +1,21 @@
package config

type TMConfig struct {
CommitRetryCount int32 `default:"5" yaml:"commit_retry_count" json:"commit_retry_count,omitempty"`
RollbackRetryCount int32 `default:"5" yaml:"rollback_retry_count" json:"rollback_retry_count,omitempty"`
CommitRetryCount uint16 `default:"5" yaml:"commit_retry_count" json:"commit_retry_count,omitempty"`
RollbackRetryCount uint16 `default:"5" yaml:"rollback_retry_count" json:"rollback_retry_count,omitempty"`
DefaultGlobalTransactionTimeout uint16 `default:"60000" yaml:"default_global_transaction_timeout" json:"default_global_transaction_timeout,omitempty"`
DegradeCheck bool `default:"false" yaml:"degrade_check" json:"degrade_check,omitempty"`
DegradeCheckAllowTimes uint16 `default:"10" yaml:"degrade_check_allow_times" json:"degrade_check_allow_times,omitempty"`
DegradeCheckPeriod uint16 `default:"2000" yaml:"degrade_check_period" json:"degrade_check_period,omitempty"`
}

func GetDefaultTmConfig() TMConfig {
return TMConfig{
CommitRetryCount: 5,
RollbackRetryCount: 5,
CommitRetryCount: 5,
RollbackRetryCount: 5,
DefaultGlobalTransactionTimeout: 60000,
DegradeCheck: false,
DegradeCheckAllowTimes: 10,
DegradeCheckPeriod: 2000,
}
}

+ 1
- 0
pkg/imports/imports.go View File

@@ -1,6 +1,7 @@
package imports

import (
_ "github.com/seata/seata-go/pkg/rm/tcc"
_ "github.com/seata/seata-go/pkg/rpc/getty"
_ "github.com/seata/seata-go/pkg/rpc/processor/client"
)

+ 1
- 0
pkg/protocol/codec/codec.go View File

@@ -15,6 +15,7 @@ import (

type SerializerType byte

// TODO 待重构
const (
SEATA = byte(0x1)
PROTOBUF = byte(0x2)


+ 1
- 0
pkg/protocol/codec/seata_decoder.go View File

@@ -13,6 +13,7 @@ import (
"github.com/seata/seata-go/pkg/protocol"
)

// TODO 待重构
func AbstractResultMessageDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0


+ 1
- 0
pkg/protocol/codec/seata_encoder.go View File

@@ -13,6 +13,7 @@ import (
"github.com/seata/seata-go/pkg/utils/log"
)

// TODO 待重构
func AbstractResultMessageEncoder(in interface{}) []byte {
var (
zero16 int16 = 0


+ 5
- 5
pkg/protocol/constant.go View File

@@ -13,18 +13,18 @@ const (
// V1HeadLength v1 head length
V1HeadLength = 16

// MSGTypeRequestSync request message type
// Request message type
MSGTypeRequestSync RequestType = 0

// MSGTypeResponse response message type
// Response message type
MSGTypeResponse RequestType = 1

// MSGTypeRequestOneway request one way
// Request which no need response
MSGTypeRequestOneway RequestType = 2

// MSGTypeHeartbeatRequest heart beat request
// Heartbeat Request
MSGTypeHeartbeatRequest RequestType = 3

// MSGTypeHeartbeatResponse heart beat response
// Heartbeat Response
MSGTypeHeartbeatResponse RequestType = 4
)

+ 5
- 11
pkg/protocol/identify.go View File

@@ -6,21 +6,15 @@ type AbstractResultMessage struct {
}

type AbstractIdentifyRequest struct {
Version string

ApplicationId string `json:"applicationId"`

Version string
ApplicationId string `json:"applicationId"`
TransactionServiceGroup string

ExtraData []byte
ExtraData []byte
}

type AbstractIdentifyResponse struct {
AbstractResultMessage

Version string

ExtraData []byte

Version string
ExtraData []byte
Identified bool
}

+ 4
- 3
pkg/protocol/transaction/rm_handler.go View File

@@ -1,6 +1,7 @@
package transaction

import (
"context"
"github.com/seata/seata-go/pkg/protocol"
)

@@ -12,7 +13,7 @@ type RMInboundHandler interface {
* @param request the request
* @return the branch commit response
*/
HandleBranchCommitRequest(request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error)
HandleBranchCommitRequest(ctx context.Context, request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error)

/**
* Handle branch rollback response.
@@ -21,12 +22,12 @@ type RMInboundHandler interface {
* @return the branch rollback response
*/

HandleBranchRollbackRequest(request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error)
HandleBranchRollbackRequest(ctx context.Context, request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error)

/**
* Handle delete undo log .
*
* @param request the request
*/
HandleUndoLogDeleteRequest(request protocol.UndoLogDeleteRequest) error
HandleUndoLogDeleteRequest(ctx context.Context, request protocol.UndoLogDeleteRequest) error
}

+ 163
- 0
pkg/rm/api/transaction.go View File

@@ -0,0 +1,163 @@
package api

type Propagation int8

type TransactionInfo struct {
TimeOut int32
Name string
Propagation Propagation
LockRetryInternal int64
LockRetryTimes int64
}

const (

/**
* The REQUIRED.
* The default propagation.
*
* <p>
* If transaction is existing, execute with current transaction,
* else execute with new transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* if (tx == null) {
* try {
* tx = beginNewTransaction(); // begin new transaction, is not existing
* Object rs = business.execute(); // execute with new transaction
* commitTransaction(tx);
* return rs;
* } catch (Exception ex) {
* rollbackTransaction(tx);
* throw ex;
* }
* } else {
* return business.execute(); // execute with current transaction
* }
* </pre></code>
* </p>
*/
REQUIRED Propagation = iota

/**
* The REQUIRES_NEW.
*
* <p>
* If transaction is existing, suspend it, and then execute business with new transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* try {
* if (tx != null) {
* suspendedResource = suspendTransaction(tx); // suspend current transaction
* }
* try {
* tx = beginNewTransaction(); // begin new transaction
* Object rs = business.execute(); // execute with new transaction
* commitTransaction(tx);
* return rs;
* } catch (Exception ex) {
* rollbackTransaction(tx);
* throw ex;
* }
* } finally {
* if (suspendedResource != null) {
* resumeTransaction(suspendedResource); // resume transaction
* }
* }
* </pre></code>
* </p>
*/
REQUIRES_NEW

/**
* The NOT_SUPPORTED.
*
* <p>
* If transaction is existing, suspend it, and then execute business without transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* try {
* if (tx != null) {
* suspendedResource = suspendTransaction(tx); // suspend current transaction
* }
* return business.execute(); // execute without transaction
* } finally {
* if (suspendedResource != null) {
* resumeTransaction(suspendedResource); // resume transaction
* }
* }
* </pre></code>
* </p>
*/
NOT_SUPPORTED

/**
* The SUPPORTS.
*
* <p>
* If transaction is not existing, execute without global transaction,
* else execute business with current transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* if (tx != null) {
* return business.execute(); // execute with current transaction
* } else {
* return business.execute(); // execute without transaction
* }
* </pre></code>
* </p>
*/
SUPPORTS

/**
* The NEVER.
*
* <p>
* If transaction is existing, throw exception,
* else execute business without transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* if (tx != null) {
* throw new TransactionException("existing transaction");
* }
* return business.execute(); // execute without transaction
* </pre></code>
* </p>
*/
NEVER

/**
* The MANDATORY.
*
* <p>
* If transaction is not existing, throw exception,
* else execute business with current transaction.
* </p>
*
* <p>
* The logic is similar to the following code:
* <code><pre>
* if (tx == null) {
* throw new TransactionException("not existing transaction");
* }
* return business.execute(); // execute with current transaction
* </pre></code>
* </p>
*/
MANDATORY
)

+ 8
- 0
pkg/rm/api/transactional_executor.go View File

@@ -0,0 +1,8 @@
package api

import "context"

type TransactionalExecutor interface {
Execute(ctx context.Context, param interface{}) (interface{}, error)
GetTransactionInfo() TransactionInfo
}

+ 92
- 0
pkg/rm/api/transactional_template.go View File

@@ -0,0 +1,92 @@
package api

import (
"context"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/utils/log"
"sync"
)

var (
// singletone ResourceManagerFacade
transactionTemplate *TransactionTemplate
onceTransactionTemplate = &sync.Once{}
)

func GetTransactionTemplate() *TransactionTemplate {
if transactionTemplate == nil {
onceTransactionTemplate.Do(func() {
transactionTemplate = &TransactionTemplate{}
})
}
return transactionTemplate
}

type TransactionTemplate struct {
}

func (t *TransactionTemplate) Execute(ctx context.Context, business TransactionalExecutor, param interface{}) (interface{}, error) {
if !model.IsSeataContext(ctx) {
err := errors.New("context should be inited as seata context!")
log.Error(err)
return nil, err
}

if model.GetTransactionRole(ctx) == nil {
model.SetTransactionRole(ctx, model.LAUNCHER)
}

var tx *tm.GlobalTransaction
if model.HasXID(ctx) {
tx = &tm.GlobalTransaction{
Xid: model.GetXID(ctx),
Status: model.Begin,
Role: model.PARTICIPANT,
}
}

// todo: Handle the transaction propagation.

if tx == nil {
tx = &tm.GlobalTransaction{
Xid: model.GetXID(ctx),
Status: model.UnKnown,
Role: model.LAUNCHER,
}
}

// todo: set current tx config to holder

// begin global transaction
err := t.BeginTransaction(ctx, tx, business.GetTransactionInfo().TimeOut, business.GetTransactionInfo().Name)
if err != nil {
log.Infof("transactionTemplate: begin transaction failed, error %v", err)
return nil, err
}

// do your business
res, err := business.Execute(ctx, param)
if err != nil {
log.Infof("transactionTemplate: execute business failed, error %v", err)
return nil, tm.GetGlobalTransactionManager().Rollback(ctx, tx)
}

// commit global transaction
err = t.CommitTransaction(ctx, tx)
if err != nil {
log.Infof("transactionTemplate: commit transaction failed, error %v", err)
// rollback transaction
return nil, tm.GetGlobalTransactionManager().Rollback(ctx, tx)
}
return res, err
}

func (TransactionTemplate) BeginTransaction(ctx context.Context, tx *tm.GlobalTransaction, timeout int32, name string) error {
return tm.GetGlobalTransactionManager().Begin(ctx, tx, timeout, name)
}

func (TransactionTemplate) CommitTransaction(ctx context.Context, tx *tm.GlobalTransaction) error {
return tm.GetGlobalTransactionManager().Commit(ctx, tx)
}

+ 0
- 28
pkg/rm/common_resource_manager.go View File

@@ -1,28 +0,0 @@
package rm

import (
"github.com/seata/seata-go/pkg/common/model"
)

// TODO
type RMRemoting struct {
}

// Branch register long
func (RMRemoting) BranchRegister(branchType model.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
return 0, nil
}

// Branch report
func (RMRemoting) BranchReport(branchType model.BranchType, xid string, branchId int64, status model.BranchStatus, applicationData string) error {
return nil
}

// Lock query boolean
func (RMRemoting) LockQuery(branchType model.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return false, nil
}

func (RMRemoting) RegisterResource(resource model.Resource) error {
return nil
}

+ 7
- 4
pkg/rm/common_rm_handler.go View File

@@ -1,8 +1,10 @@
package rm

import (
"context"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/utils/log"
)

type CommonRMHandler struct {
@@ -14,13 +16,14 @@ func (h *CommonRMHandler) SetRMGetter(rmGetter model.ResourceManagerGetter) {
}

// Handle branch commit response.
func (h *CommonRMHandler) HandleBranchCommitRequest(request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error) {
func (h *CommonRMHandler) HandleBranchCommitRequest(ctx context.Context, request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error) {
xid := request.Xid
branchID := request.BranchId
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := h.rmGetter.GetResourceManager().BranchCommit(request.BranchType, xid, branchID, resourceID, applicationData)
status, err := h.rmGetter.GetResourceManager().BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
// TODO: handle error
return nil, err
@@ -36,13 +39,13 @@ func (h *CommonRMHandler) HandleBranchCommitRequest(request protocol.BranchCommi

// Handle branch rollback response.
// TODO
func (h *CommonRMHandler) HandleBranchRollbackRequest(request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error) {
func (h *CommonRMHandler) HandleBranchRollbackRequest(ctx context.Context, request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error) {
return nil, nil
}

// Handle delete undo log .
// TODO
func (h *CommonRMHandler) HandleUndoLogDeleteRequest(request protocol.UndoLogDeleteRequest) error {
func (h *CommonRMHandler) HandleUndoLogDeleteRequest(ctx context.Context, request protocol.UndoLogDeleteRequest) error {
return nil
}



+ 12
- 11
pkg/rm/resource_manager_facade.go View File

@@ -1,6 +1,7 @@
package rm

import (
"context"
"fmt"
"sync"
)
@@ -30,7 +31,7 @@ type ResourceManagerFacade struct {
}

// 将事务管理器注册到这里
func RegisterResource(resourceManager model2.ResourceManager) {
func RegisterResourceManager(resourceManager model2.ResourceManager) {
resourceManagerMap.Store(resourceManager.GetBranchType(), resourceManager)
}

@@ -43,28 +44,28 @@ func (*ResourceManagerFacade) GetResourceManager(branchType model2.BranchType) m
}

// Commit a branch transaction
func (d *ResourceManagerFacade) BranchCommit(branchType model2.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (model2.BranchStatus, error) {
return d.GetResourceManager(branchType).BranchCommit(branchType, xid, branchId, resourceId, applicationData)
func (d *ResourceManagerFacade) BranchCommit(ctx context.Context, branchType model2.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (model2.BranchStatus, error) {
return d.GetResourceManager(branchType).BranchCommit(ctx, branchType, xid, branchId, resourceId, applicationData)
}

// Rollback a branch transaction
func (d *ResourceManagerFacade) BranchRollback(branchType model2.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (model2.BranchStatus, error) {
return d.GetResourceManager(branchType).BranchRollback(branchType, xid, branchId, resourceId, applicationData)
func (d *ResourceManagerFacade) BranchRollback(ctx context.Context, branchType model2.BranchType, xid string, branchId int64, resourceId string, applicationData []byte) (model2.BranchStatus, error) {
return d.GetResourceManager(branchType).BranchRollback(ctx, branchType, xid, branchId, resourceId, applicationData)
}

// Branch register long
func (d *ResourceManagerFacade) BranchRegister(branchType model2.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
return d.GetResourceManager(branchType).BranchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys)
func (d *ResourceManagerFacade) BranchRegister(ctx context.Context, branchType model2.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
return d.GetResourceManager(branchType).BranchRegister(ctx, branchType, resourceId, clientId, xid, applicationData, lockKeys)
}

// Branch report
func (d *ResourceManagerFacade) BranchReport(branchType model2.BranchType, xid string, branchId int64, status model2.BranchStatus, applicationData string) error {
return d.GetResourceManager(branchType).BranchReport(branchType, xid, branchId, status, applicationData)
func (d *ResourceManagerFacade) BranchReport(ctx context.Context, branchType model2.BranchType, xid string, branchId int64, status model2.BranchStatus, applicationData string) error {
return d.GetResourceManager(branchType).BranchReport(ctx, branchType, xid, branchId, status, applicationData)
}

// Lock query boolean
func (d *ResourceManagerFacade) LockQuery(branchType model2.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return d.GetResourceManager(branchType).LockQuery(branchType, resourceId, xid, lockKeys)
func (d *ResourceManagerFacade) LockQuery(ctx context.Context, branchType model2.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return d.GetResourceManager(branchType).LockQuery(ctx, branchType, resourceId, xid, lockKeys)
}

// Register a model.Resource to be managed by model.Resource Manager


+ 7
- 6
pkg/rm/rm_handler_facade.go View File

@@ -1,6 +1,7 @@
package rm

import (
"context"
"sync"
)

@@ -28,20 +29,20 @@ func GetRMHandlerFacadeInstance() *RMHandlerFacade {
}

// Handle branch commit response.
func (h *RMHandlerFacade) HandleBranchCommitRequest(request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error) {
return h.getRMHandler(request.BranchType).HandleBranchCommitRequest(request)
func (h *RMHandlerFacade) HandleBranchCommitRequest(ctx context.Context, request protocol.BranchCommitRequest) (*protocol.BranchCommitResponse, error) {
return h.getRMHandler(request.BranchType).HandleBranchCommitRequest(ctx, request)
}

// Handle branch rollback response.
// TODO
func (h *RMHandlerFacade) HandleBranchRollbackRequest(request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error) {
return h.getRMHandler(request.BranchType).HandleBranchRollbackRequest(request)
func (h *RMHandlerFacade) HandleBranchRollbackRequest(ctx context.Context, request protocol.BranchRollbackRequest) (*protocol.BranchRollbackResponse, error) {
return h.getRMHandler(request.BranchType).HandleBranchRollbackRequest(ctx, request)
}

// Handle delete undo log .
// TODO
func (h *RMHandlerFacade) HandleUndoLogDeleteRequest(request protocol.UndoLogDeleteRequest) error {
return h.getRMHandler(request.BranchType).HandleUndoLogDeleteRequest(request)
func (h *RMHandlerFacade) HandleUndoLogDeleteRequest(ctx context.Context, request protocol.UndoLogDeleteRequest) error {
return h.getRMHandler(request.BranchType).HandleUndoLogDeleteRequest(ctx, request)
}

func (h *RMHandlerFacade) RegisteRMHandler(handler *CommonRMHandler) {


+ 100
- 0
pkg/rm/rm_remoting.go View File

@@ -0,0 +1,100 @@
package rm

import (
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
"sync"
)

var (
rmRemoting *RMRemoting
onceGettyRemoting = &sync.Once{}
)

func GetRMRemotingInstance() *RMRemoting {
if rmRemoting == nil {
onceGettyRemoting.Do(func() {
rmRemoting = &RMRemoting{}
})
}
return rmRemoting
}

// TODO
type RMRemoting struct {
}

// Branch register long
func (RMRemoting) BranchRegister(branchType model.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
return 0, nil
}

// Branch report
func (RMRemoting) BranchReport(branchType model.BranchType, xid string, branchId int64, status model.BranchStatus, applicationData string) error {
return nil
}

// Lock query boolean
func (RMRemoting) LockQuery(branchType model.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return false, nil
}

func (r *RMRemoting) RegisterResource(resource model.Resource) error {
req := protocol.RegisterRMRequest{
AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
//todo replace with config
Version: "1.4.2",
ApplicationId: "tcc-sample",
TransactionServiceGroup: "my_test_tx_group",
},
ResourceIds: resource.GetResourceId(),
}
res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("RegisterResourceManager error: {%#v}", err.Error())
return err
}

if isRegisterSuccess(res) {
r.onRegisterRMSuccess(res.(protocol.RegisterRMResponse))
} else {
r.onRegisterRMFailure(res.(protocol.RegisterRMResponse))
}

return nil
}

func isRegisterSuccess(response interface{}) bool {
//if res, ok := response.(protocol.RegisterTMResponse); ok {
// return res.Identified
//} else if res, ok := response.(protocol.RegisterRMResponse); ok {
// return res.Identified
//}
//return false
if res, ok := response.(protocol.RegisterRMResponse); ok {
return res.Identified
}
return false
}

func (r *RMRemoting) onRegisterRMSuccess(response protocol.RegisterRMResponse) {
// TODO
log.Infof("register RM success. response: %#v", response)
}

func (r *RMRemoting) onRegisterRMFailure(response protocol.RegisterRMResponse) {
// TODO
log.Infof("register RM failure. response: %#v", response)
}

func (r *RMRemoting) onRegisterTMSuccess(response protocol.RegisterTMResponse) {
// TODO
log.Infof("register TM success. response: %#v", response)
}

func (r *RMRemoting) onRegisterTMFailure(response protocol.RegisterTMResponse) {
// TODO
log.Infof("register TM failure. response: %#v", response)
}

+ 27
- 0
pkg/rm/rm_remoting_test.go View File

@@ -0,0 +1,27 @@
package rm

import (
"github.com/seata/seata-go/pkg/common/model"
_ "github.com/seata/seata-go/pkg/imports"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
)

func (RMRemoting) RegisterResource(resource model.Resource) error {
req := protocol.RegisterRMRequest{
AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
//todo replace with config
Version: "1.4.2",
ApplicationId: "tcc-sample",
TransactionServiceGroup: "my_test_tx_group",
},
ResourceIds: resource.GetResourceId(),
}
err := getty.GetGettyRemotingClient().SendAsyncRequest(req)
if err != nil {
log.Error("RegisterResourceManager error: {%#v}", err.Error())
return err
}
return nil
}

pkg/rm/api/business_action_context.go → pkg/rm/tcc/api/business_action_context.go View File

@@ -4,5 +4,5 @@ type BusinessActionContext struct {
Xid string
BranchId string
ActionName string
ActionContext map[string]interface{}
ActionContext interface{}
}

+ 10
- 13
pkg/rm/tcc/tcc_resource.go View File

@@ -1,23 +1,20 @@
package tcc

import (
"reflect"
)

import (
"github.com/seata/seata-go/pkg/common/model"
)

type TCCResource struct {
ResourceGroupId string `default:"DEFAULT"`
AppName string
ActionName string
TargetBean interface{}
PrepareMethod reflect.Method
CommitMethodName string
CommitMethod reflect.Method
RollbackMethodName string
RollbackMethod reflect.Method
TCCServiceBean TCCService
ResourceGroupId string `default:"DEFAULT"`
AppName string
ActionName string
//TargetBean interface{}
//PrepareMethod reflect.Method
//CommitMethodName string
//CommitMethod reflect.Method
//RollbackMethodName string
//RollbackMethod reflect.Method
}

func (t *TCCResource) GetResourceGroupId() string {


+ 106
- 13
pkg/rm/tcc/tcc_rm.go View File

@@ -1,6 +1,12 @@
package tcc

import (
"context"
"fmt"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rm/tcc/api"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
"sync"
)

@@ -9,34 +15,121 @@ import (
"github.com/seata/seata-go/pkg/rm"
)

type TCCRm struct {
rmRemoting rm.RMRemoting
var (
tCCResourceManager *TCCResourceManager
onceTCCResourceManager = &sync.Once{}
)

func init() {
rm.RegisterResourceManager(GetTCCResourceManagerInstance())
}

func GetTCCResourceManagerInstance() *TCCResourceManager {
if tCCResourceManager == nil {
onceTCCResourceManager.Do(func() {
tCCResourceManager = &TCCResourceManager{
resourceManagerMap: sync.Map{},
rmRemoting: rm.GetRMRemotingInstance(),
}
})
}
return tCCResourceManager
}

type TCCResourceManager struct {
rmRemoting *rm.RMRemoting
// resourceID -> resource
resourceManagerMap sync.Map
}

func (t *TCCRm) RegisterResource(resource model.Resource) error {
// register transaction branch
func (t *TCCResourceManager) BranchRegister(ctx context.Context, branchType model.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
request := protocol.BranchRegisterRequest{
Xid: xid,
BranchType: t.GetBranchType(),
ResourceId: resourceId,
LockKey: lockKeys,
ApplicationData: []byte(applicationData),
}
response, err := getty.GetGettyRemotingClient().SendSyncRequest(request)
if err != nil || response == nil {
log.Errorf("BranchRegister error: %v, res %v", err.Error(), response)
return 0, err
}
return response.(protocol.BranchRegisterResponse).BranchId, nil
}

func (t *TCCResourceManager) BranchReport(ctx context.Context, ranchType model.BranchType, xid string, branchId int64, status model.BranchStatus, applicationData string) error {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) LockQuery(ctx context.Context, ranchType model.BranchType, resourceId, xid, lockKeys string) (bool, error) {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) UnregisterResource(resource model.Resource) error {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) RegisterResource(resource model.Resource) error {
if _, ok := resource.(*TCCResource); !ok {
panic(fmt.Sprintf("register tcc resource error, TCCResource is needed, param %v", resource))
}
t.resourceManagerMap.Store(resource.GetResourceId(), resource)
t.rmRemoting.RegisterResource(resource)
return nil
return t.rmRemoting.RegisterResource(resource)
}

func (t *TCCRm) GetManagedResources() sync.Map {
func (t *TCCResourceManager) GetManagedResources() sync.Map {
return t.resourceManagerMap
}

// Commit a branch transaction
func (t *TCCRm) BranchCommit(branchType model.BranchType, xid, branchId int64, resourceId, applicationData string) (model.BranchStatus, error) {
// TODO
return 0, nil
func (t *TCCResourceManager) BranchCommit(ctx context.Context, ranchType model.BranchType, xid string, branchID int64, resourceID string, applicationData []byte) (model.BranchStatus, error) {
var tccResource *TCCResource
if resource, ok := t.resourceManagerMap.Load(resourceID); !ok {
err := fmt.Errorf("CC resource is not exist, resourceId: %s", resourceID)
return 0, err
} else {
tccResource, _ = resource.(*TCCResource)
}

err := tccResource.TCCServiceBean.Commit(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
if err != nil {
return model.BranchStatusPhasetwoCommitFailedRetryable, err
}
return model.BranchStatusPhasetwoCommitted, err
}

func (t *TCCResourceManager) getBusinessActionContext(xid string, branchID int64, resourceID string, applicationData []byte) api.BusinessActionContext {
return api.BusinessActionContext{
Xid: xid,
BranchId: string(branchID),
ActionName: resourceID,
// todo get ActionContext
//ActionContext:,
}
}

// Rollback a branch transaction
func (t *TCCRm) BranchRollback(branchType model.BranchType, xid string, branchId int64, resourceId, applicationData string) (model.BranchStatus, error) {
// TODO
return 0, nil
func (t *TCCResourceManager) BranchRollback(ctx context.Context, ranchType model.BranchType, xid string, branchID int64, resourceID string, applicationData []byte) (model.BranchStatus, error) {
var tccResource *TCCResource
if resource, ok := t.resourceManagerMap.Load(resourceID); !ok {
err := fmt.Errorf("CC resource is not exist, resourceId: %s", resourceID)
return 0, err
} else {
tccResource, _ = resource.(*TCCResource)
}

err := tccResource.TCCServiceBean.Rollback(ctx, t.getBusinessActionContext(xid, branchID, resourceID, applicationData))
if err != nil {
return model.BranchStatusPhasetwoRollbacked, err
}
return model.BranchStatusPhasetwoRollbackFailedRetryable, err
}

func (t *TCCRm) GetBranchType() model.BranchType {
func (t *TCCResourceManager) GetBranchType() model.BranchType {
return model.BranchTypeTCC
}

+ 101
- 2
pkg/rm/tcc/tcc_service.go View File

@@ -2,6 +2,16 @@ package tcc

import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/rm"
api2 "github.com/seata/seata-go/pkg/rm/tcc/api"
"github.com/seata/seata-go/pkg/utils"
"github.com/seata/seata-go/pkg/utils/log"
"time"
)

import (
@@ -11,10 +21,99 @@ import (

type TCCService interface {
Prepare(ctx context.Context, params interface{}) error
Commit(ctx context.Context, businessActionContext api.BusinessActionContext) error
Rollback(ctx context.Context, businessActionContext api.BusinessActionContext) error
Commit(ctx context.Context, businessActionContext api2.BusinessActionContext) error
Rollback(ctx context.Context, businessActionContext api2.BusinessActionContext) error

GetActionName() string
GetRemoteType() remoting.RemoteType
GetServiceType() remoting.ServiceType
}

type TCCServiceProxy struct {
TCCService
}

func NewTCCServiceProxy(tccService TCCService) TCCService {
if tccService == nil {
panic("param tccService should not be nil")
}

// register resource
tccResource := TCCResource{
TCCServiceBean: tccService,
ResourceGroupId: "DEFAULT",
AppName: "",
ActionName: tccService.GetActionName(),
}
err := rm.GetResourceManagerFacadeInstance().GetResourceManager(model.BranchTypeTCC).RegisterResource(&tccResource)
if err != nil {
panic(fmt.Sprintf("NewTCCServiceProxy registerResource error: {%#v}", err.Error()))
}

return &TCCServiceProxy{
TCCService: tccService,
}
}

func (t *TCCServiceProxy) Prepare(ctx context.Context, param interface{}) error {
var err error
if model.IsSeataContext(ctx) {
// execute transaction
_, err = api.GetTransactionTemplate().Execute(ctx, t, param)
} else {
log.Warn("context is not inited as seata context, will not execute transaction!")
err = t.TCCService.Prepare(ctx, param)
}
return err
}

// register transaction branch, and then execute business
func (t *TCCServiceProxy) Execute(ctx context.Context, param interface{}) (interface{}, error) {
// register transaction branch
err := t.RegisteBranch(ctx, param)
if err != nil {
return nil, err
}
return nil, t.TCCService.Prepare(ctx, param)
}

func (t *TCCServiceProxy) RegisteBranch(ctx context.Context, param interface{}) error {
// register transaction branch
if !model.HasXID(ctx) {
err := errors.New("BranchRegister error, xid should not be nil")
log.Errorf(err.Error())
return err
}
tccContext := make(map[string]interface{}, 0)
tccContext[common.StartTime] = time.Now().UnixNano() / 1e6
tccContext[common.HostName] = utils.GetLocalIp()
tccContextStr, _ := json.Marshal(tccContext)

branchId, err := rm.GetResourceManagerFacadeInstance().GetResourceManager(model.BranchTypeTCC).BranchRegister(
ctx, model.BranchTypeTCC, t.GetActionName(), "", model.GetXID(ctx), string(tccContextStr), "")
if err != nil {
err = errors.New(fmt.Sprintf("BranchRegister error: %v", err.Error()))
log.Error(err.Error())
return err
}

actionContext := &api2.BusinessActionContext{
Xid: model.GetXID(ctx),
BranchId: string(branchId),
ActionName: t.GetActionName(),
ActionContext: param,
}
model.SetBusinessActionContext(ctx, actionContext)
return nil
}

func (t *TCCServiceProxy) GetTransactionInfo() api.TransactionInfo {
// todo replace with config
return api.TransactionInfo{
TimeOut: 10000,
Name: t.GetActionName(),
//Propagation, Propagation
//LockRetryInternal, int64
//LockRetryTimes int64
}
}

+ 11
- 77
pkg/rpc/getty/getty_client.go View File

@@ -20,14 +20,7 @@ var (
)

type GettyRemotingClient struct {
//conf *config.ClientConfig
idGenerator *atomic.Uint32
//futures *sync.Map
//mergeMsgMap *sync.Map
//rpcMessageChannel chan protocol.RpcMessage
//BranchCommitRequestChannel chan RpcRMMessage
//BranchRollbackRequestChannel chan RpcRMMessage
//GettySessionOnOpenChannel chan string
}

func GetGettyRemotingClient() *GettyRemotingClient {
@@ -58,6 +51,17 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error {
return GetGettyRemotingInstance().SendASync(rpcMessage)
}

func (client *GettyRemotingClient) SendAsyncResponse(msg interface{}) error {
rpcMessage := protocol.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Type: protocol.MSGTypeResponse,
Codec: codec.SEATA,
Compressor: 0,
Body: msg,
}
return GetGettyRemotingInstance().SendASync(rpcMessage)
}

func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{}, error) {
rpcMessage := protocol.RpcMessage{
ID: int32(client.idGenerator.Inc()),
@@ -79,73 +83,3 @@ func (client *GettyRemotingClient) SendSyncRequestWithTimeout(msg interface{}, t
}
return GetGettyRemotingInstance().SendSyncWithTimeout(rpcMessage, timeout)
}

//
//func (client *GettyRemotingClient) RegisterResource(serverAddress string, request protocol.RegisterRMRequest) {
// session := clientSessionManager.AcquireGettySessionByServerAddress(serverAddress)
// if session != nil {
// err := client.sendAsyncRequestWithoutResponse(session, request)
// if err != nil {
// log.Errorf("register resource failed, session:{},resourceID:{}", session, request.ResourceIds)
// }
// }
//}
//
//func loadBalance(transactionServiceGroup string) string {
// addressList := getAddressList(transactionServiceGroup)
// if len(addressList) == 1 {
// return addressList[0]
// }
// return addressList[rand.Intn(len(addressList))]
//}
//
//func getAddressList(transactionServiceGroup string) []string {
// addressList := strings.Split(transactionServiceGroup, ",")
// return addressList
//}
//
//func (client *GettyRemotingClient) processMergedMessage() {
// ticker := time.NewTicker(5 * time.Millisecond)
// mergedMessage := protocol.MergedWarpMessage{
// Msgs: make([]protocol.MessageTypeAware, 0),
// MsgIds: make([]int32, 0),
// }
// for {
// select {
// case rpcMessage := <-client.rpcMessageChannel:
// message := rpcMessage.Body.(protocol.MessageTypeAware)
// mergedMessage.Msgs = append(mergedMessage.Msgs, message)
// mergedMessage.MsgIds = append(mergedMessage.MsgIds, rpcMessage.ID)
// if len(mergedMessage.Msgs) == 20 {
// client.sendMergedMessage(mergedMessage)
// mergedMessage = protocol.MergedWarpMessage{
// Msgs: make([]protocol.MessageTypeAware, 0),
// MsgIds: make([]int32, 0),
// }
// }
// case <-ticker.C:
// if len(mergedMessage.Msgs) > 0 {
// client.sendMergedMessage(mergedMessage)
// mergedMessage = protocol.MergedWarpMessage{
// Msgs: make([]protocol.MessageTypeAware, 0),
// MsgIds: make([]int32, 0),
// }
// }
// }
// }
//}
//
//func (client *GettyRemotingClient) sendMergedMessage(mergedMessage protocol.MergedWarpMessage) {
// ss := clientSessionManager.AcquireGettySession()
// err := client.sendAsync(ss, mergedMessage)
// if err != nil {
// for _, id := range mergedMessage.MsgIds {
// resp, loaded := client.futures.Load(id)
// if loaded {
// response := resp.(*protocol.MessageFuture)
// response.Done <- true
// client.futures.Delete(id)
// }
// }
// }
//}

+ 18
- 6
pkg/rpc/getty/getty_remoting.go View File

@@ -60,6 +60,7 @@ func (client *GettyRemoting) SendASync(msg protocol.RpcMessage) error {
return err
}

// TODO 待重构
func (client *GettyRemoting) sendAsync(session getty.Session, msg protocol.RpcMessage, timeout time.Duration) (interface{}, error) {
var err error
if session == nil || session.IsClosed() {
@@ -76,9 +77,15 @@ func (client *GettyRemoting) sendAsync(session getty.Session, msg protocol.RpcMe

log.Debugf("send message: %#v, session: %s", msg, session.Stat())

if timeout > time.Duration(0) {
actualTimeOut := timeout
if timeout <= time.Duration(0) {
// todo timeoue use config
actualTimeOut = time.Duration(200)
}

wait := func() (interface{}, error) {
select {
case <-gxtime.GetDefaultTimerWheel().After(timeout):
case <-gxtime.GetDefaultTimerWheel().After(actualTimeOut):
client.futures.Delete(msg.ID)
if session != nil {
return nil, errors.Errorf("wait response timeout, ip: %s, request: %#v", session.RemoteAddr(), msg)
@@ -91,6 +98,11 @@ func (client *GettyRemoting) sendAsync(session getty.Session, msg protocol.RpcMe
}
}

if timeout > time.Duration(0) {
return wait()
} else {
go wait()
}
return nil, err
}

@@ -116,15 +128,15 @@ func (client *GettyRemoting) GetMergedMessage(msgID int32) *protocol.MergedWarpM
return nil
}

func (client *GettyRemoting) NotifytRpcMessageResponse(rpcMessage protocol.RpcMessage) {
func (client *GettyRemoting) NotifyRpcMessageResponse(rpcMessage protocol.RpcMessage) {
messageFuture := client.GetMessageFuture(rpcMessage.ID)
if messageFuture != nil {
messageFuture.Response = rpcMessage.Body
// todo messageFuture.Err怎么配置呢?
// todo add messageFuture.Err
//messageFuture.Err = rpcMessage.Err
messageFuture.Done <- true
//client.futures.Delete(rpcMessage.ID)
//client.msgFutures.Delete(rpcMessage.ID)
} else {
log.Infof("msg: {} is not found in futures.", rpcMessage.ID)
log.Infof("msg: {} is not found in msgFutures.", rpcMessage.ID)
}
}

+ 20
- 25
pkg/rpc/getty/listener.go View File

@@ -26,7 +26,7 @@ var (
type gettyClientHandler struct {
conf *config.ClientConfig
idGenerator *atomic.Uint32
futures *sync.Map
msgFutures *sync.Map
mergeMsgMap *sync.Map
processorTable map[protocol.MessageType]processor.RemotingProcessor
}
@@ -37,7 +37,7 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
clientHandler = &gettyClientHandler{
conf: config.GetDefaultClientConfig("seata-go"),
idGenerator: &atomic.Uint32{},
futures: &sync.Map{},
msgFutures: &sync.Map{},
mergeMsgMap: &sync.Map{},
processorTable: make(map[protocol.MessageType]processor.RemotingProcessor, 0),
}
@@ -46,41 +46,37 @@ func GetGettyClientHandlerInstance() *gettyClientHandler {
return clientHandler
}

// OnOpen ...
func (client *gettyClientHandler) OnOpen(session getty.Session) error {
clientSessionManager.RegisterGettySession(session)
go func() {
request := protocol.RegisterTMRequest{AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
Version: client.conf.SeataVersion,
ApplicationId: client.conf.ApplicationID,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
}}
err := GetGettyRemotingClient().SendAsyncRequest(request)
//client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err != nil {
log.Error("OnOpen error: {%#v}", err.Error())
clientSessionManager.ReleaseGettySession(session)
return
}
//todo
//client.GettySessionOnOpenChannel <- session.RemoteAddr()
}()
//go func() {
// request := protocol.RegisterTMRequest{AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
// Version: client.conf.SeataVersion,
// ApplicationId: client.conf.ApplicationID,
// TransactionServiceGroup: client.conf.TransactionServiceGroup,
// }}
// err := GetGettyRemotingClient().SendAsyncRequest(request)
// //client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
// if err != nil {
// log.Error("OnOpen error: {%#v}", err.Error())
// clientSessionManager.ReleaseGettySession(session)
// return
// }
//
// //todo
// //client.GettySessionOnOpenChannel <- session.RemoteAddr()
//}()

return nil
}

// OnError ...
func (client *gettyClientHandler) OnError(session getty.Session, err error) {
clientSessionManager.ReleaseGettySession(session)
}

// OnClose ...
func (client *gettyClientHandler) OnClose(session getty.Session) {
clientSessionManager.ReleaseGettySession(session)
}

// OnMessage ...
func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface{}) {
// TODO 需要把session里面的关键信息存储到context中,以方便在后面流程中获取使用。比如,XID等等
ctx := context.Background()
@@ -104,9 +100,8 @@ func (client *gettyClientHandler) OnMessage(session getty.Session, pkg interface
}
}

// OnCron ...
func (client *gettyClientHandler) OnCron(session getty.Session) {
//GetGettyRemotingClient().SendAsyncRequest(protocol.HeartBeatMessagePing)
// todo 发送心跳消息
}

func (client *gettyClientHandler) RegisterProcessor(msgType protocol.MessageType, processor processor.RemotingProcessor) {


+ 1
- 0
pkg/rpc/getty/readwriter.go View File

@@ -50,6 +50,7 @@ var (
rpcPkgHandler = &RpcPackageHandler{}
)

// TODO 待重构
var (
ErrNotEnoughStream = errors.New("packet stream is not enough")
ErrTooLargePackage = errors.New("package length is exceed the getty package's legal maximum length.")


+ 1
- 14
pkg/rpc/getty/rpc_client.go View File

@@ -20,8 +20,7 @@ import (
type RpcClient struct {
conf *config.ClientConfig
gettyClients []getty.Client
//rpcHandler RpcRemoteClient
futures *sync.Map
futures *sync.Map
}

func init() {
@@ -32,7 +31,6 @@ func newRpcClient() *RpcClient {
rpcClient := &RpcClient{
conf: config.GetClientConfig(),
gettyClients: make([]getty.Client, 0),
//rpcHandler: *InitRpcRemoteClient(),
}
rpcClient.init()
return rpcClient
@@ -57,17 +55,6 @@ func (c *RpcClient) init() {

// todo mock
func getAvailServerList(config *config.ClientConfig) []string {
//reg, err := extension.GetRegistry(config.RegistryConfig.Mode)
//if err != nil {
// logger.Errorf("Registry can not connect success, program is going to panic.Error message is %s", err.Error())
// panic(err.Error())
//}
//addrs, err := reg.Lookup()
//if err != nil {
// logger.Errorf("no hava valid server list", err.Error())
// return nil
//}
//return addrs
return []string{"127.0.0.1:8091"}
}



+ 0
- 10
pkg/rpc/getty/rpc_rm_message.go View File

@@ -1,10 +0,0 @@
package getty

import (
"github.com/seata/seata-go/pkg/protocol"
)

type RpcRMMessage struct {
RpcMessage protocol.RpcMessage
ServerAddress string
}

+ 3
- 1
pkg/rpc/processor/client/client_on_response_processor.go View File

@@ -17,6 +17,8 @@ func init() {
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeBranchStatusReportResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeGlobalLockQueryResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeRegRmResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeGlobalBeginResult, clientOnResponseProcessor)
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeGlobalCommitResult, clientOnResponseProcessor)
}

type clientOnResponseProcessor struct {
@@ -44,7 +46,7 @@ func (f *clientOnResponseProcessor) Process(ctx context.Context, rpcMessage prot
// 如果是请求消息,做处理逻辑
msgFuture := getty.GetGettyRemotingInstance().GetMessageFuture(rpcMessage.ID)
if msgFuture != nil {
getty.GetGettyRemotingInstance().NotifytRpcMessageResponse(rpcMessage)
getty.GetGettyRemotingInstance().NotifyRpcMessageResponse(rpcMessage)
getty.GetGettyRemotingInstance().RemoveMessageFuture(rpcMessage.ID)
} else {
if _, ok := rpcMessage.Body.(protocol.AbstractResultMessage); ok {


+ 48
- 0
pkg/rpc/processor/client/rm_branch_commit_processor.go View File

@@ -0,0 +1,48 @@
package client

import (
"context"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
)

func init() {
rmBranchCommitProcessor := &rmBranchCommitProcessor{}
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeBranchCommit, rmBranchCommitProcessor)
}

type rmBranchCommitProcessor struct {
}

func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage protocol.RpcMessage) error {
log.Infof("rm client handle branch commit process %v", rpcMessage)
request := rpcMessage.Body.(protocol.BranchCommitRequest)
xid := request.Xid
branchID := request.BranchId
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerFacadeInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("Branch commit error: %s", err.Error())
return err
}

// reply commit response to tc server
response := protocol.BranchCommitResponse{
AbstractBranchEndResponse: protocol.AbstractBranchEndResponse{
Xid: xid,
BranchId: branchID,
BranchStatus: status,
},
}
err = getty.GetGettyRemotingClient().SendAsyncResponse(response)
if err != nil {
log.Error("BranchCommitResponse error: {%#v}", err.Error())
return err
}
return nil
}

+ 48
- 0
pkg/rpc/processor/client/rm_branch_rollback_processor.go View File

@@ -0,0 +1,48 @@
package client

import (
"context"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/utils/log"
)

func init() {
rmBranchCommitProcessor := &rmBranchCommitProcessor{}
getty.GetGettyClientHandlerInstance().RegisterProcessor(protocol.MessageTypeBranchCommit, rmBranchCommitProcessor)
}

type rmBranchRollbackProcessor struct {
}

func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage protocol.RpcMessage) error {
log.Infof("rm client handle branch commit process %v", rpcMessage)
request := rpcMessage.Body.(protocol.BranchCommitRequest)
xid := request.Xid
branchID := request.BranchId
resourceID := request.ResourceId
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerFacadeInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("Branch commit error: %s", err.Error())
return err
}

// reply commit response to tc server
response := protocol.BranchCommitResponse{
AbstractBranchEndResponse: protocol.AbstractBranchEndResponse{
Xid: xid,
BranchId: branchID,
BranchStatus: status,
},
}
err = getty.GetGettyRemotingClient().SendAsyncResponse(response)
if err != nil {
log.Error("BranchCommitResponse error: {%#v}", err.Error())
return err
}
return nil
}

+ 0
- 45
pkg/tm/api/global_transaction.go View File

@@ -1,45 +0,0 @@
package api

import (
"github.com/seata/seata-go/pkg/common/model"
)

type GlobalTransactionRole int8

const (
LAUNCHER GlobalTransactionRole = 0
PARTICIPANT GlobalTransactionRole = 1
)

type GlobalTransaction interface {

// Begin a new global transaction with given timeout and given name.
begin(timeout int64, name string) error

// Commit the global transaction.
commit() error

// Rollback the global transaction.
rollback() error

// Suspend the global transaction.
suspend() (SuspendedResourcesHolder, error)

// Resume the global transaction.
resume(suspendedResourcesHolder SuspendedResourcesHolder) error

// Ask TC for current status of the corresponding global transaction.
getStatus() (model.GlobalStatus, error)

// Get XID.
getXid() string

// report the global transaction status.
globalReport(globalStatus model.GlobalStatus) error

// local status of the global transaction.
getLocalStatus() model.GlobalStatus

// get global transaction role.
getGlobalTransactionRole() GlobalTransactionRole
}

+ 154
- 0
pkg/tm/global_transaction.go View File

@@ -0,0 +1,154 @@
package tm

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common/model"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/rpc/getty"
"github.com/seata/seata-go/pkg/tm/api"
"github.com/seata/seata-go/pkg/utils/log"
"sync"
)

type GlobalTransaction struct {
Xid string
Status model.GlobalStatus
Role model.GlobalTransactionRole
}

var (
// singletone ResourceManagerFacade
globalTransactionManager *GlobalTransactionManager
onceGlobalTransactionManager = &sync.Once{}
)

func GetGlobalTransactionManager() *GlobalTransactionManager {
if globalTransactionManager == nil {
onceGlobalTransactionManager.Do(func() {
globalTransactionManager = &GlobalTransactionManager{}
})
}
return globalTransactionManager
}

type GlobalTransactionManager struct {
}

// Begin a new global transaction with given timeout and given name.
func (g *GlobalTransactionManager) Begin(ctx context.Context, transaction *GlobalTransaction, timeout int32, name string) error {
if transaction.Role != model.LAUNCHER {
log.Infof("Ignore Begin(): just involved in global transaction %s", transaction.Xid)
return nil
}
if transaction.Xid != "" {
return errors.New(fmt.Sprintf("Global transaction already exists,can't begin a new global transaction, currentXid = %s ", transaction.Xid))
}

req := protocol.GlobalBeginRequest{
TransactionName: name,
Timeout: timeout,
}
res, err := getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalBeginRequest error, xid %s, error %v", transaction.Xid, err)
return err
}
if res == nil || res.(protocol.GlobalBeginResponse).ResultCode == protocol.ResultCodeFailed {
log.Errorf("GlobalBeginRequest error, xid %s, res %v", transaction.Xid, res)
return err
}
log.Infof("GlobalBeginRequest success, xid %s, res %v", transaction.Xid, res)

transaction.Status = model.Begin
transaction.Xid = res.(protocol.GlobalBeginResponse).Xid
model.SetXID(ctx, res.(protocol.GlobalBeginResponse).Xid)
return nil
}

// Commit the global transaction.
func (g *GlobalTransactionManager) Commit(ctx context.Context, transaction *GlobalTransaction) error {
if transaction.Role != model.LAUNCHER {
log.Infof("Ignore Commit(): just involved in global transaction [{}]", transaction.Xid)
return nil
}
if transaction.Xid == "" {
return errors.New("Commit xid should not be empty")
}

// todo: replace retry with config
var (
err error
res interface{}
)
for retry := 5; retry > 0; retry-- {
req := protocol.GlobalCommitRequest{
AbstractGlobalEndRequest: protocol.AbstractGlobalEndRequest{
Xid: transaction.Xid,
},
}
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalCommitRequest error, xid %s, error %v", transaction.Xid, err)
} else {
break
}
}
if err == nil && res != nil {
transaction.Status = res.(protocol.GlobalCommitResponse).GlobalStatus
}
model.UnbindXid(ctx)
log.Infof("GlobalCommitRequest commit success, xid %s", transaction.Xid)
return err
}

// Rollback the global transaction.
func (g *GlobalTransactionManager) Rollback(ctx context.Context, transaction *GlobalTransaction) error {
if transaction.Role != model.LAUNCHER {
log.Infof("Ignore Commit(): just involved in global transaction [{}]", transaction.Xid)
return nil
}
if transaction.Xid == "" {
return errors.New("Commit xid should not be empty")
}

// todo: replace retry with config
var (
err error
res interface{}
)
for retry := 5; retry > 0; retry-- {
req := protocol.GlobalRollbackRequest{
AbstractGlobalEndRequest: protocol.AbstractGlobalEndRequest{
Xid: transaction.Xid,
},
}
res, err = getty.GetGettyRemotingClient().SendSyncRequest(req)
if err != nil {
log.Errorf("GlobalRollbackRequest error, xid %s, error %v", transaction.Xid, err)
} else {
break
}
}
if err == nil && res != nil {
transaction.Status = res.(protocol.GlobalRollbackResponse).GlobalStatus
}
model.UnbindXid(ctx)
return err
}

// Suspend the global transaction.
func (g *GlobalTransactionManager) Suspend() (api.SuspendedResourcesHolder, error) {
panic("implement me")
}

// Resume the global transaction.
func (g *GlobalTransactionManager) Resume(suspendedResourcesHolder api.SuspendedResourcesHolder) error {
panic("implement me")
}

// report the global transaction status.
func (g *GlobalTransactionManager) GlobalReport(globalStatus model.GlobalStatus) error {
panic("implement me")
}

+ 7
- 0
pkg/utils/error/error.go View File

@@ -0,0 +1,7 @@
package error

type ErrorCode int32

const (
ErrorCode_IllegalState ErrorCode = 40001
)

+ 6
- 0
pkg/utils/net_utils.go View File

@@ -0,0 +1,6 @@
package utils

func GetLocalIp() string {
// todo
return "127.0.0.1"
}

+ 3
- 6
pkg/utils/xid/xid_utils.go View File

@@ -2,14 +2,11 @@ package xid_utils

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/model"
)

func GetXID(ctx context.Context) string {
xid := ctx.Value(common.XID)
xid := ctx.Value(model.XID)
if xid == nil {
return ""
}
@@ -21,5 +18,5 @@ func HasXID(ctx context.Context) bool {
}

func SetXID(ctx context.Context, xid string) context.Context {
return context.WithValue(ctx, common.XID, xid)
return context.WithValue(ctx, model.XID, xid)
}

+ 49
- 0
test/tcc_service_test.go View File

@@ -0,0 +1,49 @@
package test

import (
"context"
"github.com/seata/seata-go/pkg/common/model"
_ "github.com/seata/seata-go/pkg/imports"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/rm/tcc/api"
"github.com/seata/seata-go/pkg/rm/tcc/remoting"
"github.com/seata/seata-go/pkg/utils/log"
"testing"
)

type TestTCCServiceBusiness struct {
}

func (T TestTCCServiceBusiness) Prepare(ctx context.Context, params interface{}) error {
log.Infof("TestTCCServiceBusiness Prepare, param %v", params)
return nil
}

func (T TestTCCServiceBusiness) Commit(ctx context.Context, businessActionContext api.BusinessActionContext) error {
log.Infof("TestTCCServiceBusiness Commit, param %v", businessActionContext)
return nil
}

func (T TestTCCServiceBusiness) Rollback(ctx context.Context, businessActionContext api.BusinessActionContext) error {
log.Infof("TestTCCServiceBusiness Rollback, param %v", businessActionContext)
return nil
}

func (T TestTCCServiceBusiness) GetActionName() string {
return "TestTCCServiceBusiness"
}

func (T TestTCCServiceBusiness) GetRemoteType() remoting.RemoteType {
return remoting.RemoteTypeLocalService
}

func (T TestTCCServiceBusiness) GetServiceType() remoting.ServiceType {
return remoting.ServiceTypeProvider
}

func TestNew(test *testing.T) {
tccService := tcc.NewTCCServiceProxy(TestTCCServiceBusiness{})
tccService.Prepare(model.InitSeataContext(context.Background()), 1)

//time.Sleep(time.Second * 1000)
}

+ 1
- 1
testdata/mock/mock_tcc_service.go View File

@@ -3,10 +3,10 @@ package mock
import (
"context"
"fmt"
"github.com/seata/seata-go/pkg/rm/tcc/api"
)

import (
"github.com/seata/seata-go/pkg/rm/api"
"github.com/seata/seata-go/pkg/rm/tcc/remoting"
_ "github.com/seata/seata-go/pkg/utils/xid"
xid_utils "github.com/seata/seata-go/pkg/utils/xid"


Loading…
Cancel
Save