Browse Source

feature:add undo protobuf parser (#691)

* feature:add undo protobuf parser

* fix apache header
tags/v2.0.0
marsevilspirit GitHub 9 months ago
parent
commit
0fa027211a
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
6 changed files with 1954 additions and 4 deletions
  1. +1445
    -0
      pkg/datasource/sql/undo/parser/branch_undo_log.pb.go
  2. +165
    -0
      pkg/datasource/sql/undo/parser/branch_undo_log.proto
  3. +1
    -0
      pkg/datasource/sql/undo/parser/parser_cache.go
  4. +4
    -4
      pkg/datasource/sql/undo/parser/parser_json_test.go
  5. +248
    -0
      pkg/datasource/sql/undo/parser/parser_protobuf.go
  6. +91
    -0
      pkg/datasource/sql/undo/parser/parser_protobuf_test.go

+ 1445
- 0
pkg/datasource/sql/undo/parser/branch_undo_log.pb.go
File diff suppressed because it is too large
View File


+ 165
- 0
pkg/datasource/sql/undo/parser/branch_undo_log.proto View File

@@ -0,0 +1,165 @@
syntax = "proto3";
package parser;
import "google/protobuf/any.proto";

option go_package=".;parser";

message BranchUndoLog {
string Xid = 1;
uint64 BranchID = 2;
repeated SQLUndoLog Logs = 3;
}

message SQLUndoLog {
SQLType SQLType = 1;
string TableName = 2;
RecordImage BeforeImage = 3;
RecordImage AfterImage = 4;
}

message RecordImage {
int32 index = 1;
string TableName = 2;
SQLType SQLType = 3;
repeated RowImage Rows = 4;
TableMeta TableMeta = 5;
}

message RowImage {
repeated ColumnImage Columns = 1;
}

message ColumnImage {
IndexType KeyType = 1;
string ColumnName = 2;
JDBCType ColumnType = 3;
google.protobuf.Any Value = 4;
}

message TableMeta {
string TableName = 1;
map<string, ColumnMeta> Columns = 2;
map<string, IndexMeta> Indexs = 3;
repeated string ColumnNames = 4;
}

message ColumnMeta {
string Schema = 1;
string Table = 2;
bytes ColumnDef = 3;
bool Autoincrement = 4;
string ColumnName = 5;
string ColumnType = 6;
int32 DatabaseType = 7;
string DatabaseTypeString = 8;
string ColumnKey = 9;
int32 IsNullable = 10;
string Extra = 11;
}

message IndexMeta {
string Schema = 1;
string Table = 2;
string Name = 3;
string ColumnName = 4;
IndexType IType = 5;
repeated ColumnMeta Columns = 6;
}

enum IndexType {
IndexTypeNull = 0;
IndexTypePrimaryKey = 1;
}

enum JDBCType {
JDBCTypeNull = 0;
JDBCTypeBit = -7;
JDBCTypeTinyInt = -6;
JDBCTypeSmallInt = 5;
JDBCTypeInteger = 4;
JDBCTypeBigInt = -5;
JDBCTypeFloat = 6;
JDBCTypeReal = 7;
JDBCTypeDouble = 8;
JDBCTypeNumberic = 2;
JDBCTypeDecimal = 3;
JDBCTypeChar = 1;
JDBCTypeVarchar = 12;
JDBCTypeLongVarchar = -1;
JDBCTypeDate = 91;
JDBCTypeTime = 92;
JDBCTypeTimestamp = 93;
JDBCTypeBinary = -2;
JDBCTypeVarBinary = -3;
JDBCTypeLongVarBinary = -4;
JDBCTypeOther = 1111;
JDBCTypeJavaObject = 2000;
JDBCTypeDistinct = 2001;
JDBCTypeStruct = 2002;
JDBCTypeArray = 2003;
JDBCTypeBlob = 2004;
JDBCTypeClob = 2005;
JDBCTypeRef = 2006;
JDBCTypeDateLink = 70;
JDBCTypeBoolean = 16;
JDBCTypeRowID = -8;
JDBCTypeNchar = -15;
JDBCTypeNvarchar = -9;
JDBCTypeLongNvVarchar = -16;
JDBCTypeNclob = 2011;
JDBCTypeSqlXML = 2009;
JDBCTypeRefCursor = 2012;
JDBCTypeTimeWithTimeZone = 2013;
JDBCTypeTimestampWithTimezone = 2014;
}

enum SQLType {
SQLTypeSelect = 0;
SQLTypeInsert = 1;
SQLTypeUpdate = 2;
SQLTypeDelete = 3;
SQLTypeSelectForUpdate = 4;
SQLTypeReplace = 5;
SQLTypeTruncate = 6;
SQLTypeCreate = 7;
SQLTypeDrop = 8;
SQLTypeLoad = 9;
SQLTypeMerge = 10;
SQLTypeShow = 11;
SQLTypeAlter = 12;
SQLTypeRename = 13;
SQLTypeDump = 14;
SQLTypeDebug = 15;
SQLTypeExplain = 16;
SQLTypeProcedure = 17;
SQLTypeDesc = 18;
SQLLastInsertID = 19;
SQLSelectWithoutTable = 20;
SQLCreateSequence = 21;
SQLShowSequence = 22;
SQLGetSequence = 23;
SQLAlterSequence = 24;
SQLDropSequence = 25;
SQLTddlShow = 26;
SQLTypeSet = 27;
SQLTypeReload = 28;
SQLTypeSelectUnion = 29;
SQLTypeCreateTable = 30;
SQLTypeDropTable = 31;
SQLTypeAlterTable = 32;
SQLTypeSavePoint = 33;
SQLTypeSelectFromUpdate = 34;
SQLTypeMultiDelete = 35;
SQLTypeMultiUpdate = 36;
SQLTypeCreateIndex = 37;
SQLTypeDropIndex = 38;
SQLTypeKill = 39;
SQLTypeLockTables = 40;
SQLTypeUnLockTables = 41;
SQLTypeCheckTable = 42;
SQLTypeSelectFoundRows = 43;
SQLTypeInsertIgnore = 101; // Adjusted the value to match Go's logic
SQLTypeInsertOnDuplicateUpdate = 102;
SQLTypeMulti = 1044; // Adjusted the value to match Go's logic
SQLTypeUnknown = 1045;
}

+ 1
- 0
pkg/datasource/sql/undo/parser/parser_cache.go View File

@@ -39,6 +39,7 @@ func initCache() {
}

cache.store(&JsonParser{})
cache.store(&ProtobufParser{})
}

func GetCache() *UndoLogParserCache {


+ 4
- 4
pkg/datasource/sql/undo/parser/parser_json_test.go View File

@@ -25,15 +25,15 @@ import (
"seata.apache.org/seata-go/pkg/datasource/sql/undo"
)

func TestGetName(t *testing.T) {
func TestJsonGetName(t *testing.T) {
assert.Equal(t, "json", (&JsonParser{}).GetName())
}

func TestGetDefaultContext(t *testing.T) {
func TestJsonGetDefaultContext(t *testing.T) {
assert.Equal(t, []byte("{}"), (&JsonParser{}).GetDefaultContent())
}

func TestEncode(t *testing.T) {
func TestJsonEncode(t *testing.T) {
TestCases := []struct {
CaseName string
UndoLog *undo.BranchUndoLog
@@ -67,7 +67,7 @@ func TestEncode(t *testing.T) {
}
}

func TestDecode(t *testing.T) {
func TestJsonDecode(t *testing.T) {
TestCases := []struct {
CaseName string
ExpectUndoLog undo.BranchUndoLog


+ 248
- 0
pkg/datasource/sql/undo/parser/parser_protobuf.go View File

@@ -0,0 +1,248 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package parser

import (
"encoding/json"

"github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"seata.apache.org/seata-go/pkg/datasource/sql/types"
"seata.apache.org/seata-go/pkg/datasource/sql/undo"
)

type ProtobufParser struct {
}

// GetName get the name of parser
func (p *ProtobufParser) GetName() string {
return "protobuf"
}

// GetDefaultContent get default content of this parser
func (p *ProtobufParser) GetDefaultContent() []byte {
return []byte{}
}

// Encode branch undo log to byte array
func (p *ProtobufParser) Encode(branchUndoLog *undo.BranchUndoLog) ([]byte, error) {
protoLog := ConvertToProto(branchUndoLog)
return proto.Marshal(protoLog)
}

// Decode byte array to branch undo log
func (p *ProtobufParser) Decode(data []byte) (*undo.BranchUndoLog, error) {
branchUndoLog := &BranchUndoLog{}
err := proto.Unmarshal(data, branchUndoLog)
if err != nil {
return nil, err
}

return ConvertToIntree(branchUndoLog), nil
}

func ConvertToProto(intreeLog *undo.BranchUndoLog) *BranchUndoLog {
protoLog := &BranchUndoLog{
Xid: intreeLog.Xid,
BranchID: intreeLog.BranchID,
Logs: []*SQLUndoLog{},
}
for _, undolog := range intreeLog.Logs {
protolog := &SQLUndoLog{
SQLType: SQLType(undolog.SQLType),
TableName: undolog.TableName,
}

if undolog.BeforeImage != nil {
protolog.BeforeImage = &RecordImage{
TableName: undolog.BeforeImage.TableName,
SQLType: SQLType(undolog.BeforeImage.SQLType),
Rows: []*RowImage{},
}

for _, row := range undolog.BeforeImage.Rows {
protoRow := &RowImage{
Columns: []*ColumnImage{},
}

for _, col := range row.Columns {
anyValue, err := convertInterfaceToAny(col.GetActualValue())
if err != nil {
continue
}

protoCol := &ColumnImage{
KeyType: IndexType(col.KeyType),
ColumnName: col.ColumnName,
ColumnType: JDBCType(col.ColumnType),
Value: anyValue,
}

protoRow.Columns = append(protoRow.Columns, protoCol)
}

protolog.BeforeImage.Rows = append(protolog.BeforeImage.Rows, protoRow)
}
}

if undolog.AfterImage != nil {
protolog.AfterImage = &RecordImage{
TableName: undolog.AfterImage.TableName,
SQLType: SQLType(undolog.AfterImage.SQLType),
Rows: []*RowImage{},
}

for _, row := range undolog.AfterImage.Rows {
protoRow := &RowImage{
Columns: []*ColumnImage{},
}

for _, col := range row.Columns {
anyValue, err := convertInterfaceToAny(col.Value)
if err != nil {
continue
}

protoCol := &ColumnImage{
KeyType: IndexType(col.KeyType),
ColumnName: col.ColumnName,
ColumnType: JDBCType(col.ColumnType),
Value: anyValue,
}

protoRow.Columns = append(protoRow.Columns, protoCol)
}

protolog.AfterImage.Rows = append(protolog.AfterImage.Rows, protoRow)
}
}

protoLog.Logs = append(protoLog.Logs, protolog)
}
return protoLog
}

func ConvertToIntree(protoLog *BranchUndoLog) *undo.BranchUndoLog {
intreeLog := &undo.BranchUndoLog{
Xid: protoLog.Xid,
BranchID: protoLog.BranchID,
Logs: []undo.SQLUndoLog{},
}

for _, pbSqlLog := range protoLog.Logs {
undoSqlLog := undo.SQLUndoLog{
SQLType: types.SQLType(pbSqlLog.SQLType),
TableName: pbSqlLog.TableName,
}

if pbSqlLog.BeforeImage != nil {
undoSqlLog.BeforeImage = &types.RecordImage{
TableName: pbSqlLog.BeforeImage.TableName,
SQLType: types.SQLType(pbSqlLog.BeforeImage.SQLType),
Rows: []types.RowImage{},
}

for _, pbRow := range pbSqlLog.BeforeImage.Rows {
undoRow := types.RowImage{
Columns: []types.ColumnImage{},
}

for _, pbCol := range pbRow.Columns {
anyValue, err := convertAnyToInterface(pbCol.Value)
if err != nil {
continue
}

undoCol := types.ColumnImage{
KeyType: types.IndexType(pbCol.KeyType),
ColumnName: pbCol.ColumnName,
ColumnType: types.JDBCType(pbCol.ColumnType),
Value: anyValue,
}

undoRow.Columns = append(undoRow.Columns, undoCol)
}

undoSqlLog.BeforeImage.Rows = append(undoSqlLog.BeforeImage.Rows, undoRow)
}
}

if pbSqlLog.AfterImage != nil {
undoSqlLog.AfterImage = &types.RecordImage{
TableName: pbSqlLog.AfterImage.TableName,
SQLType: types.SQLType(pbSqlLog.AfterImage.SQLType),
Rows: []types.RowImage{},
}

for _, pbRow := range pbSqlLog.AfterImage.Rows {
undoRow := types.RowImage{
Columns: []types.ColumnImage{},
}

for _, pbCol := range pbRow.Columns {
anyValue, err := convertAnyToInterface(pbCol.Value)
if err != nil {
continue
}

undoCol := types.ColumnImage{
KeyType: types.IndexType(pbCol.KeyType),
ColumnName: pbCol.ColumnName,
ColumnType: types.JDBCType(pbCol.ColumnType),
Value: anyValue,
}

undoRow.Columns = append(undoRow.Columns, undoCol)
}

undoSqlLog.AfterImage.Rows = append(undoSqlLog.AfterImage.Rows, undoRow)
}
}

intreeLog.Logs = append(intreeLog.Logs, undoSqlLog)
}

return intreeLog
}

func convertAnyToInterface(anyValue *any.Any) (interface{}, error) {
var value interface{}
bytesValue := &wrappers.BytesValue{}
err := anypb.UnmarshalTo(anyValue, bytesValue, proto.UnmarshalOptions{})
if err != nil {
return value, err
}
uErr := json.Unmarshal(bytesValue.Value, &value)
if uErr != nil {
return value, uErr
}
return value, nil
}

func convertInterfaceToAny(v interface{}) (*any.Any, error) {
anyValue := &any.Any{}
bytes, _ := json.Marshal(v)
bytesValue := &wrappers.BytesValue{
Value: bytes,
}
err := anypb.MarshalFrom(anyValue, bytesValue, proto.MarshalOptions{})
return anyValue, err
}

+ 91
- 0
pkg/datasource/sql/undo/parser/parser_protobuf_test.go View File

@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package parser

import (
"testing"

"github.com/stretchr/testify/assert"
"seata.apache.org/seata-go/pkg/datasource/sql/undo"
)

func TestProtobufGetName(t *testing.T) {
assert.Equal(t, "protobuf", (&ProtobufParser{}).GetName())
}

func TestProtobufDefaultContext(t *testing.T) {
assert.Equal(t, []byte{}, (&ProtobufParser{}).GetDefaultContent())
}

func TestProtobufEncodeDecode(t *testing.T) {
TestCases := []struct {
CaseName string
UndoLog *undo.BranchUndoLog
ExpectErr bool
}{
{
CaseName: "pass",
UndoLog: &undo.BranchUndoLog{
Xid: "123456",
BranchID: 123456,
Logs: []undo.SQLUndoLog{},
},
ExpectErr: false,
},
}

for _, Case := range TestCases {
t.Run(Case.CaseName, func(t *testing.T) {
parser := &ProtobufParser{}

encodedBytes, err := parser.Encode(Case.UndoLog)
if Case.ExpectErr {
assert.NotNil(t, err)
return
}
assert.Nil(t, err)

decodedUndoLog, err := parser.Decode(encodedBytes)
if Case.ExpectErr {
assert.NotNil(t, err)
return
}
assert.Nil(t, err)

assert.Equal(t, Case.UndoLog.Xid, decodedUndoLog.Xid)
assert.Equal(t, Case.UndoLog.BranchID, decodedUndoLog.BranchID)
assert.Equal(t, len(Case.UndoLog.Logs), len(decodedUndoLog.Logs))
})
}
}

func TestConvertInterfaceToAnyAndBack(t *testing.T) {
originalValue := map[string]interface{}{
"key1": "value1",
"key2": float64(123), // Use float64 to match JSON default behavior
"key3": true,
}

anyValue, err := convertInterfaceToAny(originalValue)
assert.NoError(t, err, "convertInterfaceToAny should not return an error")

convertedValue, err := convertAnyToInterface(anyValue)
assert.NoError(t, err, "convertAnyToInterface should not return an error")

assert.Equal(t, originalValue, convertedValue, "The converted value should match the original")
}

Loading…
Cancel
Save