Browse Source

init

tags/v0.1.0-rc1
luky116_Liuyuecai 3 years ago
parent
commit
4322c24d4a
58 changed files with 6096 additions and 0 deletions
  1. +8
    -0
      .idea/.gitignore
  2. +15
    -0
      .idea/git_toolbox_prj.xml
  3. +8
    -0
      .idea/modules.xml
  4. +9
    -0
      .idea/seata-go.iml
  5. +6
    -0
      .idea/vcs.xml
  6. +5
    -0
      cmd/start.go
  7. +37
    -0
      conf/config.yaml
  8. +4
    -0
      coverage.txt
  9. +15
    -0
      go.mod
  10. +1221
    -0
      go.sum
  11. +58
    -0
      makefile
  12. +51
    -0
      pkg/config/client_config.go
  13. +31
    -0
      pkg/config/config_center_config.go
  14. +56
    -0
      pkg/config/getty_config.go
  15. +42
    -0
      pkg/config/registry_config.go
  16. +13
    -0
      pkg/config/tm_config.go
  17. +112
    -0
      pkg/model/branch.go
  18. +102
    -0
      pkg/model/global_status.go
  19. +41
    -0
      pkg/model/resource.go
  20. +110
    -0
      pkg/model/transaction_exception_code.go
  21. +18
    -0
      pkg/model/transaction_manager.go
  22. +247
    -0
      pkg/protocol/codec/codec.go
  23. +779
    -0
      pkg/protocol/codec/seata_decoder.go
  24. +565
    -0
      pkg/protocol/codec/seata_encoder.go
  25. +28
    -0
      pkg/protocol/constant.go
  26. +16
    -0
      pkg/protocol/heart_beat_message.go
  27. +26
    -0
      pkg/protocol/identify.go
  28. +18
    -0
      pkg/protocol/merged_message.go
  29. +17
    -0
      pkg/protocol/message_future.go
  30. +119
    -0
      pkg/protocol/message_type.go
  31. +5
    -0
      pkg/protocol/message_type_aware.go
  32. +18
    -0
      pkg/protocol/result_code.go
  33. +18
    -0
      pkg/protocol/rm.go
  34. +10
    -0
      pkg/protocol/rpc_message.go
  35. +17
    -0
      pkg/protocol/tm.go
  36. +226
    -0
      pkg/protocol/transaction.go
  37. +8
    -0
      pkg/rm/api/business_action_context.go
  38. +94
    -0
      pkg/rm/default_resource_manager.go
  39. +33
    -0
      pkg/rm/tcc/tcc_resource.go
  40. +15
    -0
      pkg/rm/tcc/tcc_service.go
  41. +1
    -0
      pkg/rm/tcc/tcc_service_test.go
  42. +279
    -0
      pkg/rpc11/readwriter.go
  43. +159
    -0
      pkg/rpc11/rpc_client.go
  44. +93
    -0
      pkg/rpc11/rpc_remoting_client.go
  45. +27
    -0
      pkg/rpc11/rpc_remoting_client_test.go
  46. +21
    -0
      pkg/rpc_client/client_message_sender.go
  47. +101
    -0
      pkg/rpc_client/getty_client_session_manager.go
  48. +279
    -0
      pkg/rpc_client/readwriter.go
  49. +102
    -0
      pkg/rpc_client/rpc_client.go
  50. +412
    -0
      pkg/rpc_client/rpc_remoting_client.go
  51. +24
    -0
      pkg/rpc_client/rpc_remoting_client_test.go
  52. +10
    -0
      pkg/rpc_client/rpc_rm_message.go
  53. +45
    -0
      pkg/tm/api/global_transaction.go
  54. +11
    -0
      pkg/tm/api/suspended_resources_holder.go
  55. +218
    -0
      pkg/util/log/logging.go
  56. +38
    -0
      pkg/util/runtime/goroutine.go
  57. +26
    -0
      test/integration_test.go
  58. +29
    -0
      testdata/sql/all_in_one.sql

+ 8
- 0
.idea/.gitignore View File

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

+ 15
- 0
.idea/git_toolbox_prj.xml View File

@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="GitToolBoxProjectSettings">
<option name="commitMessageIssueKeyValidationOverride">
<BoolValueOverride>
<option name="enabled" value="true" />
</BoolValueOverride>
</option>
<option name="commitMessageValidationConfigOverride">
<CommitMessageValidationOverride>
<option name="enabled" value="true" />
</CommitMessageValidationOverride>
</option>
</component>
</project>

+ 8
- 0
.idea/modules.xml View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/seata-go.iml" filepath="$PROJECT_DIR$/.idea/seata-go.iml" />
</modules>
</component>
</project>

+ 9
- 0
.idea/seata-go.iml View File

@@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

+ 6
- 0
.idea/vcs.xml View File

@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

+ 5
- 0
cmd/start.go View File

@@ -0,0 +1,5 @@
package main

func main() {
// start the server
}

+ 37
- 0
conf/config.yaml View File

@@ -0,0 +1,37 @@
# 配置类;io.seata.spring.boot.autoconfigure.StarterConstants

seata:
transport:
type: "TCP"
#NIO NATIVE
server: "NIO"
#enable heartbeat
heartbeat: true
# the client batch send request enable
enableClientBatchSendRequest: true
compressor: nome
service:
client:
rm:
asyncCommitBufferLimit: 10000
reportRetryCount: 5
tableMetaCheckEnable: false
reportSuccessEnable: false
sagaBranchRegisterEnable: 10000
sagaJsonParser: fastjson
sagaRetryPersistModeUpdate: false
sagaCompensatePersistModeUpdate: false
tm:
commitRetryCount: 5
rollbackRetryCount: 5
defaultGlobalTransactionTimeout: 60000
degradeCheck: false
degradeCheckAllowTimes: 10
degradeCheckPeriod: 2000
undo:
dataValidation: true
logSerialization: jackson
logTable: undo_log
onlyCareUpdateColumns: true

+ 4
- 0
coverage.txt View File

@@ -0,0 +1,4 @@
mode: atomic
github.com/seata/seata-go/pkg/rm/tcc/tcc_resource.go:20.51,22.2 1 0
github.com/seata/seata-go/pkg/rm/tcc/tcc_resource.go:24.46,26.2 1 0
github.com/seata/seata-go/pkg/rm/tcc/tcc_resource.go:28.56,30.2 1 0

+ 15
- 0
go.mod View File

@@ -1,3 +1,18 @@
module github.com/seata/seata-go module github.com/seata/seata-go


go 1.16 go 1.16

require (
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.11.23
github.com/dubbogo/tools v1.0.9 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.19.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)

+ 1221
- 0
go.sum
File diff suppressed because it is too large
View File


+ 58
- 0
makefile View File

@@ -0,0 +1,58 @@
VERSION=$(shell cat "./VERSION" 2> /dev/null)

GO_FLAGS := -ldflags "-X main.Branch=$(GIT_BRANCH) -X main.Revision=$(GIT_REVISION) -X main.Version=$(VERSION) -extldflags \"-static\" -s -w" -tags netgo
GO = go
GO_PATH = $(shell $(GO) env GOPATH)
GO_OS = $(shell $(GO) env GOOS)
ifeq ($(GO_OS), darwin)
GO_OS = mac
endif

# License environment
GO_LICENSE_CHECKER_DIR = license-header-checker-$(GO_OS)
GO_LICENSE_CHECKER = $(GO_PATH)/bin/license-header-checker
LICENSE_DIR = /tmp/tools/license

# format import code
format-import:
go get -d github.com/dubbogo/tools/cmd/imports-formatter
imports-formatter -path . -module github.com/seata/seata-go -bl false

unit-test:
go test ./pkg/... -coverprofile=coverage.txt -covermode=atomic

# Generate binaries for a Cortex release
dist dist/seatago-linux-amd64 dist/seatago-darwin-amd64 dist/seatago-linux-amd64-sha-256 dist/seatago-darwin-amd64-sha-256:
rm -fr ./dist
mkdir -p ./dist
GOOS="linux" GOARCH="amd64" CGO_ENABLED=0 go build $(GO_FLAGS) -o ./dist/seatago-linux-amd64 ./cmd
GOOS="darwin" GOARCH="amd64" CGO_ENABLED=0 go build $(GO_FLAGS) -o ./dist/seatago-darwin-amd64 ./cmd
sha256sum ./dist/seatago-darwin-amd64 | cut -d ' ' -f 1 > ./dist/seatago-darwin-amd64-sha-256
sha256sum ./dist/seatago-linux-amd64 | cut -d ' ' -f 1 > ./dist/seatago-linux-amd64-sha-256

# Generate binaries for a Cortex release
build dist/seatago dist/seatago-sha-256:
rm -fr ./dist
mkdir -p ./dist
CGO_ENABLED=0 go build $(GO_FLAGS) -o ./dist/seatago ./cmd
sha256sum ./dist/seatago | cut -d ' ' -f 1 > ./dist/seatago-sha-256

#docker-build:
# docker build -t seatago/seatago:latest .

integration-test:
@go clean -testcache
go test -tags integration -v ./test/...

clean:
@rm -rf coverage.txt
@rm -rf dist

prepareLic:
echo 'The makefile is for ci test and has dependencies. Do not run it locally. If you want to run the unit tests, run command `go test ./...` directly.'
$(GO_LICENSE_CHECKER) -version || (wget https://github.com/lsm-dev/license-header-checker/releases/download/v1.2.0/$(GO_LICENSE_CHECKER_DIR).zip -O $(GO_LICENSE_CHECKER_DIR).zip && unzip -o $(GO_LICENSE_CHECKER_DIR).zip && mkdir -p $(GO_PATH)/bin/ && cp $(GO_LICENSE_CHECKER_DIR)/64bit/license-header-checker $(GO_PATH)/bin/)
ls /tmp/tools/license/license.txt || wget -P $(LICENSE_DIR) https://github.com/dubbogo/resources/raw/master/tools/license/license.txt

.PHONY: license
license: prepareLic
$(GO_LICENSE_CHECKER) -v -a -r -i vendor $(LICENSE_DIR)/license.txt . go && [[ -z `git status -s` ]]

+ 51
- 0
pkg/config/client_config.go View File

@@ -0,0 +1,51 @@
package config

import (
"time"
)

var clientConfig *ClientConfig

type ClientConfig struct {
ApplicationID string `yaml:"application_id" json:"application_id,omitempty"`
TransactionServiceGroup string `yaml:"transaction_service_group" json:"transaction_service_group,omitempty"`
EnableClientBatchSendRequest bool `yaml:"enable-rpc_client-batch-send-request" json:"enable-rpc_client-batch-send-request,omitempty"`
SeataVersion string `yaml:"seata_version" json:"seata_version,omitempty"`
GettyConfig GettyConfig `yaml:"getty" json:"getty,omitempty"`

TMConfig TMConfig `yaml:"tm" json:"tm,omitempty"`

ATConfig struct {
DSN string `yaml:"dsn" json:"dsn,omitempty"`
ReportRetryCount int `default:"5" yaml:"report_retry_count" json:"report_retry_count,omitempty"`
ReportSuccessEnable bool `default:"false" yaml:"report_success_enable" json:"report_success_enable,omitempty"`
LockRetryInterval time.Duration `default:"10ms" yaml:"lock_retry_interval" json:"lock_retry_interval,omitempty"`
LockRetryTimes int `default:"30" yaml:"lock_retry_times" json:"lock_retry_times,omitempty"`
} `yaml:"at" json:"at,omitempty"`

RegistryConfig RegistryConfig `yaml:"registry_config" json:"registry_config,omitempty"` //注册中心配置信息
ConfigCenterConfig ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` //配置中心配置信息
}

func GetClientConfig() *ClientConfig {
// todo mock data
//return clientConfig
return &ClientConfig{
GettyConfig: GetDefaultGettyConfig(),
}
}

func GetTMConfig() TMConfig {
return clientConfig.TMConfig
}

func GetDefaultClientConfig(applicationID string) ClientConfig {
return ClientConfig{
ApplicationID: applicationID,
TransactionServiceGroup: "127.0.0.1:8091",
EnableClientBatchSendRequest: false,
SeataVersion: "1.1.0",
GettyConfig: GetDefaultGettyConfig(),
TMConfig: GetDefaultTmConfig(),
}
}

+ 31
- 0
pkg/config/config_center_config.go View File

@@ -0,0 +1,31 @@
package config

import (
"time"
)

// ConfigCenterConfig config center config
type ConfigCenterConfig struct {
Mode string `yaml:"type" json:"type,omitempty"` //类型
NacosConfig NacosConfigCenter `yaml:"nacos" json:"nacos,omitempty"`
ETCDConfig EtcdConfigCenter `yaml:"etcdv3" json:"etcdv3,omitempty"`
}

// NacosConfigCenter nacos config center
type NacosConfigCenter struct {
ServerAddr string `yaml:"server_addr" json:"server_addr,omitempty"`
Group string `default:"SEATA_GROUP" yaml:"group" json:"group,omitempty"`
Namespace string `yaml:"namespace" json:"namespace,omitempty"`
Cluster string `yaml:"cluster" json:"cluster,omitempty"`
UserName string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
DataID string `default:"seata" yaml:"data_id" json:"data_id,omitempty"`
}

type EtcdConfigCenter struct {
Name string `default:"seata-config-center" yaml:"name" json:"name"`
ConfigKey string `default:"config-seata" yaml:"config_key" json:"config_key,omitempty"`
Endpoints string `yaml:"endpoints" json:"endpoints,omitempty"`
Heartbeats int `yaml:"heartbeats" json:"heartbeats"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

+ 56
- 0
pkg/config/getty_config.go View File

@@ -0,0 +1,56 @@
package config

import (
"time"
)

// GettyConfig
//Config holds supported types by the multiconfig package
type GettyConfig struct {
ReconnectInterval int `default:"0" yaml:"reconnect_interval" json:"reconnect_interval,omitempty"`
// getty_session pool
ConnectionNum int `default:"16" yaml:"connection_number" json:"connection_number,omitempty"`

// heartbeat
HeartbeatPeriod time.Duration `default:"15s" yaml:"heartbeat_period" json:"heartbeat_period,omitempty"`

// getty_session tcp parameters
GettySessionParam GettySessionParam `required:"true" yaml:"getty_session_param" json:"getty_session_param,omitempty"`
}

// GetDefaultGettyConfig ...
func GetDefaultGettyConfig() GettyConfig {
return GettyConfig{
ReconnectInterval: 0,
ConnectionNum: 2,
HeartbeatPeriod: 10 * time.Second,
GettySessionParam: GettySessionParam{
CompressEncoding: false,
TCPNoDelay: true,
TCPKeepAlive: true,
KeepAlivePeriod: 180 * time.Second,
TCPRBufSize: 262144,
TCPWBufSize: 65536,
TCPReadTimeout: time.Second,
TCPWriteTimeout: 5 * time.Second,
WaitTimeout: time.Second,
MaxMsgLen: 4096,
SessionName: "rpc_client",
},
}
}

// GettySessionParam getty session param
type GettySessionParam struct {
CompressEncoding bool `default:"false" yaml:"compress_encoding" json:"compress_encoding,omitempty"`
TCPNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"`
TCPKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"`
KeepAlivePeriod time.Duration `default:"180s" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"`
TCPRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"`
TCPWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"`
TCPReadTimeout time.Duration `default:"1s" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"`
TCPWriteTimeout time.Duration `default:"5s" yaml:"tcp_write_timeout" json:"tcp_write_timeout,omitempty"`
WaitTimeout time.Duration `default:"7s" yaml:"wait_timeout" json:"wait_timeout,omitempty"`
MaxMsgLen int `default:"4096" yaml:"max_msg_len" json:"max_msg_len,omitempty"`
SessionName string `default:"rpc" yaml:"session_name" json:"session_name,omitempty"`
}

+ 42
- 0
pkg/config/registry_config.go View File

@@ -0,0 +1,42 @@
package config

import (
"time"
)

var config *RegistryConfig

// RegistryConfig registry config
type RegistryConfig struct {
Mode string `yaml:"type" json:"type,omitempty"` //类型
NacosConfig NacosConfig `yaml:"nacos" json:"nacos,omitempty"`
EtcdConfig EtcdConfig `yaml:"etcdv3" json:"etcdv3"`
}

// NacosConfig nacos config
type NacosConfig struct {
Application string `yaml:"application" json:"application,omitempty"`
ServerAddr string `yaml:"server_addr" json:"server_addr,omitempty"`
Group string `default:"SEATA_GROUP" yaml:"group" json:"group,omitempty"`
Namespace string `yaml:"namespace" json:"namespace,omitempty"`
Cluster string `yaml:"cluster" json:"cluster,omitempty"`
UserName string `yaml:"username" json:"username,omitempty"`
Password string `yaml:"password" json:"password,omitempty"`
}

// InitRegistryConfig init registry config
func InitRegistryConfig(registryConfig *RegistryConfig) {
config = registryConfig
}

// GetRegistryConfig get registry config
func GetRegistryConfig() *RegistryConfig {
return config
}

type EtcdConfig struct {
ClusterName string `default:"seata-golang-etcdv3" yaml:"cluster_name" json:"cluster_name,omitempty"`
Endpoints string `yaml:"endpoints" json:"endpoints,omitempty"`
Heartbeats int `yaml:"heartbeats" json:"heartbeats"`
Timeout time.Duration `yaml:"timeout" json:"timeout"`
}

+ 13
- 0
pkg/config/tm_config.go View File

@@ -0,0 +1,13 @@
package config

type TMConfig struct {
CommitRetryCount int32 `default:"5" yaml:"commit_retry_count" json:"commit_retry_count,omitempty"`
RollbackRetryCount int32 `default:"5" yaml:"rollback_retry_count" json:"rollback_retry_count,omitempty"`
}

func GetDefaultTmConfig() TMConfig {
return TMConfig{
CommitRetryCount: 5,
RollbackRetryCount: 5,
}
}

+ 112
- 0
pkg/model/branch.go View File

@@ -0,0 +1,112 @@
package model

import (
"fmt"
)

type BranchType int8
type BranchStatus int8

const (
AT BranchType = 0
TCC BranchType = 1
SAGA BranchType = 2
XA BranchType = 3
)

const (
/**
* The BranchStatus_Unknown.
* description:BranchStatus_Unknown branch status.
*/
BranchStatusUnknown BranchStatus = iota

/**
* The BranchStatus_Registered.
* description:BranchStatus_Registered to TC.
*/
BranchStatusRegistered

/**
* The Phase one done.
* description:Branch logic is successfully done at phase one.
*/
BranchStatusPhaseoneDone

/**
* The Phase one failed.
* description:Branch logic is failed at phase one.
*/
BranchStatusPhaseoneFailed

/**
* The Phase one timeout.
* description:Branch logic is NOT reported for a timeout.
*/
BranchStatusPhaseoneTimeout

/**
* The Phase two committed.
* description:Commit logic is successfully done at phase two.
*/
BranchStatusPhasetwoCommitted

/**
* The Phase two commit failed retryable.
* description:Commit logic is failed but retryable.
*/
BranchStatusPhasetwoCommitFailedRetryable

/**
* The Phase two commit failed unretryable.
* description:Commit logic is failed and NOT retryable.
*/
BranchStatusPhasetwoCommitFailedUnretryable

/**
* The Phase two rollbacked.
* description:Rollback logic is successfully done at phase two.
*/
BranchStatusPhasetwoRollbacked

/**
* The Phase two rollback failed retryable.
* description:Rollback logic is failed but retryable.
*/
BranchStatusPhasetwoRollbackFailedRetryable

/**
* The Phase two rollback failed unretryable.
* description:Rollback logic is failed but NOT retryable.
*/
BranchStatusPhasetwoRollbackFailedUnretryable
)

func (s BranchStatus) String() string {
switch s {
case BranchStatusUnknown:
return "Unknown"
case BranchStatusRegistered:
return "Registered"
case BranchStatusPhaseoneDone:
return "PhaseoneDone"
case BranchStatusPhaseoneFailed:
return "PhaseoneFailed"
case BranchStatusPhaseoneTimeout:
return "PhaseoneTimeout"
case BranchStatusPhasetwoCommitted:
return "PhasetwoCommitted"
case BranchStatusPhasetwoCommitFailedRetryable:
return "PhasetwoCommitFailedRetryable"
case BranchStatusPhasetwoCommitFailedUnretryable:
return "CommitFailedUnretryable"
case BranchStatusPhasetwoRollbacked:
return "PhasetwoRollbacked"
case BranchStatusPhasetwoRollbackFailedRetryable:
return "RollbackFailedRetryable"
case BranchStatusPhasetwoRollbackFailedUnretryable:
return "RollbackFailedUnretryable"
default:
return fmt.Sprintf("%d", s)
}
}

+ 102
- 0
pkg/model/global_status.go View File

@@ -0,0 +1,102 @@
package model

type GlobalStatus int64

const (

/**
* Un known global status.
*/
// Unknown
UnKnown GlobalStatus = 0

/**
* The Begin.
*/
// PHASE 1: can accept new branch registering.
Begin GlobalStatus = 1

/**
* PHASE 2: Running Status: may be changed any time.
*/
// Committing.
Committing GlobalStatus = 2

/**
* The Commit retrying.
*/
// Retrying commit after a recoverable failure.
CommitRetrying GlobalStatus = 3

/**
* Rollbacking global status.
*/
// Rollbacking
Rollbacking GlobalStatus = 4

/**
* The Rollback retrying.
*/
// Retrying rollback after a recoverable failure.
RollbackRetrying GlobalStatus = 5

/**
* The Timeout rollbacking.
*/
// Rollbacking since timeout
TimeoutRollbacking GlobalStatus = 6

/**
* The Timeout rollback retrying.
*/
// Retrying rollback GlobalStatus = since timeout) after a recoverable failure.
TimeoutRollbackRetrying GlobalStatus = 7

/**
* All branches can be async committed. The committing is NOT done yet, but it can be seen as committed for TM/RM
* client.
*/
AsyncCommitting GlobalStatus = 8

/**
* PHASE 2: Final Status: will NOT change any more.
*/
// Finally: global transaction is successfully committed.
Committed GlobalStatus = 9

/**
* The Commit failed.
*/
// Finally: failed to commit
CommitFailed GlobalStatus = 10

/**
* The Rollbacked.
*/
// Finally: global transaction is successfully rollbacked.
Rollbacked GlobalStatus = 11

/**
* The Rollback failed.
*/
// Finally: failed to rollback
RollbackFailed GlobalStatus = 12

/**
* The Timeout rollbacked.
*/
// Finally: global transaction is successfully rollbacked since timeout.
TimeoutRollbacked GlobalStatus = 13

/**
* The Timeout rollback failed.
*/
// Finally: failed to rollback since timeout
TimeoutRollbackFailed GlobalStatus = 14

/**
* The Finished.
*/
// Not managed in session MAP any more
Finished GlobalStatus = 15
)

+ 41
- 0
pkg/model/resource.go View File

@@ -0,0 +1,41 @@
package model

// Resource that can be managed by Resource Manager and involved into global transaction
type Resource interface {
GetResourceGroupId() string
GetResourceId() string
GetBranchType() BranchType
}

// Control a branch transaction commit or rollback
type ResourceManagerInbound interface {
// Commit a branch transaction
BranchCommit(branchType BranchType, xid, branchId int64, resourceId, applicationData string) (BranchStatus, error)
// Rollback a branch transaction
BranchRollback(branchType BranchType, xid string, branchId int64, resourceId, applicationData string) (BranchStatus, error)
}

// Resource Manager: send outbound request to TC
type ResourceManagerOutbound interface {
// Branch register long
BranchRegister(branchType BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error)
// Branch report
BranchReport(branchType BranchType, xid string, branchId int64, status BranchStatus, applicationData string) error
// Lock query boolean
LockQuery(branchType BranchType, resourceId, xid, lockKeys string) (bool, error)
}

// Resource Manager: common behaviors
type ResourceManager interface {
ResourceManagerInbound
ResourceManagerOutbound

// Register a Resource to be managed by Resource Manager
RegisterResource(resource Resource) error
// Unregister a Resource from the Resource Manager
UnregisterResource(resource Resource) error
// Get all resources managed by this manager
GetManagedResources() map[string]Resource
// Get the BranchType
GetBranchType() BranchType
}

+ 110
- 0
pkg/model/transaction_exception_code.go View File

@@ -0,0 +1,110 @@
package model

type TransactionExceptionCode byte

const (
/**
* Unknown transaction exception code.
*/
TransactionExceptionCodeUnknown TransactionExceptionCode = iota

/**
* BeginFailed
*/
TransactionExceptionCodeBeginFailed

/**
* Lock key conflict transaction exception code.
*/
TransactionExceptionCodeLockKeyConflict

/**
* Io transaction exception code.
*/
IO

/**
* Branch rollback failed retriable transaction exception code.
*/
TransactionExceptionCodeBranchRollbackFailedRetriable

/**
* Branch rollback failed unretriable transaction exception code.
*/
TransactionExceptionCodeBranchRollbackFailedUnretriable

/**
* Branch register failed transaction exception code.
*/
TransactionExceptionCodeBranchRegisterFailed

/**
* Branch report failed transaction exception code.
*/
TransactionExceptionCodeBranchReportFailed

/**
* Lockable check failed transaction exception code.
*/
TransactionExceptionCodeLockableCheckFailed

/**
* Branch transaction not exist transaction exception code.
*/
TransactionExceptionCodeBranchTransactionNotExist

/**
* Global transaction not exist transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionNotExist

/**
* Global transaction not active transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionNotActive

/**
* Global transaction status invalid transaction exception code.
*/
TransactionExceptionCodeGlobalTransactionStatusInvalid

/**
* Failed to send branch commit request transaction exception code.
*/
TransactionExceptionCodeFailedToSendBranchCommitRequest

/**
* Failed to send branch rollback request transaction exception code.
*/
TransactionExceptionCodeFailedToSendBranchRollbackRequest

/**
* Failed to add branch transaction exception code.
*/
TransactionExceptionCodeFailedToAddBranch

/**
* Failed to lock global transaction exception code.
*/
TransactionExceptionCodeFailedLockGlobalTranscation

/**
* FailedWriteSession
*/
TransactionExceptionCodeFailedWriteSession

/**
* Failed to holder exception code
*/
FailedStore
)

type TransactionException struct {
Code TransactionExceptionCode
Message string
}

//Error 隐式继承 builtin.error 接口
func (e TransactionException) Error() string {
return "TransactionException: " + e.Message
}

+ 18
- 0
pkg/model/transaction_manager.go View File

@@ -0,0 +1,18 @@
package model

type TransactionManager interface {
// Begin a new global transaction.
Begin(applicationId, transactionServiceGroup, name string, timeout int64) (string, error)

// Global commit.
Commit(xid string) (GlobalStatus, error)

//Global rollback.
Rollback(xid string) (GlobalStatus, error)

// Get current status of the give transaction.
GetStatus(xid string) (GlobalStatus, error)

// Global report.
GlobalReport(xid string, globalStatus GlobalStatus) (GlobalStatus, error)
}

+ 247
- 0
pkg/protocol/codec/codec.go View File

@@ -0,0 +1,247 @@
package codec

import (
"bytes"
)

import (
"vimagination.zapto.org/byteio"
)

import (
"github.com/seata/seata-go/pkg/protocol"
log "github.com/seata/seata-go/pkg/util/log"
)

type SerializerType byte

const (
SEATA = byte(0x1)
PROTOBUF = byte(0x2)
KRYO = byte(0x4)
FST = byte(0x8)
)

type Encoder func(in interface{}) []byte

type Decoder func(in []byte) (interface{}, int)

func MessageEncoder(codecType byte, in interface{}) []byte {
switch codecType {
case SEATA:
return SeataEncoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil
}
}

func MessageDecoder(codecType byte, in []byte) (interface{}, int) {
switch codecType {
case SEATA:
return SeataDecoder(in)
default:
log.Errorf("not support codecType, %s", codecType)
return nil, 0
}
}

func SeataEncoder(in interface{}) []byte {
var result = make([]byte, 0)
msg := in.(protocol.MessageTypeAware)
typeCode := msg.GetTypeCode()
encoder := getMessageEncoder(typeCode)

typeC := uint16(typeCode)
if encoder != nil {
body := encoder(in)
result = append(result, []byte{byte(typeC >> 8), byte(typeC)}...)
result = append(result, body...)
}
return result
}

func SeataDecoder(in []byte) (interface{}, int) {
r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
typeCode, _, _ := r.ReadInt16()

decoder := getMessageDecoder(typeCode)
if decoder != nil {
return decoder(in[2:])
}
return nil, 0
}

func getMessageEncoder(typeCode int16) Encoder {
switch typeCode {
case protocol.TypeSeataMerge:
return MergedWarpMessageEncoder
case protocol.TypeSeataMergeResult:
return MergeResultMessageEncoder
case protocol.TypeRegClt:
return RegisterTMRequestEncoder
case protocol.TypeRegCltResult:
return RegisterTMResponseEncoder
case protocol.TypeRegRm:
return RegisterRMRequestEncoder
case protocol.TypeRegRmResult:
return RegisterRMResponseEncoder
case protocol.TypeBranchCommit:
return BranchCommitRequestEncoder
case protocol.TypeBranchRollback:
return BranchRollbackRequestEncoder
case protocol.TypeGlobalReport:
return GlobalReportRequestEncoder
default:
var encoder Encoder
encoder = getMergeRequestMessageEncoder(typeCode)
if encoder != nil {
return encoder
}
encoder = getMergeResponseMessageEncoder(typeCode)
if encoder != nil {
return encoder
}
log.Errorf("not support typeCode, %d", typeCode)
return nil
}
}

func getMergeRequestMessageEncoder(typeCode int16) Encoder {
switch typeCode {
case protocol.TypeGlobalBegin:
return GlobalBeginRequestEncoder
case protocol.TypeGlobalCommit:
return GlobalCommitRequestEncoder
case protocol.TypeGlobalRollback:
return GlobalRollbackRequestEncoder
case protocol.TypeGlobalStatus:
return GlobalStatusRequestEncoder
case protocol.TypeGlobalLockQuery:
return GlobalLockQueryRequestEncoder
case protocol.TypeBranchRegister:
return BranchRegisterRequestEncoder
case protocol.TypeBranchStatusReport:
return BranchReportRequestEncoder
case protocol.TypeGlobalReport:
return GlobalReportRequestEncoder
default:
break
}
return nil
}

func getMergeResponseMessageEncoder(typeCode int16) Encoder {
switch typeCode {
case protocol.TypeGlobalBeginResult:
return GlobalBeginResponseEncoder
case protocol.TypeGlobalCommitResult:
return GlobalCommitResponseEncoder
case protocol.TypeGlobalRollbackResult:
return GlobalRollbackResponseEncoder
case protocol.TypeGlobalStatusResult:
return GlobalStatusResponseEncoder
case protocol.TypeGlobalLockQueryResult:
return GlobalLockQueryResponseEncoder
case protocol.TypeBranchRegisterResult:
return BranchRegisterResponseEncoder
case protocol.TypeBranchStatusReportResult:
return BranchReportResponseEncoder
case protocol.TypeBranchCommitResult:
return BranchCommitResponseEncoder
case protocol.TypeBranchRollbackResult:
return BranchRollbackResponseEncoder
case protocol.TypeGlobalReportResult:
return GlobalReportResponseEncoder
default:
break
}
return nil
}

func getMessageDecoder(typeCode int16) Decoder {
switch typeCode {
case protocol.TypeSeataMerge:
return MergedWarpMessageDecoder
case protocol.TypeSeataMergeResult:
return MergeResultMessageDecoder
case protocol.TypeRegClt:
return RegisterTMRequestDecoder
case protocol.TypeRegCltResult:
return RegisterTMResponseDecoder
case protocol.TypeRegRm:
return RegisterRMRequestDecoder
case protocol.TypeRegRmResult:
return RegisterRMResponseDecoder
case protocol.TypeBranchCommit:
return BranchCommitRequestDecoder
case protocol.TypeBranchRollback:
return BranchRollbackRequestDecoder
case protocol.TypeGlobalReport:
return GlobalReportRequestDecoder
default:
var Decoder Decoder
Decoder = getMergeRequestMessageDecoder(typeCode)
if Decoder != nil {
return Decoder
}
Decoder = getMergeResponseMessageDecoder(typeCode)
if Decoder != nil {
return Decoder
}
log.Errorf("not support typeCode, %d", typeCode)
return nil
}
}

func getMergeRequestMessageDecoder(typeCode int16) Decoder {
switch typeCode {
case protocol.TypeGlobalBegin:
return GlobalBeginRequestDecoder
case protocol.TypeGlobalCommit:
return GlobalCommitRequestDecoder
case protocol.TypeGlobalRollback:
return GlobalRollbackRequestDecoder
case protocol.TypeGlobalStatus:
return GlobalStatusRequestDecoder
case protocol.TypeGlobalLockQuery:
return GlobalLockQueryRequestDecoder
case protocol.TypeBranchRegister:
return BranchRegisterRequestDecoder
case protocol.TypeBranchStatusReport:
return BranchReportRequestDecoder
case protocol.TypeGlobalReport:
return GlobalReportRequestDecoder
default:
break
}
return nil
}

func getMergeResponseMessageDecoder(typeCode int16) Decoder {
switch typeCode {
case protocol.TypeGlobalBeginResult:
return GlobalBeginResponseDecoder
case protocol.TypeGlobalCommitResult:
return GlobalCommitResponseDecoder
case protocol.TypeGlobalRollbackResult:
return GlobalRollbackResponseDecoder
case protocol.TypeGlobalStatusResult:
return GlobalStatusResponseDecoder
case protocol.TypeGlobalLockQueryResult:
return GlobalLockQueryResponseDecoder
case protocol.TypeBranchRegisterResult:
return BranchRegisterResponseDecoder
case protocol.TypeBranchStatusReportResult:
return BranchReportResponseDecoder
case protocol.TypeBranchCommitResult:
return BranchCommitResponseDecoder
case protocol.TypeBranchRollbackResult:
return BranchRollbackResponseDecoder
case protocol.TypeGlobalReportResult:
return GlobalReportResponseDecoder
default:
break
}
return nil
}

+ 779
- 0
pkg/protocol/codec/seata_decoder.go View File

@@ -0,0 +1,779 @@
package codec

import (
"bytes"
)

import (
"vimagination.zapto.org/byteio"
)

import (
"github.com/seata/seata-go/pkg/model"
"github.com/seata/seata-go/pkg/protocol"
)

func AbstractResultMessageDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.AbstractResultMessage{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
msg.ResultCode = protocol.ResultCode(resultCode)
totalReadN += 1
if msg.ResultCode == protocol.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Msg, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}
}

return msg, totalReadN
}

func MergedWarpMessageDecoder(in []byte) (interface{}, int) {
var (
size16 int16 = 0
readN = 0
totalReadN = 0
)
result := protocol.MergedWarpMessage{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

r.ReadInt32()
totalReadN += 4
size16, readN, _ = r.ReadInt16()
totalReadN += readN
result.Msgs = make([]protocol.MessageTypeAware, 0)
for index := 0; index < int(size16); index++ {
typeCode, _, _ := r.ReadInt16()
totalReadN += 2
decoder := getMessageDecoder(typeCode)
if decoder != nil {
msg, readN := decoder(in[totalReadN:])
totalReadN += readN
result.Msgs = append(result.Msgs, msg.(protocol.MessageTypeAware))
}
}
return result, totalReadN
}

func MergeResultMessageDecoder(in []byte) (interface{}, int) {
var (
size16 int16 = 0
readN = 0
totalReadN = 0
)
result := protocol.MergeResultMessage{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

r.ReadInt32()
totalReadN += 4
size16, readN, _ = r.ReadInt16()
totalReadN += readN
result.Msgs = make([]protocol.MessageTypeAware, 0)

for index := 0; index < int(size16); index++ {
typeCode, _, _ := r.ReadInt16()
totalReadN += 2
decoder := getMessageDecoder(typeCode)
if decoder != nil {
msg, readN := decoder(in[totalReadN:])
totalReadN += readN
result.Msgs = append(result.Msgs, msg.(protocol.MessageTypeAware))
}
}
return result, totalReadN
}

func AbstractIdentifyRequestDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.AbstractIdentifyRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Version, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ApplicationId, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.TransactionServiceGroup, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ExtraData = make([]byte, int(length16))
readN, _ := r.Read(msg.ExtraData)
totalReadN += readN
}

return msg, totalReadN
}

func AbstractIdentifyResponseDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.AbstractIdentifyResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

identified, _ := r.ReadByte()
totalReadN += 1
if identified == byte(1) {
msg.Identified = true
} else if identified == byte(0) {
msg.Identified = false
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Version, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

return msg, totalReadN
}

func RegisterRMRequestDecoder(in []byte) (interface{}, int) {
var (
length32 uint32 = 0
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.RegisterRMRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Version, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ApplicationId, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.TransactionServiceGroup, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ExtraData = make([]byte, int(length16))
readN, _ := r.Read(msg.ExtraData)
totalReadN += readN
}

length32, readN, _ = r.ReadUint32()
totalReadN += readN
if length32 > 0 {
msg.ResourceIds, readN, _ = r.ReadString(int(length32))
totalReadN += readN
}

return msg, totalReadN
}

func RegisterRMResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractIdentifyResponseDecoder(in)
abstractIdentifyResponse := resp.(protocol.AbstractIdentifyResponse)
msg := protocol.RegisterRMResponse{AbstractIdentifyResponse: abstractIdentifyResponse}
return msg, totalReadN
}

func RegisterTMRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractIdentifyRequestDecoder(in)
abstractIdentifyRequest := req.(protocol.AbstractIdentifyRequest)
msg := protocol.RegisterTMRequest{AbstractIdentifyRequest: abstractIdentifyRequest}
return msg, totalReadN
}

func RegisterTMResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractIdentifyResponseDecoder(in)
abstractIdentifyResponse := resp.(protocol.AbstractIdentifyResponse)
msg := protocol.RegisterRMResponse{AbstractIdentifyResponse: abstractIdentifyResponse}
return msg, totalReadN
}

func AbstractTransactionResponseDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.AbstractTransactionResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Msg, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}
}

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model.TransactionExceptionCode(exceptionCode)

return msg, totalReadN
}

func AbstractBranchEndRequestDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.AbstractBranchEndRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Xid, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

msg.BranchId, _, _ = r.ReadInt64()
totalReadN += 8
branchType, _ := r.ReadByte()
totalReadN += 1
msg.BranchType = model.BranchType(branchType)

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ResourceId, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ApplicationData = make([]byte, int(length16))
readN, _ := r.Read(msg.ApplicationData)
totalReadN += readN
}

return msg, totalReadN
}

func AbstractBranchEndResponseDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.AbstractBranchEndResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Msg, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}
}

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model.TransactionExceptionCode(exceptionCode)

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Xid, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

msg.BranchId, _, _ = r.ReadInt64()
totalReadN += 8
branchStatus, _ := r.ReadByte()
totalReadN += 1
msg.BranchStatus = model.BranchStatus(branchStatus)

return msg, totalReadN
}

func AbstractGlobalEndRequestDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.AbstractGlobalEndRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Xid, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ExtraData = make([]byte, int(length16))
readN, _ := r.Read(msg.ExtraData)
totalReadN += readN
}

return msg, totalReadN
}

func AbstractGlobalEndResponseDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.AbstractGlobalEndResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Msg, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}
}

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model.TransactionExceptionCode(exceptionCode)

globalStatus, _ := r.ReadByte()
totalReadN += 1
msg.GlobalStatus = model.GlobalStatus(globalStatus)

return msg, totalReadN
}

func BranchCommitRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractBranchEndRequestDecoder(in)
abstractBranchEndRequest := req.(protocol.AbstractBranchEndRequest)
msg := protocol.BranchCommitRequest{AbstractBranchEndRequest: abstractBranchEndRequest}
return msg, totalReadN
}

func BranchCommitResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractBranchEndResponseDecoder(in)
abstractBranchEndResponse := resp.(protocol.AbstractBranchEndResponse)
msg := protocol.BranchCommitResponse{AbstractBranchEndResponse: abstractBranchEndResponse}
return msg, totalReadN
}

func BranchRegisterRequestDecoder(in []byte) (interface{}, int) {
var (
length32 uint32 = 0
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.BranchRegisterRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Xid, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

branchType, _ := r.ReadByte()
totalReadN += 1
msg.BranchType = model.BranchType(branchType)

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ResourceId, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length32, readN, _ = r.ReadUint32()
totalReadN += readN
if length32 > 0 {
msg.LockKey, readN, _ = r.ReadString(int(length32))
totalReadN += readN
}

length32, readN, _ = r.ReadUint32()
totalReadN += readN
if length32 > 0 {
msg.ApplicationData = make([]byte, int(length32))
readN, _ := r.Read(msg.ApplicationData)
totalReadN += readN
}

return msg, totalReadN
}

func BranchRegisterResponseDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.BranchRegisterResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Msg, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}
}

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model.TransactionExceptionCode(exceptionCode)

msg.BranchId, readN, _ = r.ReadInt64()
totalReadN += readN

return msg, totalReadN
}

func BranchReportRequestDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.BranchReportRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Xid, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

msg.BranchId, _, _ = r.ReadInt64()
branchStatus, _ := r.ReadByte()
msg.Status = model.BranchStatus(branchStatus)

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ResourceId, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ApplicationData = make([]byte, int(length16))
readN, _ := r.Read(msg.ApplicationData)
totalReadN += readN
}

branchType, _ := r.ReadByte()
totalReadN += 1
msg.BranchType = model.BranchType(branchType)

return msg, totalReadN
}

func BranchReportResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractTransactionResponseDecoder(in)
abstractTransactionResponse := resp.(protocol.AbstractTransactionResponse)
msg := protocol.BranchReportResponse{AbstractTransactionResponse: abstractTransactionResponse}
return msg, totalReadN
}

func BranchRollbackRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractBranchEndRequestDecoder(in)
abstractBranchEndRequest := req.(protocol.AbstractBranchEndRequest)
msg := protocol.BranchRollbackRequest{AbstractBranchEndRequest: abstractBranchEndRequest}
return msg, totalReadN
}

func BranchRollbackResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractBranchEndResponseDecoder(in)
abstractBranchEndResponse := resp.(protocol.AbstractBranchEndResponse)
msg := protocol.BranchRollbackResponse{AbstractBranchEndResponse: abstractBranchEndResponse}
return msg, totalReadN
}

func GlobalBeginRequestDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.GlobalBeginRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

timeout, readN, _ := r.ReadInt32()
totalReadN += readN
msg.Timeout = timeout

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.TransactionName, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

return msg, totalReadN
}

func GlobalBeginResponseDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.GlobalBeginResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Msg, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}
}

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model.TransactionExceptionCode(exceptionCode)

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Xid, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ExtraData = make([]byte, int(length16))
readN, _ := r.Read(msg.ExtraData)
totalReadN += readN
}

return msg, totalReadN
}

func GlobalCommitRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractGlobalEndRequestDecoder(in)
abstractGlobalEndRequest := req.(protocol.AbstractGlobalEndRequest)
msg := protocol.GlobalCommitRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
return msg, totalReadN
}

func GlobalCommitResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractGlobalEndResponseDecoder(in)
abstractGlobalEndResponse := resp.(protocol.AbstractGlobalEndResponse)
msg := protocol.GlobalCommitResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
return msg, totalReadN
}

func GlobalLockQueryRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := BranchRegisterRequestDecoder(in)
branchRegisterRequest := req.(protocol.BranchRegisterRequest)
msg := protocol.GlobalLockQueryRequest{BranchRegisterRequest: branchRegisterRequest}
return msg, totalReadN
}

func GlobalLockQueryResponseDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.GlobalLockQueryResponse{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
resultCode, _ := r.ReadByte()
totalReadN += 1
msg.ResultCode = protocol.ResultCode(resultCode)
if msg.ResultCode == protocol.ResultCodeFailed {
length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Msg, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}
}

exceptionCode, _ := r.ReadByte()
totalReadN += 1
msg.TransactionExceptionCode = model.TransactionExceptionCode(exceptionCode)

lockable, readN, _ := r.ReadUint16()
totalReadN += readN
if lockable == uint16(1) {
msg.Lockable = true
} else if lockable == uint16(0) {
msg.Lockable = false
}

return msg, totalReadN
}

func GlobalReportRequestDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.GlobalReportRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.Xid, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ExtraData = make([]byte, int(length16))
readN, _ := r.Read(msg.ExtraData)
totalReadN += readN
}

globalStatus, _ := r.ReadByte()
totalReadN += 1
msg.GlobalStatus = model.GlobalStatus(globalStatus)

return msg, totalReadN
}

func GlobalReportResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractGlobalEndResponseDecoder(in)
abstractGlobalEndResponse := resp.(protocol.AbstractGlobalEndResponse)
msg := protocol.GlobalReportResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
return msg, totalReadN
}

func GlobalRollbackRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractGlobalEndRequestDecoder(in)
abstractGlobalEndRequest := req.(protocol.AbstractGlobalEndRequest)
msg := protocol.GlobalRollbackRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
return msg, totalReadN
}

func GlobalRollbackResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractGlobalEndResponseDecoder(in)
abstractGlobalEndResponse := resp.(protocol.AbstractGlobalEndResponse)
msg := protocol.GlobalRollbackResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
return msg, totalReadN
}

func GlobalStatusRequestDecoder(in []byte) (interface{}, int) {
req, totalReadN := AbstractGlobalEndRequestDecoder(in)
abstractGlobalEndRequest := req.(protocol.AbstractGlobalEndRequest)
msg := protocol.GlobalStatusRequest{AbstractGlobalEndRequest: abstractGlobalEndRequest}
return msg, totalReadN
}

func GlobalStatusResponseDecoder(in []byte) (interface{}, int) {
resp, totalReadN := AbstractGlobalEndResponseDecoder(in)
abstractGlobalEndResponse := resp.(protocol.AbstractGlobalEndResponse)
msg := protocol.GlobalStatusResponse{AbstractGlobalEndResponse: abstractGlobalEndResponse}
return msg, totalReadN
}

func UndoLogDeleteRequestDecoder(in []byte) (interface{}, int) {
var (
length16 uint16 = 0
readN = 0
totalReadN = 0
)
msg := protocol.UndoLogDeleteRequest{}

r := byteio.BigEndianReader{Reader: bytes.NewReader(in)}
branchType, _ := r.ReadByte()
totalReadN += 1
msg.BranchType = model.BranchType(branchType)

length16, readN, _ = r.ReadUint16()
totalReadN += readN
if length16 > 0 {
msg.ResourceId, readN, _ = r.ReadString(int(length16))
totalReadN += readN
}

msg.SaveDays, readN, _ = r.ReadInt16()
totalReadN += readN

return msg, totalReadN
}

+ 565
- 0
pkg/protocol/codec/seata_encoder.go View File

@@ -0,0 +1,565 @@
package codec

import (
"bytes"
)

import (
"vimagination.zapto.org/byteio"
)

import (
"github.com/seata/seata-go/pkg/protocol"
log "github.com/seata/seata-go/pkg/util/log"
)

func AbstractResultMessageEncoder(in interface{}) []byte {
var (
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

message := in.(protocol.AbstractResultMessage)

w.WriteByte(byte(message.ResultCode))
if message.ResultCode == protocol.ResultCodeFailed {
var msg string
if message.Msg != "" {
if len(message.Msg) > 128 {
msg = message.Msg[:128]
} else {
msg = message.Msg
}
// 暂时不考虑 message.Msg 包含中文的情况,这样字符串的长度就是 byte 数组的长度

w.WriteInt16(int16(len(msg)))
w.WriteString(msg)
} else {
w.WriteInt16(zero16)
}
}

return b.Bytes()
}

func MergedWarpMessageEncoder(in interface{}) []byte {
var (
b bytes.Buffer
result = make([]byte, 0)
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.MergedWarpMessage)
w.WriteInt16(int16(len(req.Msgs)))

for _, msg := range req.Msgs {
encoder := getMessageEncoder(msg.GetTypeCode())
if encoder != nil {
data := encoder(msg)
w.WriteInt16(msg.GetTypeCode())
w.Write(data)
}
}

size := uint32(b.Len())
result = append(result, []byte{byte(size >> 24), byte(size >> 16), byte(size >> 8), byte(size)}...)
result = append(result, b.Bytes()...)

if len(req.Msgs) > 20 {
log.Debugf("msg in one packet: %s ,buffer size: %s", len(req.Msgs), size)
}
return result
}

func MergeResultMessageEncoder(in interface{}) []byte {
var (
b bytes.Buffer
result = make([]byte, 0)
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.MergeResultMessage)
w.WriteInt16(int16(len(req.Msgs)))

for _, msg := range req.Msgs {
encoder := getMessageEncoder(msg.GetTypeCode())
if encoder != nil {
data := encoder(msg)
w.WriteInt16(msg.GetTypeCode())
w.Write(data)
}
}

size := uint32(b.Len())
result = append(result, []byte{byte(size >> 24), byte(size >> 16), byte(size >> 8), byte(size)}...)
result = append(result, b.Bytes()...)

if len(req.Msgs) > 20 {
log.Debugf("msg in one packet: %s ,buffer size: %s", len(req.Msgs), size)
}
return result
}

func AbstractIdentifyRequestEncoder(in interface{}) []byte {
var (
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

req := in.(protocol.AbstractIdentifyRequest)

if req.Version != "" {
w.WriteInt16(int16(len(req.Version)))
w.WriteString(req.Version)
} else {
w.WriteInt16(zero16)
}

if req.ApplicationId != "" {
w.WriteInt16(int16(len(req.ApplicationId)))
w.WriteString(req.ApplicationId)
} else {
w.WriteInt16(zero16)
}

if req.TransactionServiceGroup != "" {
w.WriteInt16(int16(len(req.TransactionServiceGroup)))
w.WriteString(req.TransactionServiceGroup)
} else {
w.WriteInt16(zero16)
}

if req.ExtraData != nil {
w.WriteUint16(uint16(len(req.ExtraData)))
w.Write(req.ExtraData)
} else {
w.WriteInt16(zero16)
}

return b.Bytes()
}

func AbstractIdentifyResponseEncoder(in interface{}) []byte {
resp := in.(protocol.AbstractIdentifyResponse)

var (
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

if resp.Identified {
w.WriteByte(byte(1))
} else {
w.WriteByte(byte(0))
}

if resp.Version != "" {
w.WriteInt16(int16(len(resp.Version)))
w.WriteString(resp.Version)
} else {
w.WriteInt16(zero16)
}

return b.Bytes()
}

func RegisterRMRequestEncoder(in interface{}) []byte {
req := in.(protocol.RegisterRMRequest)
data := AbstractIdentifyRequestEncoder(req.AbstractIdentifyRequest)

var (
zero32 int32 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

if req.ResourceIds != "" {
w.WriteInt32(int32(len(req.ResourceIds)))
w.WriteString(req.ResourceIds)
} else {
w.WriteInt32(zero32)
}

result := append(data, b.Bytes()...)
return result
}

func RegisterRMResponseEncoder(in interface{}) []byte {
resp := in.(protocol.RegisterRMResponse)
return AbstractIdentifyResponseEncoder(resp.AbstractIdentifyResponse)
}

func RegisterTMRequestEncoder(in interface{}) []byte {
req := in.(protocol.RegisterTMRequest)
return AbstractIdentifyRequestEncoder(req.AbstractIdentifyRequest)
}

func RegisterTMResponseEncoder(in interface{}) []byte {
resp := in.(protocol.RegisterTMResponse)
return AbstractIdentifyResponseEncoder(resp.AbstractIdentifyResponse)
}

func AbstractTransactionResponseEncoder(in interface{}) []byte {
resp := in.(protocol.AbstractTransactionResponse)
data := AbstractResultMessageEncoder(resp.AbstractResultMessage)

result := append(data, byte(resp.TransactionExceptionCode))

return result
}

func AbstractBranchEndRequestEncoder(in interface{}) []byte {
var (
zero32 int32 = 0
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.AbstractBranchEndRequest)

if req.Xid != "" {
w.WriteInt16(int16(len(req.Xid)))
w.WriteString(req.Xid)
} else {
w.WriteInt16(zero16)
}

w.WriteInt64(req.BranchId)
w.WriteByte(byte(req.BranchType))

if req.ResourceId != "" {
w.WriteInt16(int16(len(req.ResourceId)))
w.WriteString(req.ResourceId)
} else {
w.WriteInt16(zero16)
}

if req.ApplicationData != nil {
w.WriteUint32(uint32(len(req.ApplicationData)))
w.Write(req.ApplicationData)
} else {
w.WriteInt32(zero32)
}

return b.Bytes()
}

func AbstractBranchEndResponseEncoder(in interface{}) []byte {
resp, _ := in.(protocol.AbstractBranchEndResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

var (
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

if resp.Xid != "" {
w.WriteInt16(int16(len(resp.Xid)))
w.WriteString(resp.Xid)
} else {
w.WriteInt16(zero16)
}

w.WriteInt64(resp.BranchId)
w.WriteByte(byte(resp.BranchStatus))

result := append(data, b.Bytes()...)

return result
}

func AbstractGlobalEndRequestEncoder(in interface{}) []byte {
var (
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.AbstractGlobalEndRequest)

if req.Xid != "" {
w.WriteInt16(int16(len(req.Xid)))
w.WriteString(req.Xid)
} else {
w.WriteInt16(zero16)
}
if req.ExtraData != nil {
w.WriteUint16(uint16(len(req.ExtraData)))
w.Write(req.ExtraData)
} else {
w.WriteInt16(zero16)
}

return b.Bytes()
}

func AbstractGlobalEndResponseEncoder(in interface{}) []byte {
resp := in.(protocol.AbstractGlobalEndResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

result := append(data, byte(resp.GlobalStatus))

return result
}

func BranchCommitRequestEncoder(in interface{}) []byte {
req := in.(protocol.BranchCommitRequest)
return AbstractBranchEndRequestEncoder(req.AbstractBranchEndRequest)
}

func BranchCommitResponseEncoder(in interface{}) []byte {
resp := in.(protocol.BranchCommitResponse)
return AbstractBranchEndResponseEncoder(resp.AbstractBranchEndResponse)
}

func BranchRegisterRequestEncoder(in interface{}) []byte {
var (
zero32 int32 = 0
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.BranchRegisterRequest)

if req.Xid != "" {
w.WriteInt16(int16(len(req.Xid)))
w.WriteString(req.Xid)
} else {
w.WriteInt16(zero16)
}

w.WriteByte(byte(req.BranchType))

if req.ResourceId != "" {
w.WriteInt16(int16(len(req.ResourceId)))
w.WriteString(req.ResourceId)
} else {
w.WriteInt16(zero16)
}

if req.LockKey != "" {
w.WriteInt32(int32(len(req.LockKey)))
w.WriteString(req.LockKey)
} else {
w.WriteInt32(zero32)
}

if req.ApplicationData != nil {
w.WriteUint32(uint32(len(req.ApplicationData)))
w.Write(req.ApplicationData)
} else {
w.WriteInt32(zero32)
}

return b.Bytes()
}

func BranchRegisterResponseEncoder(in interface{}) []byte {
resp := in.(protocol.BranchRegisterResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

c := uint64(resp.BranchId)
branchIdBytes := []byte{
byte(c >> 56),
byte(c >> 48),
byte(c >> 40),
byte(c >> 32),
byte(c >> 24),
byte(c >> 16),
byte(c >> 8),
byte(c),
}
result := append(data, branchIdBytes...)

return result
}

func BranchReportRequestEncoder(in interface{}) []byte {
var (
zero32 int32 = 0
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.BranchReportRequest)

if req.Xid != "" {
w.WriteInt16(int16(len(req.Xid)))
w.WriteString(req.Xid)
} else {
w.WriteInt16(zero16)
}

w.WriteInt64(req.BranchId)
w.WriteByte(byte(req.Status))

if req.ResourceId != "" {
w.WriteInt16(int16(len(req.ResourceId)))
w.WriteString(req.ResourceId)
} else {
w.WriteInt16(zero16)
}

if req.ApplicationData != nil {
w.WriteUint32(uint32(len(req.ApplicationData)))
w.Write(req.ApplicationData)
} else {
w.WriteInt32(zero32)
}

w.WriteByte(byte(req.BranchType))

return b.Bytes()
}

func BranchReportResponseEncoder(in interface{}) []byte {
resp := in.(protocol.BranchReportResponse)
return AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)
}

func BranchRollbackRequestEncoder(in interface{}) []byte {
req := in.(protocol.BranchRollbackRequest)
return AbstractBranchEndRequestEncoder(req.AbstractBranchEndRequest)
}

func BranchRollbackResponseEncoder(in interface{}) []byte {
resp := in.(protocol.BranchRollbackResponse)
return AbstractBranchEndResponseEncoder(resp.AbstractBranchEndResponse)
}

func GlobalBeginRequestEncoder(in interface{}) []byte {
var (
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.GlobalBeginRequest)

w.WriteInt32(req.Timeout)
if req.TransactionName != "" {
w.WriteInt16(int16(len(req.TransactionName)))
w.WriteString(req.TransactionName)
} else {
w.WriteInt16(zero16)
}

return b.Bytes()
}

func GlobalBeginResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalBeginResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

var (
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

if resp.Xid != "" {
w.WriteInt16(int16(len(resp.Xid)))
w.WriteString(resp.Xid)
} else {
w.WriteInt16(zero16)
}
if resp.ExtraData != nil {
w.WriteUint16(uint16(len(resp.ExtraData)))
w.Write(resp.ExtraData)
} else {
w.WriteInt16(zero16)
}

result := append(data, b.Bytes()...)

return result
}

func GlobalCommitRequestEncoder(in interface{}) []byte {
req := in.(protocol.GlobalCommitRequest)
return AbstractGlobalEndRequestEncoder(req.AbstractGlobalEndRequest)
}

func GlobalCommitResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalCommitResponse)
return AbstractGlobalEndResponseEncoder(resp.AbstractGlobalEndResponse)
}

func GlobalLockQueryRequestEncoder(in interface{}) []byte {
return BranchRegisterRequestEncoder(in)
}

func GlobalLockQueryResponseEncoder(in interface{}) []byte {
resp, _ := in.(protocol.GlobalLockQueryResponse)
data := AbstractTransactionResponseEncoder(resp.AbstractTransactionResponse)

var result []byte
if resp.Lockable {
result = append(data, byte(0), byte(1))
} else {
result = append(data, byte(0), byte(0))
}

return result
}

func GlobalReportRequestEncoder(in interface{}) []byte {
req, _ := in.(protocol.GlobalReportRequest)
data := AbstractGlobalEndRequestEncoder(req.AbstractGlobalEndRequest)

result := append(data, byte(req.GlobalStatus))
return result
}

func GlobalReportResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalReportResponse)
return AbstractGlobalEndResponseEncoder(resp.AbstractGlobalEndResponse)
}

func GlobalRollbackRequestEncoder(in interface{}) []byte {
req := in.(protocol.GlobalRollbackRequest)
return AbstractGlobalEndRequestEncoder(req.AbstractGlobalEndRequest)
}

func GlobalRollbackResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalRollbackResponse)
return AbstractGlobalEndResponseEncoder(resp.AbstractGlobalEndResponse)
}

func GlobalStatusRequestEncoder(in interface{}) []byte {
req := in.(protocol.GlobalStatusRequest)
return AbstractGlobalEndRequestEncoder(req.AbstractGlobalEndRequest)
}

func GlobalStatusResponseEncoder(in interface{}) []byte {
resp := in.(protocol.GlobalStatusResponse)
return AbstractGlobalEndResponseEncoder(resp.AbstractGlobalEndResponse)
}

func UndoLogDeleteRequestEncoder(in interface{}) []byte {
var (
zero16 int16 = 0
b bytes.Buffer
)
w := byteio.BigEndianWriter{Writer: &b}

req, _ := in.(protocol.UndoLogDeleteRequest)

w.WriteByte(byte(req.BranchType))
if req.ResourceId != "" {
w.WriteInt16(int16(len(req.ResourceId)))
w.WriteString(req.ResourceId)
} else {
w.WriteInt16(zero16)
}
w.WriteInt16(req.SaveDays)

return b.Bytes()
}

+ 28
- 0
pkg/protocol/constant.go View File

@@ -0,0 +1,28 @@
package protocol

var MAGIC_CODE_BYTES = [2]byte{0xda, 0xda}

const (
VERSION = 1

// MaxFrameLength max frame length
MaxFrameLength = 8 * 1024 * 1024

// V1HeadLength v1 head length
V1HeadLength = 16

// MSGTypeRequest request message type
MSGTypeRequest = 0

// MSGTypeResponse response message type
MSGTypeResponse = 1

// MSGTypeRequestOneway request one way
MSGTypeRequestOneway = 2

// MSGTypeHeartbeatRequest heart beat request
MSGTypeHeartbeatRequest = 3

// MSGTypeHeartbeatResponse heart beat response
MSGTypeHeartbeatResponse = 4
)

+ 16
- 0
pkg/protocol/heart_beat_message.go View File

@@ -0,0 +1,16 @@
package protocol

type HeartBeatMessage struct {
Ping bool
}

var HeartBeatMessagePing = HeartBeatMessage{true}
var HeartBeatMessagePong = HeartBeatMessage{false}

func (msg HeartBeatMessage) ToString() string {
if msg.Ping {
return "services ping"
} else {
return "services pong"
}
}

+ 26
- 0
pkg/protocol/identify.go View File

@@ -0,0 +1,26 @@
package protocol

type AbstractResultMessage struct {
ResultCode ResultCode
Msg string
}

type AbstractIdentifyRequest struct {
Version string

ApplicationId string `json:"applicationId"`

TransactionServiceGroup string

ExtraData []byte
}

type AbstractIdentifyResponse struct {
AbstractResultMessage

Version string

ExtraData []byte

Identified bool
}

+ 18
- 0
pkg/protocol/merged_message.go View File

@@ -0,0 +1,18 @@
package protocol

type MergedWarpMessage struct {
Msgs []MessageTypeAware
MsgIds []int32
}

func (req MergedWarpMessage) GetTypeCode() int16 {
return TypeSeataMerge
}

type MergeResultMessage struct {
Msgs []MessageTypeAware
}

func (resp MergeResultMessage) GetTypeCode() int16 {
return TypeSeataMergeResult
}

+ 17
- 0
pkg/protocol/message_future.go View File

@@ -0,0 +1,17 @@
package protocol

// MessageFuture ...
type MessageFuture struct {
ID int32
Err error
Response interface{}
Done chan bool
}

// NewMessageFuture ...
func NewMessageFuture(message RpcMessage) *MessageFuture {
return &MessageFuture{
ID: message.ID,
Done: make(chan bool),
}
}

+ 119
- 0
pkg/protocol/message_type.go View File

@@ -0,0 +1,119 @@
package protocol

const (
/**
* The constant TYPE_GLOBAL_BEGIN.
*/
TypeGlobalBegin = 1
/**
* The constant TYPE_GLOBAL_BEGIN_RESULT.
*/
TypeGlobalBeginResult = 2
/**
* The constant TYPE_GLOBAL_COMMIT.
*/
TypeGlobalCommit = 7
/**
* The constant TYPE_GLOBAL_COMMIT_RESULT.
*/
TypeGlobalCommitResult = 8
/**
* The constant TYPE_GLOBAL_ROLLBACK.
*/
TypeGlobalRollback = 9
/**
* The constant TYPE_GLOBAL_ROLLBACK_RESULT.
*/
TypeGlobalRollbackResult = 10
/**
* The constant TYPE_GLOBAL_STATUS.
*/
TypeGlobalStatus = 15
/**
* The constant TYPE_GLOBAL_STATUS_RESULT.
*/
TypeGlobalStatusResult = 16
/**
* The constant TYPE_GLOBAL_REPORT.
*/
TypeGlobalReport = 17
/**
* The constant TYPE_GLOBAL_REPORT_RESULT.
*/
TypeGlobalReportResult = 18
/**
* The constant TYPE_GLOBAL_LOCK_QUERY.
*/
TypeGlobalLockQuery = 21
/**
* The constant TYPE_GLOBAL_LOCK_QUERY_RESULT.
*/
TypeGlobalLockQueryResult = 22

/**
* The constant TYPE_BRANCH_COMMIT.
*/
TypeBranchCommit = 3
/**
* The constant TYPE_BRANCH_COMMIT_RESULT.
*/
TypeBranchCommitResult = 4
/**
* The constant TYPE_BRANCH_ROLLBACK.
*/
TypeBranchRollback = 5
/**
* The constant TYPE_BRANCH_ROLLBACK_RESULT.
*/
TypeBranchRollbackResult = 6
/**
* The constant TYPE_BRANCH_REGISTER.
*/
TypeBranchRegister = 11
/**
* The constant TYPE_BRANCH_REGISTER_RESULT.
*/
TypeBranchRegisterResult = 12
/**
* The constant TYPE_BRANCH_STATUS_REPORT.
*/
TypeBranchStatusReport = 13
/**
* The constant TYPE_BRANCH_STATUS_REPORT_RESULT.
*/
TypeBranchStatusReportResult = 14

/**
* The constant TYPE_SEATA_MERGE.
*/
TypeSeataMerge = 59
/**
* The constant TYPE_SEATA_MERGE_RESULT.
*/
TypeSeataMergeResult = 60

/**
* The constant TYPE_REG_CLT.
*/
TypeRegClt = 101
/**
* The constant TYPE_REG_CLT_RESULT.
*/
TypeRegCltResult = 102
/**
* The constant TYPE_REG_RM.
*/
TypeRegRm = 103
/**
* The constant TYPE_REG_RM_RESULT.
*/
TypeRegRmResult = 104
/**
* The constant TYPE_RM_DELETE_UNDOLOG.
*/
TypeRmDeleteUndolog = 111
/**
* the constant TYPE_HEARTBEAT_MSG
*/
TypeHeartbeatMsg = 120
)

+ 5
- 0
pkg/protocol/message_type_aware.go View File

@@ -0,0 +1,5 @@
package protocol

type MessageTypeAware interface {
GetTypeCode() int16
}

+ 18
- 0
pkg/protocol/result_code.go View File

@@ -0,0 +1,18 @@
package protocol

type ResultCode byte

const (

/**
* ResultCodeFailed result code.
*/
// ResultCodeFailed
ResultCodeFailed ResultCode = iota

/**
* Success result code.
*/
// Success
ResultCodeSuccess
)

+ 18
- 0
pkg/protocol/rm.go View File

@@ -0,0 +1,18 @@
package protocol

type RegisterRMRequest struct {
AbstractIdentifyRequest
ResourceIds string
}

func (req RegisterRMRequest) GetTypeCode() int16 {
return TypeRegRm
}

type RegisterRMResponse struct {
AbstractIdentifyResponse
}

func (resp RegisterRMResponse) GetTypeCode() int16 {
return TypeRegRmResult
}

+ 10
- 0
pkg/protocol/rpc_message.go View File

@@ -0,0 +1,10 @@
package protocol

type RpcMessage struct {
ID int32
MessageType byte
Codec byte
Compressor byte
HeadMap map[string]string
Body interface{}
}

+ 17
- 0
pkg/protocol/tm.go View File

@@ -0,0 +1,17 @@
package protocol

type RegisterTMRequest struct {
AbstractIdentifyRequest
}

func (req RegisterTMRequest) GetTypeCode() int16 {
return TypeRegClt
}

type RegisterTMResponse struct {
AbstractIdentifyResponse
}

func (resp RegisterTMResponse) GetTypeCode() int16 {
return TypeRegCltResult
}

+ 226
- 0
pkg/protocol/transaction.go View File

@@ -0,0 +1,226 @@
package protocol

import (
"github.com/seata/seata-go/pkg/model"
)

type AbstractTransactionResponse struct {
AbstractResultMessage
TransactionExceptionCode model.TransactionExceptionCode
}

type AbstractBranchEndRequest struct {
Xid string
BranchId int64
BranchType model.BranchType
ResourceId string
ApplicationData []byte
}

type AbstractBranchEndResponse struct {
AbstractTransactionResponse

Xid string
BranchId int64
BranchStatus model.BranchStatus
}

type AbstractGlobalEndRequest struct {
Xid string
ExtraData []byte
}

type AbstractGlobalEndResponse struct {
AbstractTransactionResponse

GlobalStatus model.GlobalStatus
}

type BranchRegisterRequest struct {
Xid string
BranchType model.BranchType
ResourceId string
LockKey string
ApplicationData []byte
}

func (req BranchRegisterRequest) GetTypeCode() int16 {
return TypeBranchRegister
}

type BranchRegisterResponse struct {
AbstractTransactionResponse

BranchId int64
}

func (resp BranchRegisterResponse) GetTypeCode() int16 {
return TypeBranchRegisterResult
}

type BranchReportRequest struct {
Xid string
BranchId int64
ResourceId string
Status model.BranchStatus
ApplicationData []byte
BranchType model.BranchType
}

func (req BranchReportRequest) GetTypeCode() int16 {
return TypeBranchStatusReport
}

type BranchReportResponse struct {
AbstractTransactionResponse
}

func (resp BranchReportResponse) GetTypeCode() int16 {
return TypeBranchStatusReportResult
}

type BranchCommitRequest struct {
AbstractBranchEndRequest
}

func (req BranchCommitRequest) GetTypeCode() int16 {
return TypeBranchCommit
}

type BranchCommitResponse struct {
AbstractBranchEndResponse
}

func (resp BranchCommitResponse) GetTypeCode() int16 {
return TypeBranchCommitResult
}

type BranchRollbackRequest struct {
AbstractBranchEndRequest
}

func (req BranchRollbackRequest) GetTypeCode() int16 {
return TypeBranchRollback
}

type BranchRollbackResponse struct {
AbstractBranchEndResponse
}

func (resp BranchRollbackResponse) GetTypeCode() int16 {
return TypeGlobalRollbackResult
}

type GlobalBeginRequest struct {
Timeout int32
TransactionName string
}

func (req GlobalBeginRequest) GetTypeCode() int16 {
return TypeGlobalBegin
}

type GlobalBeginResponse struct {
AbstractTransactionResponse

Xid string
ExtraData []byte
}

func (resp GlobalBeginResponse) GetTypeCode() int16 {
return TypeGlobalBeginResult
}

type GlobalStatusRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalStatusRequest) GetTypeCode() int16 {
return TypeGlobalStatus
}

type GlobalStatusResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalStatusResponse) GetTypeCode() int16 {
return TypeGlobalStatusResult
}

type GlobalLockQueryRequest struct {
BranchRegisterRequest
}

func (req GlobalLockQueryRequest) GetTypeCode() int16 {
return TypeGlobalLockQuery
}

type GlobalLockQueryResponse struct {
AbstractTransactionResponse

Lockable bool
}

func (resp GlobalLockQueryResponse) GetTypeCode() int16 {
return TypeGlobalLockQueryResult
}

type GlobalReportRequest struct {
AbstractGlobalEndRequest

GlobalStatus model.GlobalStatus
}

func (req GlobalReportRequest) GetTypeCode() int16 {
return TypeGlobalStatus
}

type GlobalReportResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalReportResponse) GetTypeCode() int16 {
return TypeGlobalStatusResult
}

type GlobalCommitRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalCommitRequest) GetTypeCode() int16 {
return TypeGlobalCommit
}

type GlobalCommitResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalCommitResponse) GetTypeCode() int16 {
return TypeGlobalCommitResult
}

type GlobalRollbackRequest struct {
AbstractGlobalEndRequest
}

func (req GlobalRollbackRequest) GetTypeCode() int16 {
return TypeGlobalRollback
}

type GlobalRollbackResponse struct {
AbstractGlobalEndResponse
}

func (resp GlobalRollbackResponse) GetTypeCode() int16 {
return TypeGlobalRollbackResult
}

type UndoLogDeleteRequest struct {
ResourceId string
SaveDays int16
BranchType model.BranchType
}

func (req UndoLogDeleteRequest) GetTypeCode() int16 {
return TypeRmDeleteUndolog
}

+ 8
- 0
pkg/rm/api/business_action_context.go View File

@@ -0,0 +1,8 @@
package api

type BusinessActionContext struct {
Xid string
BranchId string
ActionName string
ActionContext map[string]interface{}
}

+ 94
- 0
pkg/rm/default_resource_manager.go View File

@@ -0,0 +1,94 @@
package rm

import (
"fmt"
"sync"
)

import (
"github.com/seata/seata-go/pkg/model"
)

var (
// BranchType -> ResourceManager
resourceManagerMap sync.Map
// singletone defaultResourceManager
defaultRM *defaultResourceManager
)

func init() {
defaultRM = &defaultResourceManager{}
}

type defaultResourceManager struct {
}

// 将事务管理器注册到这里
func RegisterResource(resourceManager model.ResourceManager) {
resourceManagerMap.Store(resourceManager.GetBranchType(), resourceManager)
}

func GetDefaultResourceManager() model.ResourceManager {
return defaultRM
}

func (*defaultResourceManager) getResourceManager(branchType model.BranchType) model.ResourceManager {
rm, ok := resourceManagerMap.Load(branchType)
if !ok {
panic(fmt.Sprintf("No ResourceManager for BranchType: %v", branchType))
}
return rm.(model.ResourceManager)
}

// Commit a branch transaction
func (d *defaultResourceManager) BranchCommit(branchType model.BranchType, xid, branchId int64, resourceId, applicationData string) (model.BranchStatus, error) {
return d.getResourceManager(branchType).BranchCommit(branchType, xid, branchId, resourceId, applicationData)
}

// Rollback a branch transaction
func (d *defaultResourceManager) BranchRollback(branchType model.BranchType, xid string, branchId int64, resourceId, applicationData string) (model.BranchStatus, error) {
return d.getResourceManager(branchType).BranchRollback(branchType, xid, branchId, resourceId, applicationData)
}

// Branch register long
func (d *defaultResourceManager) BranchRegister(branchType model.BranchType, resourceId, clientId, xid, applicationData, lockKeys string) (int64, error) {
return d.getResourceManager(branchType).BranchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys)
}

// Branch report
func (d *defaultResourceManager) BranchReport(branchType model.BranchType, xid string, branchId int64, status model.BranchStatus, applicationData string) error {
return d.getResourceManager(branchType).BranchReport(branchType, xid, branchId, status, applicationData)
}

// Lock query boolean
func (d *defaultResourceManager) LockQuery(branchType model.BranchType, resourceId, xid, lockKeys string) (bool, error) {
return d.getResourceManager(branchType).LockQuery(branchType, resourceId, xid, lockKeys)
}

// Register a model.Resource to be managed by model.Resource Manager
func (d *defaultResourceManager) RegisterResource(resource model.Resource) error {
return d.getResourceManager(resource.GetBranchType()).RegisterResource(resource)
}

// Unregister a model.Resource from the model.Resource Manager
func (d *defaultResourceManager) UnregisterResource(resource model.Resource) error {
return d.getResourceManager(resource.GetBranchType()).UnregisterResource(resource)
}

// Get all resources managed by this manager
func (d *defaultResourceManager) GetManagedResources() map[string]model.Resource {
var allResource map[string]model.Resource
resourceManagerMap.Range(func(branchType, resourceManager interface{}) bool {
rs := resourceManager.(model.ResourceManager).GetManagedResources()
for k, v := range rs {
rs[k] = v
}
return true
})
return allResource
}

// Get the model.BranchType
func (d *defaultResourceManager) GetBranchType() model.BranchType {
panic("DefaultResourceManager isn't a real ResourceManager")
}

+ 33
- 0
pkg/rm/tcc/tcc_resource.go View File

@@ -0,0 +1,33 @@
package tcc

import (
"reflect"
)

import (
"github.com/seata/seata-go/pkg/model"
)

type TCCResource struct {
ResourceGroupId string `default:"DEFAULT"`
AppName string
ActionName string
TargetBean interface{}
PrepareMethod reflect.Method
CommitMethodName string
CommitMethod reflect.Method
RollbackMethodName string
RollbackMethod reflect.Method
}

func (t *TCCResource) GetResourceGroupId() string {
return t.ResourceGroupId
}

func (t *TCCResource) GetResourceId() string {
return t.ActionName
}

func (t *TCCResource) GetBranchType() model.BranchType {
return model.TCC
}

+ 15
- 0
pkg/rm/tcc/tcc_service.go View File

@@ -0,0 +1,15 @@
package tcc

import (
"context"
)

import (
"github.com/seata/seata-go/pkg/rm/api"
)

type TCCService interface {
Prepare(ctx context.Context, param interface{}) error
Commit(ctx context.Context, businessActionContext api.BusinessActionContext) error
Rollback(ctx context.Context, businessActionContext api.BusinessActionContext) error
}

+ 1
- 0
pkg/rm/tcc/tcc_service_test.go View File

@@ -0,0 +1 @@
package tcc

+ 279
- 0
pkg/rpc11/readwriter.go View File

@@ -0,0 +1,279 @@
package rpc11

import (
"bytes"
"encoding/binary"
)

import (
getty "github.com/apache/dubbo-getty"

"github.com/pkg/errors"

"vimagination.zapto.org/byteio"
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/protocol/codec"
)

/**
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
* | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestID |
* | code |colVer| (head+body) | Length |Type |lizer|ess | |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | |
* | Head Map [Optional] |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | |
* | body |
* | |
* | ... ... |
* +-----------------------------------------------------------------------------------------------+
* </pre>
* <p>
* <li>Full Length: include all data </li>
* <li>Head Length: include head data from magic code to head map. </li>
* <li>Body Length: Full Length - Head Length</li>
* </p>
* https://github.com/seata/seata/issues/893
*/
const (
SeataV1PackageHeaderReservedLength = 16
)

var (
// RpcPkgHandler
rpcPkgHandler = &RpcPackageHandler{}
)

var (
ErrNotEnoughStream = errors.New("packet stream is not enough")
ErrTooLargePackage = errors.New("package length is exceed the getty package's legal maximum length.")
ErrInvalidPackage = errors.New("invalid rpc package")
ErrIllegalMagic = errors.New("package magic is not right.")
)

type RpcPackageHandler struct{}

type SeataV1PackageHeader struct {
Magic0 byte
Magic1 byte
Version byte
TotalLength uint32
HeadLength uint16
MessageType byte
CodecType byte
CompressType byte
ID uint32
Meta map[string]string
BodyLength uint32
}

func (h *SeataV1PackageHeader) Unmarshal(buf *bytes.Buffer) (int, error) {
bufLen := buf.Len()
if bufLen < SeataV1PackageHeaderReservedLength {
return 0, ErrNotEnoughStream
}

// magic
if err := binary.Read(buf, binary.BigEndian, &(h.Magic0)); err != nil {
return 0, err
}
if err := binary.Read(buf, binary.BigEndian, &(h.Magic1)); err != nil {
return 0, err
}
if h.Magic0 != protocol.MAGIC_CODE_BYTES[0] || h.Magic1 != protocol.MAGIC_CODE_BYTES[1] {
return 0, ErrIllegalMagic
}
// version
if err := binary.Read(buf, binary.BigEndian, &(h.Version)); err != nil {
return 0, err
}
// TODO check version compatible here

// total length
if err := binary.Read(buf, binary.BigEndian, &(h.TotalLength)); err != nil {
return 0, err
}
// head length
if err := binary.Read(buf, binary.BigEndian, &(h.HeadLength)); err != nil {
return 0, err
}
// message type
if err := binary.Read(buf, binary.BigEndian, &(h.MessageType)); err != nil {
return 0, err
}
// codec type
if err := binary.Read(buf, binary.BigEndian, &(h.CodecType)); err != nil {
return 0, err
}
// compress type
if err := binary.Read(buf, binary.BigEndian, &(h.CompressType)); err != nil {
return 0, err
}
// id
if err := binary.Read(buf, binary.BigEndian, &(h.ID)); err != nil {
return 0, err
}
// todo meta map
if h.HeadLength > SeataV1PackageHeaderReservedLength {
headMapLength := h.HeadLength - SeataV1PackageHeaderReservedLength
h.Meta = headMapDecode(buf.Bytes()[:headMapLength])
}
h.BodyLength = h.TotalLength - uint32(h.HeadLength)

return int(h.TotalLength), nil
}

// Read read binary data from to rpc message
func (p *RpcPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var header SeataV1PackageHeader

buf := bytes.NewBuffer(data)
_, err := header.Unmarshal(buf)
if err != nil {
if err == ErrNotEnoughStream {
// getty case2
return nil, 0, nil
}
// getty case1
return nil, 0, err
}
if uint32(len(data)) < header.TotalLength {
// get case3
return nil, int(header.TotalLength), nil
}

//r := byteio.BigEndianReader{Reader: bytes.NewReader(data)}
rpcMessage := protocol.RpcMessage{
Codec: header.CodecType,
ID: int32(header.ID),
Compressor: header.CompressType,
MessageType: header.MessageType,
HeadMap: header.Meta,
}

if header.MessageType == protocol.MSGTypeHeartbeatRequest {
rpcMessage.Body = protocol.HeartBeatMessagePing
} else if header.MessageType == protocol.MSGTypeHeartbeatResponse {
rpcMessage.Body = protocol.HeartBeatMessagePong
} else {
if header.BodyLength > 0 {
//todo compress
msg, _ := codec.MessageDecoder(header.CodecType, data[header.HeadLength:])
rpcMessage.Body = msg
}
}

return rpcMessage, int(header.TotalLength), nil
}

// Write write rpc message to binary data
func (p *RpcPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
msg, ok := pkg.(protocol.RpcMessage)
if !ok {
return nil, ErrInvalidPackage
}

fullLength := protocol.V1HeadLength
headLength := protocol.V1HeadLength
var result = make([]byte, 0, fullLength)

var b bytes.Buffer
w := byteio.BigEndianWriter{Writer: &b}

result = append(result, protocol.MAGIC_CODE_BYTES[:2]...)
result = append(result, protocol.VERSION)

w.WriteByte(msg.MessageType)
w.WriteByte(msg.Codec)
w.WriteByte(msg.Compressor)
w.WriteInt32(msg.ID)

if msg.HeadMap != nil && len(msg.HeadMap) > 0 {
headMapBytes, headMapLength := headMapEncode(msg.HeadMap)
headLength += headMapLength
fullLength += headMapLength
w.Write(headMapBytes)
}

if msg.MessageType != protocol.MSGTypeHeartbeatRequest &&
msg.MessageType != protocol.MSGTypeHeartbeatResponse {

bodyBytes := codec.MessageEncoder(msg.Codec, msg.Body)
fullLength += len(bodyBytes)
w.Write(bodyBytes)
}

fullLen := int32(fullLength)
headLen := int16(headLength)
result = append(result, []byte{byte(fullLen >> 24), byte(fullLen >> 16), byte(fullLen >> 8), byte(fullLen)}...)
result = append(result, []byte{byte(headLen >> 8), byte(headLen)}...)
result = append(result, b.Bytes()...)

return result, nil
}

func headMapDecode(data []byte) map[string]string {
size := len(data)
if size == 0 {
return nil
}

mp := make(map[string]string)
r := byteio.BigEndianReader{Reader: bytes.NewReader(data)}

readLength := 0
for readLength < size {
var key, value string
lengthK, _, _ := r.ReadUint16()
if lengthK < 0 {
break
} else if lengthK == 0 {
key = ""
} else {
key, _, _ = r.ReadString(int(lengthK))
}

lengthV, _, _ := r.ReadUint16()
if lengthV < 0 {
break
} else if lengthV == 0 {
value = ""
} else {
value, _, _ = r.ReadString(int(lengthV))
}

mp[key] = value
readLength += int(lengthK + lengthV)
}

return mp
}

func headMapEncode(data map[string]string) ([]byte, int) {
var b bytes.Buffer

w := byteio.BigEndianWriter{Writer: &b}
for k, v := range data {
if k == "" {
w.WriteUint16(0)
} else {
w.WriteUint16(uint16(len(k)))
w.WriteString(k)
}

if v == "" {
w.WriteUint16(0)
} else {
w.WriteUint16(uint16(len(v)))
w.WriteString(v)
}
}

return b.Bytes(), b.Len()
}

+ 159
- 0
pkg/rpc11/rpc_client.go View File

@@ -0,0 +1,159 @@
package rpc11

import (
"fmt"
"net"
"sync"
"sync/atomic"
)

import (
getty "github.com/apache/dubbo-getty"

gxsync "github.com/dubbogo/gost/sync"
)

import (
"github.com/seata/seata-go/pkg/config"
log "github.com/seata/seata-go/pkg/util/log"
)

var (
rpcClient *RpcClient
MAX_CHECK_ALIVE_RETRY = 600
CHECK_ALIVE_INTERNAL = 100
)

func init() {
rpcClient = newRpcClient()
}

type RpcClient struct {
lock sync.RWMutex
conf *config.ClientConfig
gettyClients []getty.Client
listener getty.EventListener
// serverAddress -> rpc_client.Session -> bool
serverSessionsMap sync.Map
sessionSize int32
}

func newRpcClient() *RpcClient {
rpcClient := &RpcClient{
conf: config.GetClientConfig(),
gettyClients: make([]getty.Client, 0),
listener: NewClientEventHandler(),
}
rpcClient.init()
return rpcClient
}

func (r *RpcClient) init() {
//todo 暂时写死地址,待改为配置
//addressList := []string{"127.0.0.1:8091"}
addressList := []string{"127.0.0.1:8090"}
if len(addressList) == 0 {
log.Warn("no have valid seata server list")
}
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(address),
getty.WithConnectionNumber((int)(r.conf.GettyConfig.ConnectionNum)),
getty.WithReconnectInterval(r.conf.GettyConfig.ReconnectInterval),
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(2)),
)
gettyClient.RunEventLoop(r.newSession)
r.gettyClients = append(r.gettyClients, gettyClient)
}
}

func (r *RpcClient) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)

if r.conf.GettyConfig.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}

if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}

tcpConn.SetNoDelay(r.conf.GettyConfig.GettySessionParam.TCPNoDelay)
tcpConn.SetKeepAlive(r.conf.GettyConfig.GettySessionParam.TCPKeepAlive)
if r.conf.GettyConfig.GettySessionParam.TCPKeepAlive {
tcpConn.SetKeepAlivePeriod(r.conf.GettyConfig.GettySessionParam.KeepAlivePeriod)
}
tcpConn.SetReadBuffer(r.conf.GettyConfig.GettySessionParam.TCPRBufSize)
tcpConn.SetWriteBuffer(r.conf.GettyConfig.GettySessionParam.TCPWBufSize)

session.SetName(r.conf.GettyConfig.GettySessionParam.SessionName)
session.SetMaxMsgLen(r.conf.GettyConfig.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(r.listener)
session.SetReadTimeout(r.conf.GettyConfig.GettySessionParam.TCPReadTimeout)
session.SetWriteTimeout(r.conf.GettyConfig.GettySessionParam.TCPWriteTimeout)
session.SetCronPeriod((int)(r.conf.GettyConfig.HeartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(r.conf.GettyConfig.GettySessionParam.WaitTimeout)
log.Debugf("rpc_client new session:%s", session.Stat())

return nil
}

func (r *RpcClient) AcquireGettySession() getty.Session {
// map 遍历是随机的
var session getty.Session
r.serverSessionsMap.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
r.ReleaseGettySession(session)
} else {
return false
}
return true
})
if session != nil {
return session
}
//if sessionSize == 0 {
// ticker := time.NewTicker(time.Duration(CHECK_ALIVE_INTERNAL) * time.Millisecond)
// defer ticker.Stop()
// for i := 0; i < MAX_CHECK_ALIVE_RETRY; i++ {
// <-ticker.C
// allSessions.Range(func(key, value interface{}) bool {
// session = key.(getty.Session)
// if session.IsClosed() {
// sessionManager.ReleaseGettySession(session)
// } else {
// return false
// }
// return true
// })
// if session != nil {
// return session
// }
// }
//}
return nil
}

func (r *RpcClient) RegisterGettySession(session getty.Session) {
//r.serverSessionsMap.Store(session, true)
m, _ := r.serverSessionsMap.LoadOrStore(session.RemoteAddr(), &sync.Map{})
sMap := m.(*sync.Map)
sMap.Store(session, true)
atomic.AddInt32(&r.sessionSize, 1)
}

func (r *RpcClient) ReleaseGettySession(session getty.Session) {
r.serverSessionsMap.Delete(session)
if !session.IsClosed() {
m, _ := r.serverSessionsMap.LoadOrStore(session.RemoteAddr(), &sync.Map{})
sMap := m.(*sync.Map)
sMap.Delete(session)
session.Close()
}
atomic.AddInt32(&r.sessionSize, -1)
}

+ 93
- 0
pkg/rpc11/rpc_remoting_client.go View File

@@ -0,0 +1,93 @@
package rpc11

import (
"time"
)

import (
getty "github.com/apache/dubbo-getty"

"github.com/pkg/errors"

"go.uber.org/atomic"
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/protocol/codec"
log "github.com/seata/seata-go/pkg/util/log"
)

type ClientEventHandler struct {
}

func NewClientEventHandler() *ClientEventHandler {
return &ClientEventHandler{}
}

func (h *ClientEventHandler) OnOpen(session getty.Session) error {
log.Infof("OnOpen session{%s} open", session.Stat())
rpcClient.RegisterGettySession(session)
return nil
}

func (h *ClientEventHandler) OnError(session getty.Session, err error) {
log.Infof("OnError session{%s} got error{%v}, will be closed.", session.Stat(), err)
}

func (h *ClientEventHandler) OnClose(session getty.Session) {
log.Infof("OnClose session{%s} is closing......", session.Stat())
}

func (h *ClientEventHandler) OnMessage(session getty.Session, pkg interface{}) {
s, ok := pkg.(string)
if !ok {
log.Infof("illegal packge{%#v}", pkg)
return
}

log.Infof("OnMessage: %s", s)
}

func (h *ClientEventHandler) OnCron(session getty.Session) {
active := session.GetActive()
if 20*time.Second.Nanoseconds() < time.Since(active).Nanoseconds() {
log.Infof("OnCorn session{%s} timeout{%s}", session.Stat(), time.Since(active).String())
session.Close()
}
}

func (client *ClientEventHandler) sendMergedMessage(mergedMessage protocol.MergedWarpMessage) {
ss := rpcClient.AcquireGettySession()
err := client.sendAsync(ss, mergedMessage)
if err != nil {
log.Errorf("error sendMergedMessage")
}
}

func (client *ClientEventHandler) sendAsync(session getty.Session, msg interface{}) error {
var err error
if session == nil || session.IsClosed() {
log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.")
}
idGenerator := atomic.Uint32{}
rpcMessage := protocol.RpcMessage{
ID: int32(idGenerator.Inc()),
MessageType: protocol.MSGTypeRequestOneway,
Codec: codec.SEATA,
Compressor: 0,
Body: msg,
}
log.Infof("store message, id %d : %#v", rpcMessage.ID, msg)
//client.mergeMsgMap.Store(rpcMessage.ID, msg)
//config timeout
pkgLen, sendLen, err := session.WritePkg(rpcMessage, time.Duration(0))
if err != nil || (pkgLen != 0 && pkgLen != sendLen) {
log.Warnf("start to close the session because %d of %d bytes data is sent success. err:%+v", sendLen, pkgLen, err)
//runtime.GoWithRecover(func() {
// session.Close()
//}, nil)
return errors.Wrap(err, "pkg not send completely!")
}
return nil
}

+ 27
- 0
pkg/rpc11/rpc_remoting_client_test.go View File

@@ -0,0 +1,27 @@
package rpc11

import (
"testing"
"time"
)

import (
"github.com/seata/seata-go/pkg/protocol"
)

func TestSendMergedMessage(test *testing.T) {
request := protocol.RegisterRMRequest{
ResourceIds: "1111",
AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
ApplicationId: "ApplicationID",
TransactionServiceGroup: "TransactionServiceGroup",
},
}
mergedMessage := protocol.MergedWarpMessage{
Msgs: []protocol.MessageTypeAware{request},
MsgIds: []int32{1212},
}
handler := NewClientEventHandler()
handler.sendMergedMessage(mergedMessage)
time.Sleep(100000 * time.Second)
}

+ 21
- 0
pkg/rpc_client/client_message_sender.go View File

@@ -0,0 +1,21 @@
package rpc_client

import (
"time"
)

import (
"github.com/seata/seata-go/pkg/protocol"
)

type ClientMessageSender interface {

// Send msg with response object.
SendMsgWithResponse(msg interface{}) (interface{}, error)

// Send msg with response object.
SendMsgWithResponseAndTimeout(msg interface{}, timeout time.Duration) (interface{}, error)

// Send response.
SendResponse(request protocol.RpcMessage, serverAddress string, msg interface{})
}

+ 101
- 0
pkg/rpc_client/getty_client_session_manager.go View File

@@ -0,0 +1,101 @@
package rpc_client

import (
"sync"
"sync/atomic"
"time"
)

import (
getty "github.com/apache/dubbo-getty"
)

var (
MAX_CHECK_ALIVE_RETRY = 600

CHECK_ALIVE_INTERNAL = 100

allSessions = sync.Map{}

// serverAddress -> rpc_client.Session -> bool
serverSessions = sync.Map{}

sessionSize int32 = 0

clientSessionManager = &GettyClientSessionManager{}
)

type GettyClientSessionManager struct{}

func (sessionManager *GettyClientSessionManager) AcquireGettySession() getty.Session {
// map 遍历是随机的
var session getty.Session
allSessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
} else {
return false
}
return true
})
if session != nil {
return session
}
if sessionSize == 0 {
ticker := time.NewTicker(time.Duration(CHECK_ALIVE_INTERNAL) * time.Millisecond)
defer ticker.Stop()
for i := 0; i < MAX_CHECK_ALIVE_RETRY; i++ {
<-ticker.C
allSessions.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
} else {
return false
}
return true
})
if session != nil {
return session
}
}
}
return nil
}

func (sessionManager *GettyClientSessionManager) AcquireGettySessionByServerAddress(serverAddress string) getty.Session {
m, _ := serverSessions.LoadOrStore(serverAddress, &sync.Map{})
sMap := m.(*sync.Map)

var session getty.Session
sMap.Range(func(key, value interface{}) bool {
session = key.(getty.Session)
if session.IsClosed() {
sessionManager.ReleaseGettySession(session)
} else {
return false
}
return true
})
return session
}

func (sessionManager *GettyClientSessionManager) ReleaseGettySession(session getty.Session) {
allSessions.Delete(session)
if !session.IsClosed() {
m, _ := serverSessions.LoadOrStore(session.RemoteAddr(), &sync.Map{})
sMap := m.(*sync.Map)
sMap.Delete(session)
session.Close()
}
atomic.AddInt32(&sessionSize, -1)
}

func (sessionManager *GettyClientSessionManager) RegisterGettySession(session getty.Session) {
allSessions.Store(session, true)
m, _ := serverSessions.LoadOrStore(session.RemoteAddr(), &sync.Map{})
sMap := m.(*sync.Map)
sMap.Store(session, true)
atomic.AddInt32(&sessionSize, 1)
}

+ 279
- 0
pkg/rpc_client/readwriter.go View File

@@ -0,0 +1,279 @@
package rpc_client

import (
"bytes"
"encoding/binary"
)

import (
getty "github.com/apache/dubbo-getty"

"github.com/pkg/errors"

"vimagination.zapto.org/byteio"
)

import (
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/protocol/codec"
)

/**
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
* | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestID |
* | code |colVer| (head+body) | Length |Type |lizer|ess | |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | |
* | Head Map [Optional] |
* +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
* | |
* | body |
* | |
* | ... ... |
* +-----------------------------------------------------------------------------------------------+
* </pre>
* <p>
* <li>Full Length: include all data </li>
* <li>Head Length: include head data from magic code to head map. </li>
* <li>Body Length: Full Length - Head Length</li>
* </p>
* https://github.com/seata/seata/issues/893
*/
const (
SeataV1PackageHeaderReservedLength = 16
)

var (
// RpcPkgHandler
rpcPkgHandler = &RpcPackageHandler{}
)

var (
ErrNotEnoughStream = errors.New("packet stream is not enough")
ErrTooLargePackage = errors.New("package length is exceed the getty package's legal maximum length.")
ErrInvalidPackage = errors.New("invalid rpc package")
ErrIllegalMagic = errors.New("package magic is not right.")
)

type RpcPackageHandler struct{}

type SeataV1PackageHeader struct {
Magic0 byte
Magic1 byte
Version byte
TotalLength uint32
HeadLength uint16
MessageType byte
CodecType byte
CompressType byte
ID uint32
Meta map[string]string
BodyLength uint32
}

func (h *SeataV1PackageHeader) Unmarshal(buf *bytes.Buffer) (int, error) {
bufLen := buf.Len()
if bufLen < SeataV1PackageHeaderReservedLength {
return 0, ErrNotEnoughStream
}

// magic
if err := binary.Read(buf, binary.BigEndian, &(h.Magic0)); err != nil {
return 0, err
}
if err := binary.Read(buf, binary.BigEndian, &(h.Magic1)); err != nil {
return 0, err
}
if h.Magic0 != protocol.MAGIC_CODE_BYTES[0] || h.Magic1 != protocol.MAGIC_CODE_BYTES[1] {
return 0, ErrIllegalMagic
}
// version
if err := binary.Read(buf, binary.BigEndian, &(h.Version)); err != nil {
return 0, err
}
// TODO check version compatible here

// total length
if err := binary.Read(buf, binary.BigEndian, &(h.TotalLength)); err != nil {
return 0, err
}
// head length
if err := binary.Read(buf, binary.BigEndian, &(h.HeadLength)); err != nil {
return 0, err
}
// message type
if err := binary.Read(buf, binary.BigEndian, &(h.MessageType)); err != nil {
return 0, err
}
// codec type
if err := binary.Read(buf, binary.BigEndian, &(h.CodecType)); err != nil {
return 0, err
}
// compress type
if err := binary.Read(buf, binary.BigEndian, &(h.CompressType)); err != nil {
return 0, err
}
// id
if err := binary.Read(buf, binary.BigEndian, &(h.ID)); err != nil {
return 0, err
}
// todo meta map
if h.HeadLength > SeataV1PackageHeaderReservedLength {
headMapLength := h.HeadLength - SeataV1PackageHeaderReservedLength
h.Meta = headMapDecode(buf.Bytes()[:headMapLength])
}
h.BodyLength = h.TotalLength - uint32(h.HeadLength)

return int(h.TotalLength), nil
}

// Read read binary data from to rpc message
func (p *RpcPackageHandler) Read(ss getty.Session, data []byte) (interface{}, int, error) {
var header SeataV1PackageHeader

buf := bytes.NewBuffer(data)
_, err := header.Unmarshal(buf)
if err != nil {
if err == ErrNotEnoughStream {
// getty case2
return nil, 0, nil
}
// getty case1
return nil, 0, err
}
if uint32(len(data)) < header.TotalLength {
// get case3
return nil, int(header.TotalLength), nil
}

//r := byteio.BigEndianReader{Reader: bytes.NewReader(data)}
rpcMessage := protocol.RpcMessage{
Codec: header.CodecType,
ID: int32(header.ID),
Compressor: header.CompressType,
MessageType: header.MessageType,
HeadMap: header.Meta,
}

if header.MessageType == protocol.MSGTypeHeartbeatRequest {
rpcMessage.Body = protocol.HeartBeatMessagePing
} else if header.MessageType == protocol.MSGTypeHeartbeatResponse {
rpcMessage.Body = protocol.HeartBeatMessagePong
} else {
if header.BodyLength > 0 {
//todo compress
msg, _ := codec.MessageDecoder(header.CodecType, data[header.HeadLength:])
rpcMessage.Body = msg
}
}

return rpcMessage, int(header.TotalLength), nil
}

// Write write rpc message to binary data
func (p *RpcPackageHandler) Write(ss getty.Session, pkg interface{}) ([]byte, error) {
msg, ok := pkg.(protocol.RpcMessage)
if !ok {
return nil, ErrInvalidPackage
}

fullLength := protocol.V1HeadLength
headLength := protocol.V1HeadLength
var result = make([]byte, 0, fullLength)

var b bytes.Buffer
w := byteio.BigEndianWriter{Writer: &b}

result = append(result, protocol.MAGIC_CODE_BYTES[:2]...)
result = append(result, protocol.VERSION)

w.WriteByte(msg.MessageType)
w.WriteByte(msg.Codec)
w.WriteByte(msg.Compressor)
w.WriteInt32(msg.ID)

if msg.HeadMap != nil && len(msg.HeadMap) > 0 {
headMapBytes, headMapLength := headMapEncode(msg.HeadMap)
headLength += headMapLength
fullLength += headMapLength
w.Write(headMapBytes)
}

if msg.MessageType != protocol.MSGTypeHeartbeatRequest &&
msg.MessageType != protocol.MSGTypeHeartbeatResponse {

bodyBytes := codec.MessageEncoder(msg.Codec, msg.Body)
fullLength += len(bodyBytes)
w.Write(bodyBytes)
}

fullLen := int32(fullLength)
headLen := int16(headLength)
result = append(result, []byte{byte(fullLen >> 24), byte(fullLen >> 16), byte(fullLen >> 8), byte(fullLen)}...)
result = append(result, []byte{byte(headLen >> 8), byte(headLen)}...)
result = append(result, b.Bytes()...)

return result, nil
}

func headMapDecode(data []byte) map[string]string {
size := len(data)
if size == 0 {
return nil
}

mp := make(map[string]string)
r := byteio.BigEndianReader{Reader: bytes.NewReader(data)}

readLength := 0
for readLength < size {
var key, value string
lengthK, _, _ := r.ReadUint16()
if lengthK < 0 {
break
} else if lengthK == 0 {
key = ""
} else {
key, _, _ = r.ReadString(int(lengthK))
}

lengthV, _, _ := r.ReadUint16()
if lengthV < 0 {
break
} else if lengthV == 0 {
value = ""
} else {
value, _, _ = r.ReadString(int(lengthV))
}

mp[key] = value
readLength += int(lengthK + lengthV)
}

return mp
}

func headMapEncode(data map[string]string) ([]byte, int) {
var b bytes.Buffer

w := byteio.BigEndianWriter{Writer: &b}
for k, v := range data {
if k == "" {
w.WriteUint16(0)
} else {
w.WriteUint16(uint16(len(k)))
w.WriteString(k)
}

if v == "" {
w.WriteUint16(0)
} else {
w.WriteUint16(uint16(len(v)))
w.WriteString(v)
}
}

return b.Bytes(), b.Len()
}

+ 102
- 0
pkg/rpc_client/rpc_client.go View File

@@ -0,0 +1,102 @@
package rpc_client

import (
"fmt"
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/util/log"
"net"
)

import (
getty "github.com/apache/dubbo-getty"

gxsync "github.com/dubbogo/gost/sync"
)

type RpcClient struct {
conf *config.ClientConfig
gettyClients []getty.Client
rpcHandler RpcRemoteClient
}

func init() {
newRpcClient()
}

func newRpcClient() *RpcClient {
rpcClient := &RpcClient{
conf: config.GetClientConfig(),
gettyClients: make([]getty.Client, 0),
rpcHandler: *InitRpcRemoteClient(),
}
rpcClient.init()
return rpcClient
}

func (c *RpcClient) init() {
addressList := getAvailServerList(c.conf)
if len(addressList) == 0 {
log.Warn("no have valid seata server list")
}
for _, address := range addressList {
gettyClient := getty.NewTCPClient(
getty.WithServerAddress(address),
getty.WithConnectionNumber((int)(c.conf.GettyConfig.ConnectionNum)),
getty.WithReconnectInterval(c.conf.GettyConfig.ReconnectInterval),
getty.WithClientTaskPool(gxsync.NewTaskPoolSimple(0)),
)
go gettyClient.RunEventLoop(c.newSession)
// c.gettyClients = append(c.gettyClients, gettyClient)
}
}

// todo mock
func getAvailServerList(config *config.ClientConfig) []string {
//reg, err := extension.GetRegistry(config.RegistryConfig.Mode)
//if err != nil {
// logger.Errorf("Registry can not connect success, program is going to panic.Error message is %s", err.Error())
// panic(err.Error())
//}
//addrs, err := reg.Lookup()
//if err != nil {
// logger.Errorf("no hava valid server list", err.Error())
// return nil
//}
//return addrs
return []string{"127.0.0.1:8091"}
}

func (c *RpcClient) newSession(session getty.Session) error {
var (
ok bool
tcpConn *net.TCPConn
)

if c.conf.GettyConfig.GettySessionParam.CompressEncoding {
session.SetCompressType(getty.CompressZip)
}

if tcpConn, ok = session.Conn().(*net.TCPConn); !ok {
panic(fmt.Sprintf("%s, session.conn{%#v} is not tcp connection\n", session.Stat(), session.Conn()))
}

tcpConn.SetNoDelay(c.conf.GettyConfig.GettySessionParam.TCPNoDelay)
tcpConn.SetKeepAlive(c.conf.GettyConfig.GettySessionParam.TCPKeepAlive)
if c.conf.GettyConfig.GettySessionParam.TCPKeepAlive {
tcpConn.SetKeepAlivePeriod(c.conf.GettyConfig.GettySessionParam.KeepAlivePeriod)
}
tcpConn.SetReadBuffer(c.conf.GettyConfig.GettySessionParam.TCPRBufSize)
tcpConn.SetWriteBuffer(c.conf.GettyConfig.GettySessionParam.TCPWBufSize)

session.SetName(c.conf.GettyConfig.GettySessionParam.SessionName)
session.SetMaxMsgLen(c.conf.GettyConfig.GettySessionParam.MaxMsgLen)
session.SetPkgHandler(rpcPkgHandler)
session.SetEventListener(&c.rpcHandler)
session.SetReadTimeout(c.conf.GettyConfig.GettySessionParam.TCPReadTimeout)
session.SetWriteTimeout(c.conf.GettyConfig.GettySessionParam.TCPWriteTimeout)
session.SetCronPeriod((int)(c.conf.GettyConfig.HeartbeatPeriod.Nanoseconds() / 1e6))
session.SetWaitTime(c.conf.GettyConfig.GettySessionParam.WaitTimeout)
log.Debugf("rpc_client new session:%s\n", session.Stat())

return nil
}

+ 412
- 0
pkg/rpc_client/rpc_remoting_client.go View File

@@ -0,0 +1,412 @@
package rpc_client

import (
"math/rand"
"strings"
"sync"
"time"
)

import (
getty "github.com/apache/dubbo-getty"

gxtime "github.com/dubbogo/gost/time"

"github.com/pkg/errors"

"go.uber.org/atomic"
)

import (
"github.com/seata/seata-go/pkg/config"
"github.com/seata/seata-go/pkg/protocol"
"github.com/seata/seata-go/pkg/protocol/codec"
log "github.com/seata/seata-go/pkg/util/log"
"github.com/seata/seata-go/pkg/util/runtime"
)

const (
RPC_REQUEST_TIMEOUT = 30 * time.Second
)

var rpcRemoteClient *RpcRemoteClient

func InitRpcRemoteClient() *RpcRemoteClient {
rpcRemoteClient = &RpcRemoteClient{
conf: config.GetClientConfig(),
idGenerator: &atomic.Uint32{},
futures: &sync.Map{},
mergeMsgMap: &sync.Map{},
rpcMessageChannel: make(chan protocol.RpcMessage, 100),
BranchRollbackRequestChannel: make(chan RpcRMMessage),
BranchCommitRequestChannel: make(chan RpcRMMessage),
GettySessionOnOpenChannel: make(chan string),
}
if rpcRemoteClient.conf.EnableClientBatchSendRequest {
go rpcRemoteClient.processMergedMessage()
}
return rpcRemoteClient
}

func GetRpcRemoteClient() *RpcRemoteClient {
return rpcRemoteClient
}

type RpcRemoteClient struct {
conf *config.ClientConfig
idGenerator *atomic.Uint32
futures *sync.Map
mergeMsgMap *sync.Map
rpcMessageChannel chan protocol.RpcMessage
BranchCommitRequestChannel chan RpcRMMessage
BranchRollbackRequestChannel chan RpcRMMessage
GettySessionOnOpenChannel chan string
}

// OnOpen ...
func (client *RpcRemoteClient) OnOpen(session getty.Session) error {
go func() {
request := protocol.RegisterTMRequest{AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
Version: client.conf.SeataVersion,
ApplicationId: client.conf.ApplicationID,
TransactionServiceGroup: client.conf.TransactionServiceGroup,
}}
_, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT)
if err == nil {
clientSessionManager.RegisterGettySession(session)
client.GettySessionOnOpenChannel <- session.RemoteAddr()
}
}()

return nil
}

// OnError ...
func (client *RpcRemoteClient) OnError(session getty.Session, err error) {
clientSessionManager.ReleaseGettySession(session)
}

// OnClose ...
func (client *RpcRemoteClient) OnClose(session getty.Session) {
clientSessionManager.ReleaseGettySession(session)
}

// OnMessage ...
func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) {
log.Debugf("received message: {%#v}", pkg)
rpcMessage, ok := pkg.(protocol.RpcMessage)
if !ok {
log.Errorf("received message is not protocol.RpcMessage. pkg: %#v", pkg)
return
}

heartBeat, isHeartBeat := rpcMessage.Body.(protocol.HeartBeatMessage)
if isHeartBeat && heartBeat == protocol.HeartBeatMessagePong {
log.Debugf("received PONG from %s", session.RemoteAddr())
return
}

if rpcMessage.MessageType == protocol.MSGTypeRequest ||
rpcMessage.MessageType == protocol.MSGTypeRequestOneway {
log.Debugf("msgID: %d, body: %#v", rpcMessage.ID, rpcMessage.Body)

client.onMessage(rpcMessage, session.RemoteAddr())
return
}

mergedResult, isMergedResult := rpcMessage.Body.(protocol.MergeResultMessage)
if isMergedResult {
mm, loaded := client.mergeMsgMap.Load(rpcMessage.ID)
if loaded {
mergedMessage := mm.(protocol.MergedWarpMessage)
log.Infof("rpcMessageID: %d, rpcMessage: %#v, result: %#v", rpcMessage.ID, mergedMessage, mergedResult)
for i := 0; i < len(mergedMessage.Msgs); i++ {
msgID := mergedMessage.MsgIds[i]
resp, loaded := client.futures.Load(msgID)
if loaded {
response := resp.(*protocol.MessageFuture)
response.Response = mergedResult.Msgs[i]
response.Done <- true
client.futures.Delete(msgID)
}
}
client.mergeMsgMap.Delete(rpcMessage.ID)
}
} else {
resp, loaded := client.futures.Load(rpcMessage.ID)
if loaded {
response := resp.(*protocol.MessageFuture)
response.Response = rpcMessage.Body
response.Done <- true
client.futures.Delete(rpcMessage.ID)
}
}
}

// OnCron ...
func (client *RpcRemoteClient) OnCron(session getty.Session) {
client.defaultSendRequest(session, protocol.HeartBeatMessagePing)
}

func (client *RpcRemoteClient) onMessage(rpcMessage protocol.RpcMessage, serverAddress string) {
msg := rpcMessage.Body.(protocol.MessageTypeAware)
log.Debugf("onMessage: %#v", msg)
switch msg.GetTypeCode() {
case protocol.TypeBranchCommit:
client.BranchCommitRequestChannel <- RpcRMMessage{
RpcMessage: rpcMessage,
ServerAddress: serverAddress,
}
case protocol.TypeBranchRollback:
client.BranchRollbackRequestChannel <- RpcRMMessage{
RpcMessage: rpcMessage,
ServerAddress: serverAddress,
}
case protocol.TypeRmDeleteUndolog:
break
default:
break
}
}

//*************************************
// ClientMessageSender
//*************************************
func (client *RpcRemoteClient) SendMsgWithResponse(msg interface{}) (interface{}, error) {
if client.conf.EnableClientBatchSendRequest {
return client.sendAsyncRequest2(msg, RPC_REQUEST_TIMEOUT)
}
return client.SendMsgWithResponseAndTimeout(msg, RPC_REQUEST_TIMEOUT)
}

func (client *RpcRemoteClient) SendMsgWithResponseAndTimeout(msg interface{}, timeout time.Duration) (interface{}, error) {
ss := clientSessionManager.AcquireGettySession()
return client.sendAsyncRequestWithResponse(ss, msg, timeout)
}

func (client *RpcRemoteClient) SendResponse(request protocol.RpcMessage, serverAddress string, msg interface{}) {
client.defaultSendResponse(request, clientSessionManager.AcquireGettySessionByServerAddress(serverAddress), msg)
}

func (client *RpcRemoteClient) sendAsyncRequestWithResponse(session getty.Session, msg interface{}, timeout time.Duration) (interface{}, error) {
if timeout <= time.Duration(0) {
return nil, errors.New("timeout should more than 0ms")
}
return client.sendAsyncRequest(session, msg, timeout)
}

func (client *RpcRemoteClient) sendAsyncRequestWithoutResponse(session getty.Session, msg interface{}) error {
_, err := client.sendAsyncRequest(session, msg, time.Duration(0))
return err
}

func (client *RpcRemoteClient) sendAsyncRequest(session getty.Session, msg interface{}, timeout time.Duration) (interface{}, error) {
var err error
if session == nil || session.IsClosed() {
log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.")
}
rpcMessage := protocol.RpcMessage{
ID: int32(client.idGenerator.Inc()),
MessageType: protocol.MSGTypeRequestOneway,
Codec: codec.SEATA,
Compressor: 0,
Body: msg,
}
resp := protocol.NewMessageFuture(rpcMessage)
client.futures.Store(rpcMessage.ID, resp)
//config timeout
_, _, err = session.WritePkg(rpcMessage, time.Duration(0))
if err != nil {
client.futures.Delete(rpcMessage.ID)
log.Errorf("send message: %#v, session: %s", rpcMessage, session.Stat())
return nil, err
}

log.Debugf("send message: %#v, session: %s", rpcMessage, session.Stat())

if timeout > time.Duration(0) {
select {
case <-gxtime.GetDefaultTimerWheel().After(timeout):
client.futures.Delete(rpcMessage.ID)
if session != nil {
return nil, errors.Errorf("wait response timeout, ip: %s, request: %#v", session.RemoteAddr(), rpcMessage)
} else {
return nil, errors.Errorf("wait response timeout and session is nil, request: %#v", rpcMessage)
}
case <-resp.Done:
err = resp.Err
return resp.Response, err
}
}

return nil, err
}

func (client *RpcRemoteClient) sendAsyncRequest2(msg interface{}, timeout time.Duration) (interface{}, error) {
var err error
rpcMessage := protocol.RpcMessage{
ID: int32(client.idGenerator.Inc()),
MessageType: protocol.MSGTypeRequest,
Codec: codec.SEATA,
Compressor: 0,
Body: msg,
}
resp := protocol.NewMessageFuture(rpcMessage)
client.futures.Store(rpcMessage.ID, resp)

client.rpcMessageChannel <- rpcMessage

log.Infof("send message: %#v", rpcMessage)

if timeout > time.Duration(0) {
select {
case <-gxtime.GetDefaultTimerWheel().After(timeout):
client.futures.Delete(rpcMessage.ID)
return nil, errors.Errorf("wait response timeout, request: %#v", rpcMessage)
case <-resp.Done:
err = resp.Err
}
return resp.Response, err
}
return nil, err
}

func (client *RpcRemoteClient) sendAsync(session getty.Session, msg interface{}) error {
var err error
if session == nil || session.IsClosed() {
log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.")
}
rpcMessage := protocol.RpcMessage{
ID: int32(client.idGenerator.Inc()),
MessageType: protocol.MSGTypeRequestOneway,
Codec: codec.SEATA,
Compressor: 0,
Body: msg,
}
log.Infof("store message, id %d : %#v", rpcMessage.ID, msg)
client.mergeMsgMap.Store(rpcMessage.ID, msg)
//config timeout
pkgLen, sendLen, err := session.WritePkg(rpcMessage, time.Duration(0))
if err != nil || (pkgLen != 0 && pkgLen != sendLen) {
log.Warnf("start to close the session because %d of %d bytes data is sent success. err:%+v", sendLen, pkgLen, err)
runtime.GoWithRecover(func() {
session.Close()
}, nil)
return errors.Wrap(err, "pkg not send completely!")
}
return nil
}

func (client *RpcRemoteClient) defaultSendRequest(session getty.Session, msg interface{}) {
rpcMessage := protocol.RpcMessage{
ID: int32(client.idGenerator.Inc()),
Codec: codec.SEATA,
Compressor: 0,
Body: msg,
}
_, ok := msg.(protocol.HeartBeatMessage)
if ok {
rpcMessage.MessageType = protocol.MSGTypeHeartbeatRequest
} else {
rpcMessage.MessageType = protocol.MSGTypeRequest
}
pkgLen, sendLen, err := session.WritePkg(rpcMessage, client.conf.GettyConfig.GettySessionParam.TCPWriteTimeout)
if err != nil || (pkgLen != 0 && pkgLen != sendLen) {
log.Warnf("start to close the session because %d of %d bytes data is sent success. err:%+v", sendLen, pkgLen, err)
runtime.GoWithRecover(func() {
session.Close()
}, nil)
}
}

func (client *RpcRemoteClient) defaultSendResponse(request protocol.RpcMessage, session getty.Session, msg interface{}) {
resp := protocol.RpcMessage{
ID: request.ID,
Codec: request.Codec,
Compressor: request.Compressor,
Body: msg,
}
_, ok := msg.(protocol.HeartBeatMessage)
if ok {
resp.MessageType = protocol.MSGTypeHeartbeatResponse
} else {
resp.MessageType = protocol.MSGTypeResponse
}

pkgLen, sendLen, err := session.WritePkg(resp, time.Duration(0))
if err != nil || (pkgLen != 0 && pkgLen != sendLen) {
log.Warnf("start to close the session because %d of %d bytes data is sent success. err:%+v", sendLen, pkgLen, err)
runtime.GoWithRecover(func() {
session.Close()
}, nil)
}
}

func (client *RpcRemoteClient) RegisterResource(serverAddress string, request protocol.RegisterRMRequest) {
session := clientSessionManager.AcquireGettySessionByServerAddress(serverAddress)
if session != nil {
err := client.sendAsyncRequestWithoutResponse(session, request)
if err != nil {
log.Errorf("register resource failed, session:{},resourceID:{}", session, request.ResourceIds)
}
}
}

func loadBalance(transactionServiceGroup string) string {
addressList := getAddressList(transactionServiceGroup)
if len(addressList) == 1 {
return addressList[0]
}
return addressList[rand.Intn(len(addressList))]
}

func getAddressList(transactionServiceGroup string) []string {
addressList := strings.Split(transactionServiceGroup, ",")
return addressList
}

func (client *RpcRemoteClient) processMergedMessage() {
ticker := time.NewTicker(5 * time.Millisecond)
mergedMessage := protocol.MergedWarpMessage{
Msgs: make([]protocol.MessageTypeAware, 0),
MsgIds: make([]int32, 0),
}
for {
select {
case rpcMessage := <-client.rpcMessageChannel:
message := rpcMessage.Body.(protocol.MessageTypeAware)
mergedMessage.Msgs = append(mergedMessage.Msgs, message)
mergedMessage.MsgIds = append(mergedMessage.MsgIds, rpcMessage.ID)
if len(mergedMessage.Msgs) == 20 {
client.sendMergedMessage(mergedMessage)
mergedMessage = protocol.MergedWarpMessage{
Msgs: make([]protocol.MessageTypeAware, 0),
MsgIds: make([]int32, 0),
}
}
case <-ticker.C:
if len(mergedMessage.Msgs) > 0 {
client.sendMergedMessage(mergedMessage)
mergedMessage = protocol.MergedWarpMessage{
Msgs: make([]protocol.MessageTypeAware, 0),
MsgIds: make([]int32, 0),
}
}
}
}
}

func (client *RpcRemoteClient) sendMergedMessage(mergedMessage protocol.MergedWarpMessage) {
ss := clientSessionManager.AcquireGettySession()
err := client.sendAsync(ss, mergedMessage)
if err != nil {
for _, id := range mergedMessage.MsgIds {
resp, loaded := client.futures.Load(id)
if loaded {
response := resp.(*protocol.MessageFuture)
response.Done <- true
client.futures.Delete(id)
}
}
}
}

+ 24
- 0
pkg/rpc_client/rpc_remoting_client_test.go View File

@@ -0,0 +1,24 @@
package rpc_client

import (
"github.com/seata/seata-go/pkg/protocol"
"testing"
"time"
)

func TestSendMsgWithResponse(test *testing.T) {
request := protocol.RegisterRMRequest{
ResourceIds: "1111",
AbstractIdentifyRequest: protocol.AbstractIdentifyRequest{
ApplicationId: "ApplicationID",
TransactionServiceGroup: "TransactionServiceGroup",
},
}
mergedMessage := protocol.MergedWarpMessage{
Msgs: []protocol.MessageTypeAware{request},
MsgIds: []int32{1212},
}
handler := GetRpcRemoteClient()
handler.sendMergedMessage(mergedMessage)
time.Sleep(100000 * time.Second)
}

+ 10
- 0
pkg/rpc_client/rpc_rm_message.go View File

@@ -0,0 +1,10 @@
package rpc_client

import (
"github.com/seata/seata-go/pkg/protocol"
)

type RpcRMMessage struct {
RpcMessage protocol.RpcMessage
ServerAddress string
}

+ 45
- 0
pkg/tm/api/global_transaction.go View File

@@ -0,0 +1,45 @@
package api

import (
"github.com/seata/seata-go/pkg/model"
)

type GlobalTransactionRole int8

const (
LAUNCHER GlobalTransactionRole = 0
PARTICIPANT GlobalTransactionRole = 1
)

type GlobalTransaction interface {

// Begin a new global transaction with given timeout and given name.
begin(timeout int64, name string) error

// Commit the global transaction.
commit() error

// Rollback the global transaction.
rollback() error

// Suspend the global transaction.
suspend() (SuspendedResourcesHolder, error)

// Resume the global transaction.
resume(suspendedResourcesHolder SuspendedResourcesHolder) error

// Ask TC for current status of the corresponding global transaction.
getStatus() (model.GlobalStatus, error)

// Get XID.
getXid() string

// report the global transaction status.
globalReport(globalStatus model.GlobalStatus) error

// local status of the global transaction.
getLocalStatus() model.GlobalStatus

// get global transaction role.
getGlobalTransactionRole() GlobalTransactionRole
}

+ 11
- 0
pkg/tm/api/suspended_resources_holder.go View File

@@ -0,0 +1,11 @@
package api

type SuspendedResourcesHolder struct {
Xid string
}

func NewSuspendedResourcesHolder(xid string) SuspendedResourcesHolder {
return SuspendedResourcesHolder{
Xid: xid,
}
}

+ 218
- 0
pkg/util/log/logging.go View File

@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package log

import (
"bytes"
"errors"
"fmt"
)

import (
"github.com/natefinch/lumberjack"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// Level represents the level of logging.
type LogLevel int8

const (
// DebugLevel logs are typically voluminous, and are usually disabled in
// production.
DebugLevel = LogLevel(zapcore.DebugLevel)
// InfoLevel is the default logging priority.
InfoLevel = LogLevel(zapcore.InfoLevel)
// WarnLevel logs are more important than Info, but don't need individual
// human review.
WarnLevel = LogLevel(zapcore.WarnLevel)
// ErrorLevel logs are high-priority. If an application is running smoothly,
// it shouldn't generate any error-level logs.
ErrorLevel = LogLevel(zapcore.ErrorLevel)
// PanicLevel logs a message, then panics.
PanicLevel = LogLevel(zapcore.PanicLevel)
// FatalLevel logs a message, then calls os.Exit(1).
FatalLevel = LogLevel(zapcore.FatalLevel)
)

func (l *LogLevel) UnmarshalText(text []byte) error {
if l == nil {
return errors.New("can't unmarshal a nil *Level")
}
if !l.unmarshalText(text) && !l.unmarshalText(bytes.ToLower(text)) {
return fmt.Errorf("unrecognized level: %q", text)
}
return nil
}

func (l *LogLevel) unmarshalText(text []byte) bool {
switch string(text) {
case "debug", "DEBUG":
*l = DebugLevel
case "info", "INFO", "": // make the zero value useful
*l = InfoLevel
case "warn", "WARN":
*l = WarnLevel
case "error", "ERROR":
*l = ErrorLevel
case "panic", "PANIC":
*l = PanicLevel
case "fatal", "FATAL":
*l = FatalLevel
default:
return false
}
return true
}

type Logger interface {
Debug(v ...interface{})
Debugf(format string, v ...interface{})

Info(v ...interface{})
Infof(format string, v ...interface{})

Warn(v ...interface{})
Warnf(format string, v ...interface{})

Error(v ...interface{})
Errorf(format string, v ...interface{})

Panic(v ...interface{})
Panicf(format string, v ...interface{})

Fatal(v ...interface{})
Fatalf(format string, v ...interface{})
}

var (
log Logger
zapLogger *zap.Logger

zapLoggerConfig = zap.NewDevelopmentConfig()
zapLoggerEncoderConfig = zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
EncodeLevel: zapcore.CapitalColorLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
)

func init() {
zapLoggerConfig.EncoderConfig = zapLoggerEncoderConfig
zapLogger, _ = zapLoggerConfig.Build(zap.AddCallerSkip(1))
log = zapLogger.Sugar()
}

func Init(logPath string, level LogLevel) {
lumberJackLogger := &lumberjack.Logger{
Filename: logPath,
MaxSize: 10,
MaxBackups: 5,
MaxAge: 30,
Compress: false,
}
syncer := zapcore.AddSync(lumberJackLogger)

encoderConfig := zap.NewProductionEncoderConfig()
encoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
encoderConfig.EncodeLevel = zapcore.CapitalLevelEncoder

encoder := zapcore.NewConsoleEncoder(encoderConfig)
core := zapcore.NewCore(encoder, syncer, zap.NewAtomicLevelAt(zapcore.Level(level)))
zapLogger = zap.New(core, zap.AddCaller())

log = zapLogger.Sugar()
}

// SetLogger: customize yourself logger.
func SetLogger(logger Logger) {
log = logger
}

// GetLogger get logger
func GetLogger() Logger {
return log
}

// Debug ...
func Debug(v ...interface{}) {
log.Debug(v...)
}

// Debugf ...
func Debugf(format string, v ...interface{}) {
log.Debugf(format, v...)
}

// Info ...
func Info(v ...interface{}) {
log.Info(v...)
}

// Infof ...
func Infof(format string, v ...interface{}) {
log.Infof(format, v...)
}

// Warn ...
func Warn(v ...interface{}) {
log.Warn(v...)
}

// Warnf ...
func Warnf(format string, v ...interface{}) {
log.Warnf(format, v...)
}

// Error ...
func Error(v ...interface{}) {
log.Error(v...)
}

// Errorf ...
func Errorf(format string, v ...interface{}) {
log.Errorf(format, v...)
}

// Panic ...
func Panic(v ...interface{}) {
log.Panic(v...)
}

// Panicf ...
func Panicf(format string, v ...interface{}) {
log.Panicf(format, v...)
}

// Fatal ...
func Fatal(v ...interface{}) {
log.Fatal(v...)
}

// Fatalf ...
func Fatalf(format string, v ...interface{}) {
log.Fatalf(format, v...)
}

+ 38
- 0
pkg/util/runtime/goroutine.go View File

@@ -0,0 +1,38 @@
package runtime

import (
"fmt"
"os"
"runtime/debug"
"time"
)

var debugIgnoreStdout = false

// GoWithRecover wraps a `go func()` with recover()
func GoWithRecover(handler func(), recoverHandler func(r interface{})) {
go func() {
defer func() {
if r := recover(); r != nil {
// TODO: log
if !debugIgnoreStdout {
fmt.Fprintf(os.Stderr, "%s goroutine panic: %v\n%s\n",
time.Now(), r, string(debug.Stack()))
}
if recoverHandler != nil {
go func() {
defer func() {
if p := recover(); p != nil {
if !debugIgnoreStdout {
fmt.Fprintf(os.Stderr, "recover goroutine panic:%v\n%s\n", p, string(debug.Stack()))
}
}
}()
recoverHandler(r)
}()
}
}
}()
handler()
}()
}

+ 26
- 0
test/integration_test.go View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package test

import (
"testing"
)

func Test1(t *testing.T) {

}

+ 29
- 0
testdata/sql/all_in_one.sql View File

@@ -0,0 +1,29 @@
DROP TABLE IF EXISTS `stock_tbl`;
CREATE TABLE `stock_tbl`
(
`id` int(11) NOT NULL AUTO_INCREMENT,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
PRIMARY KEY (`id`),
UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl`
(
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`commodity_code` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT 0,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl`
(
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_id` varchar(255) DEFAULT NULL,
`money` int(11) DEFAULT 0,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Loading…
Cancel
Save