@@ -13,6 +13,7 @@ require ( | |||
github.com/go-sql-driver/mysql v1.6.0 | |||
github.com/golang/mock v1.6.0 | |||
github.com/google/uuid v1.3.0 | |||
github.com/mitchellh/copystructure v1.2.0 | |||
github.com/natefinch/lumberjack v2.0.0+incompatible | |||
github.com/parnurzeal/gorequest v0.2.16 | |||
github.com/pierrec/lz4/v4 v4.1.17 | |||
@@ -85,7 +86,6 @@ require ( | |||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect | |||
github.com/magiconair/properties v1.8.6 // indirect | |||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect | |||
github.com/mitchellh/copystructure v1.2.0 // indirect | |||
github.com/mitchellh/go-homedir v1.1.0 // indirect | |||
github.com/mitchellh/mapstructure v1.5.0 // indirect | |||
github.com/mitchellh/reflectwalk v1.0.2 // indirect | |||
@@ -102,16 +102,7 @@ func (m *mysqlTrigger) getColumnMetas(ctx context.Context, dbName string, table | |||
} | |||
defer rows.Close() | |||
var columnTypes []*sql.ColumnType | |||
i := 0 | |||
for rows.Next() { | |||
if columnTypes == nil { | |||
columnTypes, err = rows.ColumnTypes() | |||
if err != nil { | |||
return nil, err | |||
} | |||
} | |||
var ( | |||
tableName string | |||
tableSchema string | |||
@@ -124,7 +115,6 @@ func (m *mysqlTrigger) getColumnMetas(ctx context.Context, dbName string, table | |||
) | |||
columnMeta := types.ColumnMeta{} | |||
if err = rows.Scan( | |||
&tableName, | |||
&tableSchema, | |||
@@ -140,10 +130,10 @@ func (m *mysqlTrigger) getColumnMetas(ctx context.Context, dbName string, table | |||
columnMeta.Schema = tableSchema | |||
columnMeta.Table = tableName | |||
columnMeta.ColumnName = strings.Trim(columnName, "` ") | |||
columnMeta.DataType = types.GetSqlDataType(dataType) | |||
columnMeta.DatabaseType = types.GetSqlDataType(dataType) | |||
columnMeta.DatabaseTypeString = dataType | |||
columnMeta.ColumnType = columnType | |||
columnMeta.ColumnKey = columnKey | |||
columnMeta.ColumnTypeInfo = columnTypes[i] | |||
if strings.ToLower(isNullable) == "yes" { | |||
columnMeta.IsNullable = 1 | |||
} else { | |||
@@ -153,7 +143,6 @@ func (m *mysqlTrigger) getColumnMetas(ctx context.Context, dbName string, table | |||
columnMeta.Autoincrement = strings.Contains(strings.ToLower(extra), "auto_increment") | |||
columnMetas = append(columnMetas, columnMeta) | |||
i++ | |||
} | |||
if len(columnMetas) == 0 { | |||
@@ -20,6 +20,7 @@ package types | |||
import ( | |||
"database/sql" | |||
"reflect" | |||
"strings" | |||
) | |||
// https://dev.mysql.com/doc/internals/en/com-query-response.html#packet-Protocol::ColumnType | |||
@@ -125,89 +126,231 @@ const ( | |||
JDBCTypeTimestampWithTimezone JDBCType = 2014 | |||
) | |||
// todo perfect all type name | |||
func GetJDBCTypeByTypeName(typeName string) JDBCType { | |||
switch typeName { | |||
type MySQLDefCode int64 | |||
var ( | |||
COM_BINLOG_DUMP MySQLDefCode = 18 | |||
COM_CHANGE_USER MySQLDefCode = 17 | |||
COM_CLOSE_STATEMENT MySQLDefCode = 25 | |||
COM_CONNECT_OUT MySQLDefCode = 20 | |||
COM_END MySQLDefCode = 29 | |||
COM_EXECUTE MySQLDefCode = 23 | |||
COM_FETCH MySQLDefCode = 28 | |||
COM_LONG_DATA MySQLDefCode = 24 | |||
COM_PREPARE MySQLDefCode = 22 | |||
COM_REGISTER_SLAVE MySQLDefCode = 21 | |||
COM_RESET_STMT MySQLDefCode = 26 | |||
COM_SET_OPTION MySQLDefCode = 27 | |||
COM_TABLE_DUMP MySQLDefCode = 19 | |||
CONNECT MySQLDefCode = 11 | |||
CREATE_DB MySQLDefCode = 5 // Not used; deprecated? | |||
DEBUG MySQLDefCode = 13 | |||
DELAYED_INSERT MySQLDefCode = 16 | |||
DROP_DB MySQLDefCode = 6 // Not used; deprecated? | |||
FIELD_LIST MySQLDefCode = 4 // Not used; deprecated in MySQL 5.7.11 and MySQL 8.0.0. | |||
FIELD_TYPE_BIT MySQLDefCode = 16 | |||
FIELD_TYPE_BLOB MySQLDefCode = 252 | |||
FIELD_TYPE_DATE MySQLDefCode = 10 | |||
FIELD_TYPE_DATETIME MySQLDefCode = 12 | |||
// Data Types | |||
FIELD_TYPE_DECIMAL MySQLDefCode = 0 | |||
FIELD_TYPE_DOUBLE MySQLDefCode = 5 | |||
FIELD_TYPE_ENUM MySQLDefCode = 247 | |||
FIELD_TYPE_FLOAT MySQLDefCode = 4 | |||
FIELD_TYPE_GEOMETRY MySQLDefCode = 255 | |||
FIELD_TYPE_INT24 MySQLDefCode = 9 | |||
FIELD_TYPE_LONG MySQLDefCode = 3 | |||
FIELD_TYPE_LONG_BLOB MySQLDefCode = 251 | |||
FIELD_TYPE_LONGLONG MySQLDefCode = 8 | |||
FIELD_TYPE_MEDIUM_BLOB MySQLDefCode = 250 | |||
FIELD_TYPE_NEW_DECIMAL MySQLDefCode = 246 | |||
FIELD_TYPE_NEWDATE MySQLDefCode = 14 | |||
FIELD_TYPE_NULL MySQLDefCode = 6 | |||
FIELD_TYPE_SET MySQLDefCode = 248 | |||
FIELD_TYPE_SHORT MySQLDefCode = 2 | |||
FIELD_TYPE_STRING MySQLDefCode = 254 | |||
FIELD_TYPE_TIME MySQLDefCode = 11 | |||
FIELD_TYPE_TIMESTAMP MySQLDefCode = 7 | |||
FIELD_TYPE_TINY MySQLDefCode = 1 | |||
// Older data types | |||
FIELD_TYPE_TINY_BLOB MySQLDefCode = 249 | |||
FIELD_TYPE_VAR_STRING MySQLDefCode = 253 | |||
FIELD_TYPE_VARCHAR MySQLDefCode = 15 | |||
// Newer data types | |||
FIELD_TYPE_YEAR MySQLDefCode = 13 | |||
FIELD_TYPE_JSON MySQLDefCode = 245 | |||
INIT_DB MySQLDefCode = 2 | |||
LENGTH_BLOB MySQLDefCode = 65535 | |||
LENGTH_LONGBLOB MySQLDefCode = 4294967295 | |||
LENGTH_MEDIUMBLOB MySQLDefCode = 16777215 | |||
LENGTH_TINYBLOB MySQLDefCode = 255 | |||
// Limitations | |||
MAX_ROWS MySQLDefCode = 50000000 // From the MySQL FAQ | |||
/** | |||
* Used to indicate that the server sent no field-level character set information, so the driver should use the connection-level character encoding instead. | |||
*/ | |||
NO_CHARSET_INFO MySQLDefCode = -1 | |||
OPEN_CURSOR_FLAG MySQLDefCode = 1 | |||
PING MySQLDefCode = 14 | |||
PROCESS_INFO MySQLDefCode = 10 // Not used; deprecated in MySQL 5.7.11 and MySQL 8.0.0. | |||
PROCESS_KILL MySQLDefCode = 12 // Not used; deprecated in MySQL 5.7.11 and MySQL 8.0.0. | |||
QUERY MySQLDefCode = 3 | |||
QUIT MySQLDefCode = 1 | |||
RELOAD MySQLDefCode = 7 // Not used; deprecated in MySQL 5.7.11 and MySQL 8.0.0. | |||
SHUTDOWN MySQLDefCode = 8 // Deprecated in MySQL 5.7.9 and MySQL 8.0.0. | |||
// | |||
// Constants defined from mysql | |||
// | |||
// DB Operations | |||
SLEEP MySQLDefCode = 0 | |||
STATISTICS MySQLDefCode = 9 | |||
TIME MySQLDefCode = 15 | |||
) | |||
func MySQLCodeToJava(mysqlType MySQLDefCode) JDBCType { | |||
var jdbcType JDBCType | |||
switch mysqlType { | |||
case FIELD_TYPE_NEW_DECIMAL, FIELD_TYPE_DECIMAL: | |||
jdbcType = JDBCTypeDecimal | |||
case FIELD_TYPE_TINY: | |||
jdbcType = JDBCTypeTinyInt | |||
case FIELD_TYPE_SHORT: | |||
jdbcType = JDBCTypeSmallInt | |||
case FIELD_TYPE_LONG: | |||
jdbcType = JDBCTypeInteger | |||
case FIELD_TYPE_FLOAT: | |||
jdbcType = JDBCTypeReal | |||
case FIELD_TYPE_DOUBLE: | |||
jdbcType = JDBCTypeDouble | |||
case FIELD_TYPE_NULL: | |||
jdbcType = JDBCTypeNull | |||
case FIELD_TYPE_TIMESTAMP: | |||
jdbcType = JDBCTypeTimestamp | |||
case FIELD_TYPE_LONGLONG: | |||
jdbcType = JDBCTypeBigInt | |||
case FIELD_TYPE_INT24: | |||
jdbcType = JDBCTypeInteger | |||
case FIELD_TYPE_DATE: | |||
jdbcType = JDBCTypeDate | |||
case FIELD_TYPE_TIME: | |||
jdbcType = JDBCTypeTime | |||
case FIELD_TYPE_DATETIME: | |||
jdbcType = JDBCTypeTimestamp | |||
case FIELD_TYPE_YEAR: | |||
jdbcType = JDBCTypeDate | |||
case FIELD_TYPE_NEWDATE: | |||
jdbcType = JDBCTypeDate | |||
case FIELD_TYPE_ENUM: | |||
jdbcType = JDBCTypeChar | |||
case FIELD_TYPE_SET: | |||
jdbcType = JDBCTypeChar | |||
case FIELD_TYPE_TINY_BLOB: | |||
jdbcType = JDBCTypeVarBinary | |||
case FIELD_TYPE_MEDIUM_BLOB: | |||
jdbcType = JDBCTypeLongVarBinary | |||
case FIELD_TYPE_LONG_BLOB: | |||
jdbcType = JDBCTypeLongVarBinary | |||
case FIELD_TYPE_BLOB: | |||
jdbcType = JDBCTypeLongVarBinary | |||
case FIELD_TYPE_VAR_STRING, FIELD_TYPE_VARCHAR: | |||
jdbcType = JDBCTypeVarchar | |||
case FIELD_TYPE_JSON, FIELD_TYPE_STRING: | |||
jdbcType = JDBCTypeChar | |||
case FIELD_TYPE_GEOMETRY: | |||
jdbcType = JDBCTypeBinary | |||
case FIELD_TYPE_BIT: | |||
jdbcType = JDBCTypeBit | |||
default: | |||
jdbcType = JDBCTypeVarchar | |||
} | |||
return jdbcType | |||
} | |||
func MySQLStrToJavaType(mysqlType string) JDBCType { | |||
switch strings.ToUpper(mysqlType) { | |||
case "BIT": | |||
return JDBCTypeBit | |||
case "TEXT": | |||
return JDBCTypeLongVarchar | |||
case "BLOB": | |||
return JDBCTypeBlob | |||
case "DATE": | |||
return JDBCTypeDate | |||
case "DATETIME": | |||
return JDBCTypeTimestamp | |||
return MySQLCodeToJava(FIELD_TYPE_BIT) | |||
case "TINYINT": | |||
return MySQLCodeToJava(FIELD_TYPE_TINY) | |||
case "SMALLINT": | |||
return MySQLCodeToJava(FIELD_TYPE_SHORT) | |||
case "MEDIUMINT": | |||
return MySQLCodeToJava(FIELD_TYPE_INT24) | |||
case "INT", "INTEGER": | |||
return MySQLCodeToJava(FIELD_TYPE_LONG) | |||
case "BIGINT": | |||
return MySQLCodeToJava(FIELD_TYPE_LONGLONG) | |||
case "INT24": | |||
return MySQLCodeToJava(FIELD_TYPE_INT24) | |||
case "REAL": | |||
return MySQLCodeToJava(FIELD_TYPE_DOUBLE) | |||
case "FLOAT": | |||
return MySQLCodeToJava(FIELD_TYPE_FLOAT) | |||
case "DECIMAL": | |||
return JDBCTypeDecimal | |||
return MySQLCodeToJava(FIELD_TYPE_DECIMAL) | |||
case "NUMERIC": | |||
return MySQLCodeToJava(FIELD_TYPE_DECIMAL) | |||
case "DOUBLE": | |||
return JDBCTypeDouble | |||
return MySQLCodeToJava(FIELD_TYPE_DOUBLE) | |||
case "CHAR": | |||
return MySQLCodeToJava(FIELD_TYPE_STRING) | |||
case "VARCHAR": | |||
return MySQLCodeToJava(FIELD_TYPE_VAR_STRING) | |||
case "DATE": | |||
return MySQLCodeToJava(FIELD_TYPE_DATE) | |||
case "TIME": | |||
return MySQLCodeToJava(FIELD_TYPE_TIME) | |||
case "YEAR": | |||
return MySQLCodeToJava(FIELD_TYPE_YEAR) | |||
case "TIMESTAMP": | |||
return MySQLCodeToJava(FIELD_TYPE_TIMESTAMP) | |||
case "DATETIME": | |||
return MySQLCodeToJava(FIELD_TYPE_DATETIME) | |||
case "TINYBLOB": | |||
return JDBCTypeBinary | |||
case "BLOB": | |||
return JDBCTypeLongVarBinary | |||
case "MEDIUMBLOB": | |||
return JDBCTypeLongVarBinary | |||
case "LONGBLOB": | |||
return JDBCTypeLongVarBinary | |||
case "TINYTEXT": | |||
return JDBCTypeVarchar | |||
case "TEXT": | |||
return JDBCTypeLongVarchar | |||
case "MEDIUMTEXT": | |||
return JDBCTypeLongVarchar | |||
case "LONGTEXT": | |||
return JDBCTypeLongVarchar | |||
case "ENUM": | |||
return JDBCTypeTinyInt | |||
// todo 待完善 | |||
// case fieldTypeEnum: | |||
// return "ENUM" | |||
// case fieldTypeFloat: | |||
// return "FLOAT" | |||
// case fieldTypeGeometry: | |||
// return "GEOMETRY" | |||
// case fieldTypeInt24: | |||
// return "MEDIUMINT" | |||
// case fieldTypeJSON: | |||
// return "JSON" | |||
// case fieldTypeLong: | |||
// return "INT" | |||
// case fieldTypeLongBLOB: | |||
// if mf.charSet != collations[binaryCollation] { | |||
// return "LONGTEXT" | |||
// } | |||
// return "LONGBLOB" | |||
// case fieldTypeLongLong: | |||
// return "BIGINT" | |||
// case fieldTypeMediumBLOB: | |||
// if mf.charSet != collations[binaryCollation] { | |||
// return "MEDIUMTEXT" | |||
// } | |||
// return "MEDIUMBLOB" | |||
// case fieldTypeNewDate: | |||
// return "DATE" | |||
// case fieldTypeNewDecimal: | |||
// return "DECIMAL" | |||
// case fieldTypeNULL: | |||
// return "NULL" | |||
// case fieldTypeSet: | |||
// return "SET" | |||
// case fieldTypeShort: | |||
// return "SMALLINT" | |||
// case fieldTypeString: | |||
// if mf.charSet == collations[binaryCollation] { | |||
// return "BINARY" | |||
// } | |||
// return "CHAR" | |||
// case fieldTypeTime: | |||
// return "TIME" | |||
// case fieldTypeTimestamp: | |||
// return "TIMESTAMP" | |||
// case fieldTypeTiny: | |||
// return "TINYINT" | |||
// case fieldTypeTinyBLOB: | |||
// if mf.charSet != collations[binaryCollation] { | |||
// return "TINYTEXT" | |||
// } | |||
// return "TINYBLOB" | |||
// case fieldTypeVarChar: | |||
// if mf.charSet == collations[binaryCollation] { | |||
// return "VARBINARY" | |||
// } | |||
// return "VARCHAR" | |||
// case fieldTypeVarString: | |||
// if mf.charSet == collations[binaryCollation] { | |||
// return "VARBINARY" | |||
// } | |||
// return "VARCHAR" | |||
// case fieldTypeYear: | |||
// return "YEAR" | |||
return MySQLCodeToJava(FIELD_TYPE_ENUM) | |||
case "SET": | |||
return MySQLCodeToJava(FIELD_TYPE_SET) | |||
case "GEOMETRY": | |||
return MySQLCodeToJava(FIELD_TYPE_GEOMETRY) | |||
case "BINARY": | |||
return JDBCTypeBinary | |||
// no concrete type on the wire | |||
case "VARBINARY": | |||
return JDBCTypeVarBinary | |||
case "JSON": | |||
return MySQLCodeToJava(FIELD_TYPE_JSON) | |||
default: | |||
return -1 | |||
// Punt | |||
return JDBCTypeOther | |||
} | |||
} |
@@ -17,6 +17,13 @@ | |||
package types | |||
import ( | |||
"encoding/base64" | |||
"encoding/json" | |||
"fmt" | |||
"time" | |||
) | |||
// RoundRecordImage Front and rear mirror data | |||
type RoundRecordImage struct { | |||
bIndex int32 | |||
@@ -138,14 +145,109 @@ func (r *RowImage) NonPrimaryKeys(cols []ColumnImage) []ColumnImage { | |||
return nonPkFields | |||
} | |||
var _ json.Unmarshaler = (*ColumnImage)(nil) | |||
var _ json.Marshaler = (*ColumnImage)(nil) | |||
type CommonValue struct { | |||
Value interface{} | |||
} | |||
// ColumnImage The mirror data information of the column | |||
type ColumnImage struct { | |||
// KeyType index type | |||
KeyType IndexType `json:"keyType"` | |||
// ColumnName column name | |||
ColumnName string `json:"name"` | |||
// Type column type | |||
Type int16 `json:"type"` | |||
// ColumnType column type | |||
ColumnType JDBCType `json:"type"` | |||
// Value column value | |||
Value interface{} `json:"value"` | |||
} | |||
type columnImageAlias ColumnImage | |||
func (c *ColumnImage) MarshalJSON() ([]byte, error) { | |||
if c == nil || c.Value == nil { | |||
return json.Marshal(*c) | |||
} | |||
value := c.Value | |||
if t, ok := c.Value.(time.Time); ok { | |||
value = t.Format(time.RFC3339Nano) | |||
} | |||
return json.Marshal(&columnImageAlias{ | |||
KeyType: c.KeyType, | |||
ColumnName: c.ColumnName, | |||
ColumnType: c.ColumnType, | |||
Value: value, | |||
}) | |||
} | |||
func (c *ColumnImage) UnmarshalJSON(data []byte) error { | |||
var err error | |||
tmpImage := make(map[string]interface{}) | |||
if err := json.Unmarshal(data, &tmpImage); err != nil { | |||
return err | |||
} | |||
var ( | |||
keyType string | |||
columnType int16 | |||
columnName string | |||
value interface{} | |||
actualValue interface{} | |||
) | |||
keyType = tmpImage["keyType"].(string) | |||
columnType = int16(int64(tmpImage["type"].(float64))) | |||
columnName = tmpImage["name"].(string) | |||
value = tmpImage["value"] | |||
if value != nil { | |||
switch JDBCType(columnType) { | |||
case JDBCTypeReal: // 4 Bytes | |||
actualValue = value.(float32) | |||
case JDBCTypeDecimal, JDBCTypeDouble: // 8 Bytes | |||
actualValue = value.(float64) | |||
case JDBCTypeTinyInt: // 1 Bytes | |||
actualValue = int8(value.(float64)) | |||
case JDBCTypeSmallInt: // 2 Bytes | |||
actualValue = int16(value.(float64)) | |||
case JDBCTypeInteger: // 4 Bytes | |||
actualValue = int32(value.(float64)) | |||
case JDBCTypeBigInt: // 8Bytes | |||
actualValue = int64(value.(float64)) | |||
case JDBCTypeTimestamp: // 4 Bytes | |||
actualValue, err = time.Parse(time.RFC3339Nano, value.(string)) | |||
if err != nil { | |||
return err | |||
} | |||
case JDBCTypeDate: // 3Bytes | |||
actualValue, err = time.Parse(time.RFC3339Nano, value.(string)) | |||
if err != nil { | |||
return err | |||
} | |||
case JDBCTypeTime: // 3Bytes | |||
actualValue, err = time.Parse(time.RFC3339Nano, value.(string)) | |||
if err != nil { | |||
return err | |||
} | |||
case JDBCTypeChar, JDBCTypeVarchar: | |||
var val []byte | |||
if val, err = base64.StdEncoding.DecodeString(value.(string)); err != nil { | |||
return err | |||
} | |||
actualValue = string(val) | |||
case JDBCTypeBinary, JDBCTypeVarBinary, JDBCTypeLongVarBinary, JDBCTypeBit: | |||
actualValue = value | |||
} | |||
} | |||
*c = ColumnImage{ | |||
KeyType: ParseIndexType(keyType), | |||
ColumnName: columnName, | |||
ColumnType: JDBCType(columnType), | |||
Value: actualValue, | |||
} | |||
return nil | |||
} | |||
func getTypeStr(src interface{}) string { | |||
return fmt.Sprintf("%T", src) | |||
} |
@@ -0,0 +1,90 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package types | |||
import ( | |||
"encoding/json" | |||
"testing" | |||
"time" | |||
"github.com/stretchr/testify/assert" | |||
) | |||
func TestColumnImage_UnmarshalJSON(t *testing.T) { | |||
now := time.Now() | |||
tests := []struct { | |||
name string | |||
image *ColumnImage | |||
expectValue interface{} | |||
}{ | |||
{ | |||
name: "test-string", | |||
image: &ColumnImage{ | |||
KeyType: IndexTypePrimaryKey, | |||
ColumnName: "Name", | |||
ColumnType: JDBCTypeVarchar, | |||
Value: []byte("Seata-go"), | |||
}, | |||
expectValue: "Seata-go", | |||
}, | |||
{ | |||
name: "test-int", | |||
image: &ColumnImage{ | |||
KeyType: IndexTypeNull, | |||
ColumnName: "Age", | |||
ColumnType: JDBCTypeTinyInt, | |||
Value: int8(20), | |||
}, | |||
expectValue: int8(20), | |||
}, | |||
{ | |||
name: "test-double", | |||
image: &ColumnImage{ | |||
KeyType: IndexTypeNull, | |||
ColumnName: "Salary", | |||
ColumnType: JDBCTypeDouble, | |||
Value: 8899.778, | |||
}, | |||
expectValue: float64(8899.778), | |||
}, | |||
{ | |||
name: "test-time", | |||
image: &ColumnImage{ | |||
KeyType: IndexTypeNull, | |||
ColumnName: "CreateTime", | |||
ColumnType: JDBCTypeTime, | |||
Value: now, | |||
}, | |||
expectValue: now, | |||
}, | |||
} | |||
for _, tt := range tests { | |||
t.Run(tt.name, func(t *testing.T) { | |||
data, err := json.Marshal(tt.image) | |||
assert.Nil(t, err) | |||
var after ColumnImage | |||
err = json.Unmarshal(data, &after) | |||
assert.Nil(t, err) | |||
if ti, ok := tt.expectValue.(time.Time); ok { | |||
assert.Equal(t, ti.Format(time.RFC3339Nano), after.Value.(time.Time).Format(time.RFC3339Nano)) | |||
} else { | |||
assert.Equal(t, tt.expectValue, after.Value) | |||
} | |||
}) | |||
} | |||
} |
@@ -18,26 +18,24 @@ | |||
package types | |||
import ( | |||
"database/sql" | |||
"reflect" | |||
) | |||
// ColumnMeta | |||
type ColumnMeta struct { | |||
// Schema | |||
Schema string | |||
// Table | |||
Table string | |||
// ColumnTypeInfo | |||
ColumnTypeInfo *sql.ColumnType | |||
// Autoincrement | |||
Schema string | |||
Table string | |||
Autoincrement bool | |||
ColumnName string | |||
ColumnType string | |||
DataType int32 | |||
ColumnKey string | |||
IsNullable int8 | |||
Extra string | |||
// todo get columnType | |||
//ColumnTypeInfo *sql.ColumnType | |||
ColumnName string | |||
ColumnType string | |||
DatabaseType int32 | |||
DatabaseTypeString string | |||
ColumnKey string | |||
IsNullable int8 | |||
Extra string | |||
} | |||
type ColumnType struct { | |||
@@ -43,6 +43,13 @@ const ( | |||
IndexTypePrimaryKey IndexType = 1 | |||
) | |||
func ParseIndexType(str string) IndexType { | |||
if str == "PRIMARY_KEY" { | |||
return IndexTypePrimaryKey | |||
} | |||
return IndexTypeNull | |||
} | |||
func (i IndexType) MarshalText() (text []byte, err error) { | |||
switch i { | |||
case IndexTypePrimaryKey: | |||
@@ -43,57 +43,28 @@ func (*BasicUndoLogBuilder) GetScanSlice(columnNames []string, tableMeta *types. | |||
// 从metData获取该列的元信息 | |||
columnMeta = tableMeta.Columns[columnNmae] | |||
) | |||
switch columnMeta.ColumnTypeInfo.ScanType() { | |||
case types.ScanTypeFloat32: | |||
scanVal = float32(0) | |||
break | |||
case types.ScanTypeFloat64: | |||
scanVal = float64(0) | |||
break | |||
case types.ScanTypeInt8: | |||
scanVal = int8(0) | |||
break | |||
case types.ScanTypeInt16: | |||
scanVal = int16(0) | |||
break | |||
case types.ScanTypeInt32: | |||
scanVal = int32(0) | |||
break | |||
case types.ScanTypeInt64: | |||
scanVal = int64(0) | |||
break | |||
case types.ScanTypeNullFloat: | |||
scanVal = sql.NullFloat64{} | |||
break | |||
case types.ScanTypeNullInt: | |||
scanVal = sql.NullInt64{} | |||
break | |||
case types.ScanTypeNullTime: | |||
switch strings.ToUpper(columnMeta.DatabaseTypeString) { | |||
case "VARCHAR", "NVARCHAR", "VARCHAR2", "CHAR", "TEXT", "JSON", "TINYTEXT": | |||
scanVal = sql.RawBytes{} | |||
case "BIT", "INT", "LONGBLOB", "SMALLINT", "TINYINT", "BIGINT", "MEDIUMINT": | |||
if columnMeta.IsNullable == 0 { | |||
scanVal = int64(0) | |||
} else { | |||
scanVal = sql.NullInt64{} | |||
} | |||
case "DATE", "DATETIME", "TIME", "TIMESTAMP", "YEAR": | |||
scanVal = sql.NullTime{} | |||
break | |||
case types.ScanTypeUint8: | |||
scanVal = uint8(0) | |||
break | |||
case types.ScanTypeUint16: | |||
scanVal = uint16(0) | |||
break | |||
case types.ScanTypeUint32: | |||
scanVal = uint32(0) | |||
break | |||
case types.ScanTypeUint64: | |||
scanVal = uint64(0) | |||
break | |||
case types.ScanTypeRawBytes: | |||
case "DECIMAL", "DOUBLE", "FLOAT": | |||
if columnMeta.IsNullable == 0 { | |||
scanVal = float64(0) | |||
} else { | |||
scanVal = sql.NullFloat64{} | |||
} | |||
default: | |||
scanVal = sql.RawBytes{} | |||
break | |||
case types.ScanTypeUnknown: | |||
scanVal = new(interface{}) | |||
break | |||
} | |||
scanSlice = append(scanSlice, &scanVal) | |||
} | |||
return scanSlice | |||
} | |||
@@ -178,12 +149,12 @@ func (b *BasicUndoLogBuilder) buildRecordImages(rowsi driver.Rows, tableMetaData | |||
if _, ok := tableMetaData.GetPrimaryKeyMap()[name]; ok { | |||
keyType = types.IndexTypePrimaryKey | |||
} | |||
jdbcType := types.GetJDBCTypeByTypeName(columnMeta.ColumnTypeInfo.DatabaseTypeName()) | |||
jdbcType := types.MySQLStrToJavaType(columnMeta.DatabaseTypeString) | |||
columns = append(columns, types.ColumnImage{ | |||
KeyType: keyType, | |||
ColumnName: name, | |||
Type: int16(jdbcType), | |||
ColumnType: jdbcType, | |||
Value: ss[i], | |||
}) | |||
} | |||
@@ -109,6 +109,10 @@ func (u *MySQLUpdateUndoLogBuilder) AfterImage(ctx context.Context, execCtx *typ | |||
return []*types.RecordImage{{}}, nil | |||
} | |||
if beforeImages == nil || len(beforeImages) == 0 || len(beforeImages[0].Rows) == 0 { | |||
return beforeImages, nil | |||
} | |||
var beforeImage *types.RecordImage | |||
if len(beforeImages) > 0 { | |||
beforeImage = beforeImages[0] | |||
@@ -19,6 +19,7 @@ package executor | |||
import ( | |||
"fmt" | |||
"reflect" | |||
"strings" | |||
"github.com/seata/seata-go/pkg/datasource/sql/types" | |||
@@ -57,7 +58,7 @@ func compareRows(tableMeta types.TableMeta, oldRows []types.RowImage, newRows [] | |||
if newValue == nil { | |||
return false, fmt.Errorf("compare row failed, rowKey %s, fieldName %s, reason [newField is null]", key, fieldName) | |||
} | |||
if newValue != oldValue { | |||
if !reflect.DeepEqual(newValue, oldValue) { | |||
return false, nil | |||
} | |||
} | |||
@@ -32,7 +32,7 @@ var ( | |||
func initService() { | |||
var err error | |||
db, err = sql.Open(sql2.SeataATMySQLDriver, "root:123456@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true") | |||
db, err = sql.Open(sql2.SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client?multiStatements=true&interpolateParams=true") | |||
if err != nil { | |||
panic("init service error") | |||
} | |||
@@ -23,6 +23,8 @@ import ( | |||
"fmt" | |||
"time" | |||
"github.com/pkg/errors" | |||
sql2 "github.com/seata/seata-go/pkg/datasource/sql" | |||
) | |||
@@ -32,7 +34,7 @@ var ( | |||
func initService() { | |||
var err error | |||
db, err = sql.Open(sql2.SeataATMySQLDriver, "root:123456@tcp(127.0.0.1:3306)/seata_client1?multiStatements=true&interpolateParams=true") | |||
db, err = sql.Open(sql2.SeataATMySQLDriver, "root:12345678@tcp(127.0.0.1:3306)/seata_client1?multiStatements=true&interpolateParams=true") | |||
if err != nil { | |||
panic("init service error") | |||
} | |||
@@ -52,5 +54,8 @@ func updateDataFail(ctx context.Context) error { | |||
return err | |||
} | |||
fmt.Printf("update success: %d.\n", rows) | |||
if rows == 0 { | |||
return errors.New("rows affected 0") | |||
} | |||
return nil | |||
} |
@@ -33,6 +33,41 @@ INSERT INTO `seata_client`.`order_tbl` (`id`, `user_id`, `commodity_code`, `coun | |||
DROP TABLE IF EXISTS `undo_log`; | |||
CREATE TABLE `undo_log` ( | |||
`id` bigint NOT NULL AUTO_INCREMENT, | |||
`branch_id` bigint NOT NULL, | |||
`xid` varchar(100) NOT NULL, | |||
`context` varchar(128) NOT NULL, | |||
`rollback_info` longblob NOT NULL, | |||
`log_status` int NOT NULL, | |||
`log_created` datetime NOT NULL, | |||
`log_modified` datetime NOT NULL, | |||
`ext` varchar(100) DEFAULT NULL, | |||
PRIMARY KEY (`id`), | |||
KEY `idx_unionkey` (`xid`,`branch_id`) | |||
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | |||
CREATE database if NOT EXISTS `seata_client1` default character set utf8mb4 collate utf8mb4_unicode_ci; | |||
USE `seata_client1`; | |||
SET NAMES utf8mb4; | |||
SET FOREIGN_KEY_CHECKS = 0; | |||
CREATE TABLE IF NOT EXISTS `order_tbl` ( | |||
`id` int(11) NOT NULL AUTO_INCREMENT, | |||
`user_id` varchar(255) DEFAULT NULL, | |||
`commodity_code` varchar(255) DEFAULT NULL, | |||
`count` int(11) DEFAULT '0', | |||
`money` int(11) DEFAULT '0', | |||
`descs` varchar(255) DEFAULT '', | |||
PRIMARY KEY (`id`) | |||
) ENGINE=InnoDB DEFAULT CHARSET=utf8; | |||
INSERT INTO `seata_client1`.`order_tbl` (`id`, `user_id`, `commodity_code`, `count`, `money`, `descs`) VALUES (1, 'NO-100001', 'C100000', 100, 10, 'init desc'); | |||
DROP TABLE IF EXISTS `undo_log`; | |||
CREATE TABLE `undo_log` ( | |||
`id` bigint NOT NULL AUTO_INCREMENT, | |||
`branch_id` bigint NOT NULL, | |||