@@ -6,15 +6,17 @@ require ( | |||
github.com/BurntSushi/toml v1.1.0 // indirect | |||
github.com/apache/dubbo-getty v1.4.8 | |||
github.com/dubbogo/gost v1.11.23 | |||
github.com/dubbogo/tools v1.0.9 // indirect | |||
github.com/fagongzi/goetty v1.3.1 | |||
github.com/fagongzi/log v0.0.0-20170831135209-9a647df25e0e | |||
github.com/fagongzi/util v0.0.0-20181102105153-fd38e0f42a4f | |||
github.com/golang/snappy v0.0.4 // indirect | |||
github.com/kr/pretty v0.3.0 // indirect | |||
github.com/natefinch/lumberjack v2.0.0+incompatible | |||
github.com/pkg/errors v0.9.1 | |||
github.com/rogpeppe/go-internal v1.8.0 // indirect | |||
github.com/stretchr/testify v1.7.0 | |||
go.uber.org/atomic v1.9.0 | |||
go.uber.org/zap v1.19.1 | |||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect | |||
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect | |||
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10 | |||
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect | |||
@@ -64,7 +64,7 @@ type GettySessionParam struct { | |||
TCPNoDelay bool `default:"true" yaml:"tcp_no_delay" json:"tcp_no_delay,omitempty"` | |||
TCPKeepAlive bool `default:"true" yaml:"tcp_keep_alive" json:"tcp_keep_alive,omitempty"` | |||
KeepAlivePeriod time.Duration `default:"180" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` | |||
CronPeriod time.Duration `default:"1" yaml:"keep_alive_period" json:"keep_alive_period,omitempty"` | |||
CronPeriod time.Duration `default:"1" yaml:"cron_period" json:"cron_period,omitempty"` | |||
TCPRBufSize int `default:"262144" yaml:"tcp_r_buf_size" json:"tcp_r_buf_size,omitempty"` | |||
TCPWBufSize int `default:"65536" yaml:"tcp_w_buf_size" json:"tcp_w_buf_size,omitempty"` | |||
TCPReadTimeout time.Duration `default:"1" yaml:"tcp_read_timeout" json:"tcp_read_timeout,omitempty"` | |||
@@ -27,7 +27,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &BranchRegisterRequestCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchRegisterRequestCodec{}) | |||
} | |||
type BranchRegisterRequestCodec struct { | |||
@@ -0,0 +1,41 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/branch" | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestBranchRegisterRequestCodec(t *testing.T) { | |||
msg := message.BranchRegisterRequest{ | |||
Xid: "abc134", | |||
ResourceId: "124", | |||
LockKey: "a:1,b:2", | |||
ApplicationData: []byte("abc"), | |||
BranchType: branch.BranchTypeTCC, | |||
} | |||
codec := BranchRegisterRequestCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -27,7 +27,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &BranchRegisterResponseCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchRegisterResponseCodec{}) | |||
} | |||
type BranchRegisterResponseCodec struct { | |||
@@ -59,8 +59,8 @@ func (c *BranchRegisterResponseCodec) Encode(in interface{}) []byte { | |||
buf := goetty.NewByteBuf(0) | |||
resp, _ := in.(message.BranchRegisterResponse) | |||
resultCode := ReadByte(buf) | |||
if resultCode == byte(message.ResultCodeFailed) { | |||
buf.WriteByte(byte(resp.ResultCode)) | |||
if resp.ResultCode == message.ResultCodeFailed { | |||
var msg string | |||
if len(resp.Msg) > 128 { | |||
msg = resp.Msg[:128] | |||
@@ -72,17 +72,17 @@ func (c *BranchRegisterResponseCodec) Encode(in interface{}) []byte { | |||
buf.WriteByte(byte(resp.TransactionExceptionCode)) | |||
branchID := uint64(resp.BranchId) | |||
branchIdBytes := []byte{ | |||
byte(branchID >> 56), | |||
byte(branchID >> 48), | |||
byte(branchID >> 40), | |||
byte(branchID >> 32), | |||
byte(branchID >> 24), | |||
byte(branchID >> 16), | |||
byte(branchID >> 8), | |||
byte(branchID), | |||
} | |||
buf.Write(branchIdBytes) | |||
//branchIdBytes := []byte{ | |||
// byte(branchID >> 56), | |||
// byte(branchID >> 48), | |||
// byte(branchID >> 40), | |||
// byte(branchID >> 32), | |||
// byte(branchID >> 24), | |||
// byte(branchID >> 16), | |||
// byte(branchID >> 8), | |||
// byte(branchID), | |||
//} | |||
buf.WriteUInt64(branchID) | |||
return buf.RawBuf() | |||
} | |||
@@ -0,0 +1,44 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/seata/seata-go/pkg/protocol/transaction" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestBranchRegisterResponseCodec(t *testing.T) { | |||
msg := message.BranchRegisterResponse{ | |||
AbstractTransactionResponse: message.AbstractTransactionResponse{ | |||
AbstractResultMessage: message.AbstractResultMessage{ | |||
ResultCode: message.ResultCodeFailed, | |||
Msg: "FAILED", | |||
}, | |||
TransactionExceptionCode: transaction.TransactionExceptionCodeUnknown, | |||
}, | |||
BranchId: 124356567, | |||
} | |||
codec := BranchRegisterResponseCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -33,12 +33,11 @@ import ( | |||
type CodecType byte | |||
// TODO 待重构 | |||
const ( | |||
CodeTypeSeata = CodecType(0x1) | |||
CodeTypeProtobuf = CodecType(0x2) | |||
CodeTypeKRYO = CodecType(0x4) | |||
CodeTypeFST = CodecType(0x8) | |||
CodecTypeSeata = CodecType(0x1) | |||
CodecTypeProtobuf = CodecType(0x2) | |||
CodecTypeKRYO = CodecType(0x4) | |||
CodecTypeFST = CodecType(0x8) | |||
) | |||
type Codec interface { | |||
@@ -43,25 +43,17 @@ func (c *CommonGlobalEndRequestCodec) Decode(in []byte) interface{} { | |||
buf := goetty.NewByteBuf(len(in)) | |||
buf.Write(in) | |||
var length uint16 | |||
var xidLen int | |||
if buf.Readable() >= 2 { | |||
xidLen = int(ReadUInt16(buf)) | |||
length = ReadUInt16(buf) | |||
if length > 0 { | |||
bytes := make([]byte, length) | |||
res.Xid = string(Read(buf, bytes)) | |||
} | |||
if buf.Readable() >= xidLen { | |||
xidBytes := make([]byte, xidLen) | |||
xidBytes = Read(buf, xidBytes) | |||
res.Xid = string(xidBytes) | |||
} | |||
var extraDataLen int | |||
if buf.Readable() >= 2 { | |||
extraDataLen = int(ReadUInt16(buf)) | |||
} | |||
if buf.Readable() >= extraDataLen { | |||
extraDataBytes := make([]byte, xidLen) | |||
extraDataBytes = Read(buf, extraDataBytes) | |||
res.ExtraData = extraDataBytes | |||
length = ReadUInt16(buf) | |||
if length > 0 { | |||
bytes := make([]byte, length) | |||
res.ExtraData = Read(buf, bytes) | |||
} | |||
return res | |||
@@ -80,7 +80,7 @@ func (c *AbstractIdentifyRequestCodec) Decode(in []byte) interface{} { | |||
return msg | |||
} | |||
len = ReadUInt16(buf) | |||
if len > 0 && uint16(buf.Readable()) > len { | |||
if len > 0 && uint16(buf.Readable()) >= len { | |||
extraDataBytes := make([]byte, len) | |||
msg.ExtraData = Read(buf, extraDataBytes) | |||
} | |||
@@ -26,7 +26,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalBeginRequestCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalBeginRequestCodec{}) | |||
} | |||
type GlobalBeginRequestCodec struct { | |||
@@ -0,0 +1,37 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalBeginRequestCodec(t *testing.T) { | |||
msg := message.GlobalBeginRequest{ | |||
Timeout: 100, | |||
TransactionName: "SeataGoTransaction", | |||
} | |||
codec := GlobalBeginRequestCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -27,7 +27,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalBeginResponseCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalBeginResponseCodec{}) | |||
} | |||
type GlobalBeginResponseCodec struct { | |||
@@ -0,0 +1,46 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/seata/seata-go/pkg/protocol/transaction" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalBeginResponseCodec(t *testing.T) { | |||
msg := message.GlobalBeginResponse{ | |||
AbstractTransactionResponse: message.AbstractTransactionResponse{ | |||
AbstractResultMessage: message.AbstractResultMessage{ | |||
ResultCode: message.ResultCodeFailed, | |||
Msg: "FAILED", | |||
}, | |||
TransactionExceptionCode: transaction.TransactionExceptionCodeBeginFailed, | |||
}, | |||
Xid: "test-transaction-id", | |||
ExtraData: []byte("TestExtraData"), | |||
} | |||
codec := GlobalBeginResponseCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalCommitRequestCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalCommitRequestCodec{}) | |||
} | |||
type GlobalCommitRequestCodec struct { | |||
@@ -0,0 +1,39 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalCommitRequestCodec(t *testing.T) { | |||
msg := message.GlobalCommitRequest{ | |||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ | |||
Xid: "test-transaction-id", | |||
ExtraData: []byte("TestExtraData"), | |||
}, | |||
} | |||
codec := GlobalCommitRequestCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalCommitResponseCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalCommitResponseCodec{}) | |||
} | |||
type GlobalCommitResponseCodec struct { | |||
@@ -37,6 +37,11 @@ func (g *GlobalCommitResponseCodec) Decode(in []byte) interface{} { | |||
} | |||
} | |||
func (g *GlobalCommitResponseCodec) Encode(in interface{}) []byte { | |||
req := in.(message.GlobalCommitResponse) | |||
return g.CommonGlobalEndResponseCodec.Encode(req.AbstractGlobalEndResponse) | |||
} | |||
func (g *GlobalCommitResponseCodec) GetMessageType() message.MessageType { | |||
return message.MessageType_GlobalCommitResult | |||
} |
@@ -0,0 +1,45 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/seata/seata-go/pkg/protocol/transaction" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalCommitResponseCodec(t *testing.T) { | |||
msg := message.GlobalCommitResponse{ | |||
AbstractGlobalEndResponse: message.AbstractGlobalEndResponse{ | |||
AbstractTransactionResponse: message.AbstractTransactionResponse{ | |||
AbstractResultMessage: message.AbstractResultMessage{ | |||
ResultCode: message.ResultCodeFailed, | |||
Msg: "ResultCodeFailed message", | |||
}, | |||
}, | |||
GlobalStatus: transaction.GlobalStatusCommitted, | |||
}, | |||
} | |||
codec := GlobalCommitResponseCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -18,7 +18,7 @@ | |||
package codec | |||
//func init() { | |||
// GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalReportRequestCodec{}) | |||
// GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalReportRequestCodec{}) | |||
//} | |||
// | |||
//type GlobalReportRequestCodec struct { |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalReportResponseCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalReportResponseCodec{}) | |||
} | |||
type GlobalReportResponseCodec struct { | |||
@@ -37,6 +37,11 @@ func (g *GlobalReportResponseCodec) Decode(in []byte) interface{} { | |||
} | |||
} | |||
func (g *GlobalReportResponseCodec) Encode(in interface{}) []byte { | |||
req := in.(message.GlobalReportResponse) | |||
return g.CommonGlobalEndResponseCodec.Encode(req.AbstractGlobalEndResponse) | |||
} | |||
func (g *GlobalReportResponseCodec) GetMessageType() message.MessageType { | |||
return message.MessageType_GlobalReportResult | |||
} |
@@ -0,0 +1,45 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/seata/seata-go/pkg/protocol/transaction" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalReportResponseCodec(t *testing.T) { | |||
msg := message.GlobalReportResponse{ | |||
AbstractGlobalEndResponse: message.AbstractGlobalEndResponse{ | |||
AbstractTransactionResponse: message.AbstractTransactionResponse{ | |||
AbstractResultMessage: message.AbstractResultMessage{ | |||
ResultCode: message.ResultCodeFailed, | |||
Msg: "ResultCodeFailed message", | |||
}, | |||
}, | |||
GlobalStatus: transaction.GlobalStatusCommitted, | |||
}, | |||
} | |||
codec := GlobalReportResponseCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalRollbackRequestCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalRollbackRequestCodec{}) | |||
} | |||
type GlobalRollbackRequestCodec struct { | |||
@@ -32,7 +32,7 @@ type GlobalRollbackRequestCodec struct { | |||
func (g *GlobalRollbackRequestCodec) Decode(in []byte) interface{} { | |||
req := g.CommonGlobalEndRequestCodec.Decode(in) | |||
abstractGlobalEndRequest := req.(message.AbstractGlobalEndRequest) | |||
return message.GlobalCommitRequest{ | |||
return message.GlobalRollbackRequest{ | |||
AbstractGlobalEndRequest: abstractGlobalEndRequest, | |||
} | |||
} | |||
@@ -0,0 +1,39 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalRollbackRequestCodec(t *testing.T) { | |||
msg := message.GlobalRollbackRequest{ | |||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ | |||
Xid: "test-transaction-id", | |||
ExtraData: []byte("TestExtraData"), | |||
}, | |||
} | |||
codec := GlobalRollbackRequestCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalRollbackResponseCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalRollbackResponseCodec{}) | |||
} | |||
type GlobalRollbackResponseCodec struct { | |||
@@ -37,6 +37,11 @@ func (g *GlobalRollbackResponseCodec) Decode(in []byte) interface{} { | |||
} | |||
} | |||
func (g *GlobalRollbackResponseCodec) Encode(in interface{}) []byte { | |||
req := in.(message.GlobalRollbackResponse) | |||
return g.CommonGlobalEndResponseCodec.Encode(req.AbstractGlobalEndResponse) | |||
} | |||
func (g *GlobalRollbackResponseCodec) GetMessageType() message.MessageType { | |||
return message.MessageType_GlobalRollbackResult | |||
} |
@@ -0,0 +1,45 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/seata/seata-go/pkg/protocol/transaction" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalRollbackResponseCodec(t *testing.T) { | |||
msg := message.GlobalRollbackResponse{ | |||
AbstractGlobalEndResponse: message.AbstractGlobalEndResponse{ | |||
AbstractTransactionResponse: message.AbstractTransactionResponse{ | |||
AbstractResultMessage: message.AbstractResultMessage{ | |||
ResultCode: message.ResultCodeFailed, | |||
Msg: "ResultCodeFailed message", | |||
}, | |||
}, | |||
GlobalStatus: transaction.GlobalStatusCommitted, | |||
}, | |||
} | |||
codec := GlobalRollbackResponseCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalStatusRequestCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalStatusRequestCodec{}) | |||
} | |||
type GlobalStatusRequestCodec struct { | |||
@@ -0,0 +1,39 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalStatusRequestCodec(t *testing.T) { | |||
msg := message.GlobalStatusRequest{ | |||
AbstractGlobalEndRequest: message.AbstractGlobalEndRequest{ | |||
Xid: "test-transaction-id", | |||
ExtraData: []byte("TestExtraData"), | |||
}, | |||
} | |||
codec := GlobalStatusRequestCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &GlobalStatusResponseCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &GlobalStatusResponseCodec{}) | |||
} | |||
type GlobalStatusResponseCodec struct { | |||
@@ -37,6 +37,11 @@ func (g *GlobalStatusResponseCodec) Decode(in []byte) interface{} { | |||
} | |||
} | |||
func (g *GlobalStatusResponseCodec) Encode(in interface{}) []byte { | |||
req := in.(message.GlobalStatusResponse) | |||
return g.CommonGlobalEndResponseCodec.Encode(req.AbstractGlobalEndResponse) | |||
} | |||
func (g *GlobalStatusResponseCodec) GetMessageType() message.MessageType { | |||
return message.MessageType_GlobalStatusResult | |||
} |
@@ -0,0 +1,45 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/seata/seata-go/pkg/protocol/transaction" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestGlobalStatusResponseCodec(t *testing.T) { | |||
msg := message.GlobalStatusResponse{ | |||
AbstractGlobalEndResponse: message.AbstractGlobalEndResponse{ | |||
AbstractTransactionResponse: message.AbstractTransactionResponse{ | |||
AbstractResultMessage: message.AbstractResultMessage{ | |||
ResultCode: message.ResultCodeFailed, | |||
Msg: "ResultCodeFailed message", | |||
}, | |||
}, | |||
GlobalStatus: transaction.GlobalStatusCommitted, | |||
}, | |||
} | |||
codec := GlobalStatusResponseCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -26,7 +26,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &RegisterRMRequestCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterRMRequestCodec{}) | |||
} | |||
type RegisterRMRequestCodec struct { | |||
@@ -78,7 +78,7 @@ func (c *RegisterRMRequestCodec) Encode(in interface{}) []byte { | |||
Write16String(req.ApplicationId, buf) | |||
Write16String(req.TransactionServiceGroup, buf) | |||
Write16String(string(req.ExtraData), buf) | |||
Write16String(req.ResourceIds, buf) | |||
Write32String(req.ResourceIds, buf) | |||
return buf.RawBuf() | |||
} | |||
@@ -0,0 +1,42 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestRegisterRMRequestCodec(t *testing.T) { | |||
msg := message.RegisterRMRequest{ | |||
AbstractIdentifyRequest: message.AbstractIdentifyRequest{ | |||
Version: "V1,0", | |||
ApplicationId: "TestApplicationId", | |||
TransactionServiceGroup: "TestTransactionServiceGroup", | |||
ExtraData: []byte("TestExtraData"), | |||
}, | |||
ResourceIds: "TestResourceIds", | |||
} | |||
codec := RegisterRMRequestCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &RegisterRMResponseCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterRMResponseCodec{}) | |||
} | |||
type RegisterRMResponseCodec struct { | |||
@@ -37,6 +37,11 @@ func (g *RegisterRMResponseCodec) Decode(in []byte) interface{} { | |||
} | |||
} | |||
func (c *RegisterRMResponseCodec) Encode(in interface{}) []byte { | |||
resp := in.(message.RegisterRMResponse) | |||
return c.AbstractIdentifyResponseCodec.Encode(resp.AbstractIdentifyResponse) | |||
} | |||
func (g *RegisterRMResponseCodec) GetMessageType() message.MessageType { | |||
return message.MessageType_RegRmResult | |||
} |
@@ -0,0 +1,44 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestRegisterRMResponseCodec(t *testing.T) { | |||
msg := message.RegisterRMResponse{ | |||
AbstractIdentifyResponse: message.AbstractIdentifyResponse{ | |||
AbstractResultMessage: message.AbstractResultMessage{ | |||
ResultCode: message.ResultCodeFailed, | |||
//Msg: "TestMsg", | |||
}, | |||
Version: "V1,0", | |||
Identified: false, | |||
}, | |||
} | |||
codec := RegisterRMResponseCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg.Identified, msg2.(message.RegisterRMResponse).Identified) | |||
assert.Equal(t, msg.Version, msg2.(message.RegisterRMResponse).Version) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &RegisterTMRequestCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterTMRequestCodec{}) | |||
} | |||
type RegisterTMRequestCodec struct { | |||
@@ -0,0 +1,41 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestRegisterTMRequestCodec(t *testing.T) { | |||
msg := message.RegisterTMRequest{ | |||
AbstractIdentifyRequest: message.AbstractIdentifyRequest{ | |||
Version: "V1,0", | |||
ApplicationId: "TestApplicationId", | |||
TransactionServiceGroup: "TestTransactionServiceGroup", | |||
ExtraData: []byte("TestExtraData"), | |||
}, | |||
} | |||
codec := RegisterTMRequestCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -22,7 +22,7 @@ import ( | |||
) | |||
func init() { | |||
GetCodecManager().RegisterCodec(CodeTypeSeata, &RegisterTMResponseCodec{}) | |||
GetCodecManager().RegisterCodec(CodecTypeSeata, &RegisterTMResponseCodec{}) | |||
} | |||
type RegisterTMResponseCodec struct { | |||
@@ -0,0 +1,45 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package codec | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestRegisterTMResponseCodec(t *testing.T) { | |||
msg := message.RegisterTMResponse{ | |||
AbstractIdentifyResponse: message.AbstractIdentifyResponse{ | |||
AbstractResultMessage: message.AbstractResultMessage{ | |||
ResultCode: message.ResultCodeFailed, | |||
Msg: "TestMsg", | |||
}, | |||
ExtraData: []byte("TestExtraData"), | |||
Version: "V1,0", | |||
Identified: false, | |||
}, | |||
} | |||
codec := RegisterTMResponseCodec{} | |||
bytes := codec.Encode(msg) | |||
msg2 := codec.Decode(bytes) | |||
assert.Equal(t, msg.Identified, msg2.(message.RegisterTMResponse).Identified) | |||
assert.Equal(t, msg.Version, msg2.(message.RegisterTMResponse).Version) | |||
} |
@@ -61,7 +61,7 @@ type ResourceManager interface { | |||
// Unregister a Resource from the Resource Manager | |||
UnregisterResource(resource Resource) error | |||
// Get all resources managed by this manager | |||
GetManagedResources() sync.Map | |||
GetManagedResources() *sync.Map | |||
// Get the BranchType | |||
GetBranchType() branch.BranchType | |||
} | |||
@@ -61,7 +61,7 @@ func (client *GettyRemotingClient) SendAsyncRequest(msg interface{}) error { | |||
rpcMessage := message.RpcMessage{ | |||
ID: int32(client.idGenerator.Inc()), | |||
Type: msgType, | |||
Codec: byte(codec.CodeTypeSeata), | |||
Codec: byte(codec.CodecTypeSeata), | |||
Compressor: 0, | |||
Body: msg, | |||
} | |||
@@ -72,7 +72,7 @@ func (client *GettyRemotingClient) SendAsyncResponse(msg interface{}) error { | |||
rpcMessage := message.RpcMessage{ | |||
ID: int32(client.idGenerator.Inc()), | |||
Type: message.GettyRequestType_Response, | |||
Codec: byte(codec.CodeTypeSeata), | |||
Codec: byte(codec.CodecTypeSeata), | |||
Compressor: 0, | |||
Body: msg, | |||
} | |||
@@ -83,7 +83,7 @@ func (client *GettyRemotingClient) SendSyncRequest(msg interface{}) (interface{} | |||
rpcMessage := message.RpcMessage{ | |||
ID: int32(client.idGenerator.Inc()), | |||
Type: message.GettyRequestType_RequestSync, | |||
Codec: byte(codec.CodeTypeSeata), | |||
Codec: byte(codec.CodecTypeSeata), | |||
Compressor: 0, | |||
Body: msg, | |||
} | |||
@@ -94,7 +94,7 @@ func (client *GettyRemotingClient) SendSyncRequestWithTimeout(msg interface{}, t | |||
rpcMessage := message.RpcMessage{ | |||
ID: int32(client.idGenerator.Inc()), | |||
Type: message.GettyRequestType_RequestSync, | |||
Codec: byte(codec.CodeTypeSeata), | |||
Codec: byte(codec.CodecTypeSeata), | |||
Compressor: 0, | |||
Body: msg, | |||
} | |||
@@ -0,0 +1,50 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package getty | |||
import ( | |||
"github.com/seata/seata-go/pkg/protocol/codec" | |||
"github.com/seata/seata-go/pkg/protocol/message" | |||
"github.com/stretchr/testify/assert" | |||
"testing" | |||
) | |||
func TestRpcPackageHandler(t *testing.T) { | |||
msg := message.RpcMessage{ | |||
ID: 1123, | |||
Type: message.GettyRequestType_RequestSync, | |||
Codec: byte(codec.CodecTypeSeata), | |||
Compressor: byte(1), | |||
HeadMap: map[string]string{ | |||
"name": " Jack", | |||
"age": "12", | |||
"address": "Beijing", | |||
}, | |||
Body: message.GlobalBeginRequest{ | |||
Timeout: 100, | |||
TransactionName: "SeataGoTransaction", | |||
}, | |||
} | |||
codec := RpcPackageHandler{} | |||
bytes, err := codec.Write(nil, msg) | |||
assert.Nil(t, err) | |||
msg2, _, _ := codec.Read(nil, bytes) | |||
assert.Equal(t, msg, msg2) | |||
} |
@@ -97,8 +97,8 @@ func (d *ResourceManager) UnregisterResource(resource resource.Resource) error { | |||
} | |||
// Get all resources managed by this manager | |||
func (d *ResourceManager) GetManagedResources() sync.Map { | |||
return resourceManagerMap | |||
func (d *ResourceManager) GetManagedResources() *sync.Map { | |||
return &resourceManagerMap | |||
} | |||
// Get the model.BranchType | |||
@@ -120,8 +120,8 @@ func (t *TCCResourceManager) RegisterResource(resource resource.Resource) error | |||
return t.rmRemoting.RegisterResource(resource) | |||
} | |||
func (t *TCCResourceManager) GetManagedResources() sync.Map { | |||
return t.resourceManagerMap | |||
func (t *TCCResourceManager) GetManagedResources() *sync.Map { | |||
return &t.resourceManagerMap | |||
} | |||
// Commit a branch transaction | |||