* perf: optimize the performance of XA transactionstags/v1.2.0
@@ -20,14 +20,11 @@ package sql | |||
import ( | |||
"context" | |||
"database/sql/driver" | |||
"errors" | |||
"github.com/go-sql-driver/mysql" | |||
"io" | |||
"reflect" | |||
"sync" | |||
"github.com/go-sql-driver/mysql" | |||
"github.com/seata/seata-go/pkg/datasource/sql/types" | |||
"github.com/seata/seata-go/pkg/util/log" | |||
) | |||
type seataATConnector struct { | |||
@@ -117,16 +114,6 @@ func (c *seataConnector) Connect(ctx context.Context) (driver.Conn, error) { | |||
if err != nil { | |||
return nil, err | |||
} | |||
// get the version of mysql for xa. | |||
if c.transType == types.XAMode { | |||
version, err := c.dbVersion(ctx, conn) | |||
if err != nil { | |||
return nil, err | |||
} | |||
c.res.SetDbVersion(version) | |||
} | |||
return &Conn{ | |||
targetConn: conn, | |||
res: c.res, | |||
@@ -137,44 +124,6 @@ func (c *seataConnector) Connect(ctx context.Context) (driver.Conn, error) { | |||
}, nil | |||
} | |||
func (c *seataConnector) dbVersion(ctx context.Context, conn driver.Conn) (string, error) { | |||
queryConn, isQueryContext := conn.(driver.QueryerContext) | |||
if !isQueryContext { | |||
return "", errors.New("get db version error for unexpected driver conn") | |||
} | |||
res, err := queryConn.QueryContext(ctx, "SELECT VERSION()", nil) | |||
if err != nil { | |||
log.Errorf("seata connector get the xa mysql version err:%v", err) | |||
return "", err | |||
} | |||
dest := make([]driver.Value, 1) | |||
var version string | |||
if err = res.Next(dest); err != nil { | |||
if err == io.EOF { | |||
return version, nil | |||
} | |||
return "", err | |||
} | |||
if len(dest) != 1 { | |||
return "", errors.New("get the mysql version is not column 1") | |||
} | |||
switch reflect.TypeOf(dest[0]).Kind() { | |||
case reflect.Slice, reflect.Array: | |||
val := reflect.ValueOf(dest[0]).Bytes() | |||
version = string(val) | |||
case reflect.String: | |||
version = reflect.ValueOf(dest[0]).String() | |||
default: | |||
return "", errors.New("get the mysql version is not a string") | |||
} | |||
return version, nil | |||
} | |||
// Driver returns the underlying Driver of the Connector, | |||
// mainly to maintain compatibility with the Driver method | |||
// on sql.DB. | |||
@@ -29,6 +29,7 @@ import ( | |||
"github.com/seata/seata-go/pkg/datasource/sql/undo" | |||
"github.com/seata/seata-go/pkg/datasource/sql/util" | |||
"github.com/seata/seata-go/pkg/protocol/branch" | |||
"github.com/seata/seata-go/pkg/util/log" | |||
) | |||
type dbOption func(db *DBResource) | |||
@@ -123,6 +124,16 @@ func (db *DBResource) GetResourceGroupId() string { | |||
} | |||
func (db *DBResource) init() { | |||
ctx := context.Background() | |||
conn, err := db.connector.Connect(ctx) | |||
if err != nil { | |||
log.Errorf("connect: %w", err) | |||
} | |||
version, err := selectDBVersion(ctx, conn) | |||
if err != nil { | |||
log.Errorf("select db version: %w", err) | |||
} | |||
db.SetDbVersion(version) | |||
db.checkDbVersion() | |||
} | |||
@@ -23,13 +23,15 @@ import ( | |||
"database/sql/driver" | |||
"errors" | |||
"fmt" | |||
"io" | |||
"reflect" | |||
"strings" | |||
"github.com/go-sql-driver/mysql" | |||
"github.com/seata/seata-go/pkg/datasource/sql/datasource" | |||
mysql2 "github.com/seata/seata-go/pkg/datasource/sql/datasource/mysql" | |||
"github.com/seata/seata-go/pkg/datasource/sql/types" | |||
"github.com/seata/seata-go/pkg/datasource/sql/util" | |||
"github.com/seata/seata-go/pkg/protocol/branch" | |||
"github.com/seata/seata-go/pkg/util/log" | |||
) | |||
@@ -147,20 +149,16 @@ func (d *seataDriver) getOpenConnectorProxy(connector driver.Connector, dbType t | |||
withDBName(cfg.DBName), | |||
withConnector(connector), | |||
} | |||
res, err := newResource(options...) | |||
if err != nil { | |||
log.Errorf("create new resource: %w", err) | |||
return nil, err | |||
} | |||
datasource.RegisterTableCache(types.DBTypeMySQL, mysql2.NewTableMetaInstance(db)) | |||
if err = datasource.GetDataSourceManager(d.branchType).RegisterResource(res); err != nil { | |||
log.Errorf("regisiter resource: %w", err) | |||
return nil, err | |||
} | |||
return &seataConnector{ | |||
res: res, | |||
target: connector, | |||
@@ -193,3 +191,53 @@ func parseResourceID(dsn string) string { | |||
} | |||
return strings.ReplaceAll(res, ",", "|") | |||
} | |||
func selectDBVersion(ctx context.Context, conn driver.Conn) (string, error) { | |||
var rowsi driver.Rows | |||
var err error | |||
queryerCtx, ok := conn.(driver.QueryerContext) | |||
var queryer driver.Queryer | |||
if !ok { | |||
queryer, ok = conn.(driver.Queryer) | |||
} | |||
if ok { | |||
rowsi, err = util.CtxDriverQuery(ctx, queryerCtx, queryer, "SELECT VERSION()", nil) | |||
defer func() { | |||
if rowsi != nil { | |||
rowsi.Close() | |||
} | |||
}() | |||
if err != nil { | |||
log.Errorf("ctx driver query: %+v", err) | |||
return "", err | |||
} | |||
} else { | |||
log.Errorf("target conn should been driver.QueryerContext or driver.Queryer") | |||
return "", fmt.Errorf("invalid conn") | |||
} | |||
dest := make([]driver.Value, 1) | |||
var version string | |||
if err = rowsi.Next(dest); err != nil { | |||
if err == io.EOF { | |||
return version, nil | |||
} | |||
return "", err | |||
} | |||
if len(dest) != 1 { | |||
return "", errors.New("get db version is not column 1") | |||
} | |||
switch reflect.TypeOf(dest[0]).Kind() { | |||
case reflect.Slice, reflect.Array: | |||
val := reflect.ValueOf(dest[0]).Bytes() | |||
version = string(val) | |||
case reflect.String: | |||
version = reflect.ValueOf(dest[0]).String() | |||
default: | |||
return "", errors.New("get db version is not a string") | |||
} | |||
return version, nil | |||
} |
@@ -1,3 +1,20 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package fence | |||
import ( | |||
@@ -1,3 +1,20 @@ | |||
/* | |||
* Licensed to the Apache Software Foundation (ASF) under one or more | |||
* contributor license agreements. See the NOTICE file distributed with | |||
* this work for additional information regarding copyright ownership. | |||
* The ASF licenses this file to You under the Apache License, Version 2.0 | |||
* (the "License"); you may not use this file except in compliance with | |||
* the License. You may obtain a copy of the License at | |||
* | |||
* http://www.apache.org/licenses/LICENSE-2.0 | |||
* | |||
* Unless required by applicable law or agreed to in writing, software | |||
* distributed under the License is distributed on an "AS IS" BASIS, | |||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
* See the License for the specific language governing permissions and | |||
* limitations under the License. | |||
*/ | |||
package fence | |||
import ( | |||