Browse Source

optimize commit (#354)

optimize commit
tags/1.0.2-RC1
Yuecai Liu GitHub 2 years ago
parent
commit
aedef6e9d8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 264 additions and 11 deletions
  1. +2
    -2
      pkg/datasource/sql/at.go
  2. +2
    -1
      pkg/datasource/sql/driver_test.go
  3. +17
    -0
      pkg/datasource/sql/mock/mock_datasource_manager.go
  4. +1
    -1
      pkg/datasource/sql/tx.go
  5. +21
    -0
      pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go
  6. +0
    -2
      pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder.go
  7. +2
    -1
      pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder_test.go
  8. +1
    -1
      pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go
  9. +63
    -0
      pkg/protocol/codec/branch_report_req_codec.go
  10. +43
    -0
      pkg/protocol/codec/branch_report_req_codec_test.go
  11. +66
    -0
      pkg/protocol/codec/branch_statue_report_response_codec.go
  12. +44
    -0
      pkg/protocol/codec/branch_statue_report_response_codec_test.go
  13. +1
    -1
      pkg/remoting/processor/client/rm_branch_commit_processor.go
  14. +1
    -2
      pkg/rm/rm_remoting.go

+ 2
- 2
pkg/datasource/sql/at.go View File

@@ -116,7 +116,7 @@ func (a *ATSourceManager) BranchRollback(ctx context.Context, branchResource rm.
// BranchCommit
func (a *ATSourceManager) BranchCommit(ctx context.Context, resource rm.BranchResource) (branch.BranchStatus, error) {
a.worker.BranchCommit(ctx, resource)
return branch.BranchStatusPhaseoneDone, nil
return branch.BranchStatusPhasetwoCommitted, nil
}

// LockQuery
@@ -131,7 +131,7 @@ func (a *ATSourceManager) BranchRegister(ctx context.Context, req rm.BranchRegis

// BranchReport
func (a *ATSourceManager) BranchReport(ctx context.Context, param rm.BranchReportParam) error {
return nil
return a.rmRemoting.BranchReport(param)
}

// CreateTableMetaCache


+ 2
- 1
pkg/datasource/sql/driver_test.go View File

@@ -21,10 +21,11 @@ import (
"context"
"database/sql"
"database/sql/driver"
"github.com/seata/seata-go/pkg/rm"
"reflect"
"testing"

"github.com/seata/seata-go/pkg/rm"

"github.com/golang/mock/gomock"
"github.com/seata/seata-go/pkg/datasource/sql/mock"
"github.com/seata/seata-go/pkg/util/reflectx"


+ 17
- 0
pkg/datasource/sql/mock/mock_datasource_manager.go View File

@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Code generated by MockGen. DO NOT EDIT.
// Source: datasource_manager.go



+ 1
- 1
pkg/datasource/sql/tx.go View File

@@ -179,7 +179,7 @@ func (tx *Tx) report(success bool) error {
BranchId: int64(tx.tranCtx.BranchID),
Status: status,
}
dataSourceManager := datasource.GetDataSourceManager(branch.BranchType(tx.tranCtx.TransType))
dataSourceManager := datasource.GetDataSourceManager(tx.tranCtx.TransType.GetBranchType())
retry := REPORT_RETRY_COUNT
for retry > 0 {
err := dataSourceManager.BranchReport(context.Background(), request)


+ 21
- 0
pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go View File

@@ -20,6 +20,7 @@ package builder
import (
"testing"

"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/stretchr/testify/assert"
)

@@ -45,5 +46,25 @@ func TestBuildWhereConditionByPKs(t *testing.T) {
assert.Equal(t, test.expectSQL, sql)
})
}
}

func TestBuildLockKey(t *testing.T) {
metaData := types.TableMeta{
TableName: "test_name",
Indexs: map[string]types.IndexMeta{
"PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{{ColumnName: "id"}, {ColumnName: "userId"}}},
},
}

records := types.RecordImage{
TableName: "test_name",
Rows: []types.RowImage{
{Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, Name: "id", Value: 1}, {KeyType: types.IndexTypePrimaryKey, Name: "userId", Value: "one"}}},
{Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, Name: "id", Value: 2}, {KeyType: types.IndexTypePrimaryKey, Name: "userId", Value: "two"}}},
},
}

builder := BasicUndoLogBuilder{}
lockKeys := builder.buildLockKey2(&records, metaData)
assert.Equal(t, "test_name:1_one,2_two", lockKeys)
}

+ 0
- 2
pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder.go View File

@@ -134,8 +134,6 @@ func (u *MySQLUpdateUndoLogBuilder) AfterImage(ctx context.Context, execCtx *typ
return nil, err
}

lockKey := u.buildLockKey(rows, *metaData)
execCtx.TxCtx.LockKeys[lockKey] = struct{}{}
image.SQLType = execCtx.ParseContext.SQLType

return []*types.RecordImage{image}, nil


+ 2
- 1
pkg/datasource/sql/undo/builder/mysql_update_undo_log_builder_test.go View File

@@ -20,10 +20,11 @@ package builder
import (
"context"
"database/sql/driver"
"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"reflect"
"testing"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"

"github.com/agiledragon/gomonkey"
"github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql"
"github.com/seata/seata-go/pkg/datasource/sql/types"


+ 1
- 1
pkg/datasource/sql/undo/executor/mysql_undo_update_executor.go View File

@@ -65,7 +65,7 @@ func (m *MySQLUndoUpdateExecutor) ExecuteOn(ctx context.Context, dbType types.DB
undoValues = append(undoValues, col.Value)
}

if _, err = stmt.Exec(undoValues); err != nil {
if _, err = stmt.Exec(undoValues...); err != nil {
return err
}
}


+ 63
- 0
pkg/protocol/codec/branch_report_req_codec.go View File

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/util/bytes"
)

func init() {
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchReportRequestCodec{})
}

type BranchReportRequestCodec struct {
}

func (g *BranchReportRequestCodec) Decode(in []byte) interface{} {
data := message.BranchReportRequest{}
buf := bytes.NewByteBuffer(in)

data.Xid = bytes.ReadString16Length(buf)
data.BranchId = int64(bytes.ReadUInt64(buf))
data.Status = branch.BranchStatus(bytes.ReadByte(buf))
data.ResourceId = bytes.ReadString16Length(buf)
data.ApplicationData = []byte(bytes.ReadString32Length(buf))
data.BranchType = branch.BranchType(bytes.ReadByte(buf))

return data
}

func (g *BranchReportRequestCodec) Encode(in interface{}) []byte {
data, _ := in.(message.BranchReportRequest)
buf := bytes.NewByteBuffer([]byte{})

bytes.WriteString16Length(data.Xid, buf)
buf.WriteInt64(data.BranchId)
buf.WriteByte(byte(data.Status))
bytes.WriteString16Length(data.ResourceId, buf)
bytes.WriteString32Length(string(data.ApplicationData), buf)
buf.WriteByte(byte(data.BranchType))

return buf.Bytes()
}

func (g *BranchReportRequestCodec) GetMessageType() message.MessageType {
return message.MessageTypeBranchStatusReport
}

+ 43
- 0
pkg/protocol/codec/branch_report_req_codec_test.go View File

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

model2 "github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/stretchr/testify/assert"
)

func TestBranchReportRequestCodec(t *testing.T) {
msg := message.BranchReportRequest{
Xid: "123344",
ResourceId: "root:12345678@tcp(127.0.0.1:3306)/seata_client",
Status: model2.BranchStatusPhaseoneDone,
BranchId: 56678,
BranchType: model2.BranchTypeAT,
ApplicationData: []byte("TestExtraData"),
}

codec := BranchReportRequestCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 66
- 0
pkg/protocol/codec/branch_statue_report_response_codec.go View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"math"

"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/util/bytes"
serror "github.com/seata/seata-go/pkg/util/errors"
)

func init() {
GetCodecManager().RegisterCodec(CodecTypeSeata, &BranchReportResponseCodec{})
}

type BranchReportResponseCodec struct{}

func (g *BranchReportResponseCodec) Decode(in []byte) interface{} {
data := message.BranchReportResponse{}
buf := bytes.NewByteBuffer(in)

data.ResultCode = message.ResultCode(bytes.ReadByte(buf))
if data.ResultCode == message.ResultCodeFailed {
data.Msg = bytes.ReadString8Length(buf)
}
data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf))

return data
}

func (g *BranchReportResponseCodec) Encode(in interface{}) []byte {
data, _ := in.(message.BranchReportResponse)
buf := bytes.NewByteBuffer([]byte{})

buf.WriteByte(byte(data.ResultCode))
if data.ResultCode == message.ResultCodeFailed {
msg := data.Msg
if len(data.Msg) > math.MaxInt8 {
msg = data.Msg[:math.MaxInt8]
}
bytes.WriteString8Length(msg, buf)
}
buf.WriteByte(byte(data.TransactionErrorCode))

return buf.Bytes()
}

func (g *BranchReportResponseCodec) GetMessageType() message.MessageType {
return message.MessageTypeBranchStatusReportResult
}

+ 44
- 0
pkg/protocol/codec/branch_statue_report_response_codec_test.go View File

@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package codec

import (
"testing"

"github.com/seata/seata-go/pkg/protocol/message"
serror "github.com/seata/seata-go/pkg/util/errors"
"github.com/stretchr/testify/assert"
)

func TestBranchReportResponseCodec(t *testing.T) {
msg := message.BranchReportResponse{
AbstractTransactionResponse: message.AbstractTransactionResponse{
TransactionErrorCode: serror.TransactionErrorCodeBeginFailed,
AbstractResultMessage: message.AbstractResultMessage{
ResultCode: message.ResultCodeFailed,
Msg: "FAILED",
},
},
}

codec := BranchReportResponseCodec{}
bytes := codec.Encode(msg)
msg2 := codec.Decode(bytes)

assert.Equal(t, msg, msg2)
}

+ 1
- 1
pkg/remoting/processor/client/rm_branch_commit_processor.go View File

@@ -87,6 +87,6 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag
log.Errorf("send branch commit response error: {%#v}", err.Error())
return err
}
log.Infof("send branch commit response success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)
log.Infof("send branch commit success: xid %v, branchID %v, resourceID %v, applicationData %v", xid, branchID, resourceID, applicationData)
return nil
}

+ 1
- 2
pkg/rm/rm_remoting.go View File

@@ -22,7 +22,6 @@ import (

"github.com/pkg/errors"

"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/util/log"
@@ -70,7 +69,7 @@ func (r *RMRemoting) BranchReport(param BranchReportParam) error {
BranchId: param.BranchId,
Status: param.Status,
ApplicationData: []byte(param.ApplicationData),
BranchType: branch.BranchTypeAT,
BranchType: param.BranchType,
}

resp, err := getty.GetGettyRemotingClient().SendSyncRequest(request)


Loading…
Cancel
Save