You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

undo.go 16 kB

2 years ago
2 years ago

  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package base
  18. import (
  19. "context"
  20. "database/sql"
  21. "database/sql/driver"
  22. "encoding/json"
  23. "fmt"
  24. "strconv"
  25. "strings"
  26. "github.com/arana-db/parser/mysql"
  27. "seata.apache.org/seata-go/pkg/compressor"
  28. "seata.apache.org/seata-go/pkg/datasource/sql/datasource"
  29. "seata.apache.org/seata-go/pkg/datasource/sql/types"
  30. "seata.apache.org/seata-go/pkg/datasource/sql/undo"
  31. "seata.apache.org/seata-go/pkg/datasource/sql/undo/factor"
  32. "seata.apache.org/seata-go/pkg/datasource/sql/undo/parser"
  33. "seata.apache.org/seata-go/pkg/util/collection"
  34. "seata.apache.org/seata-go/pkg/util/log"
  35. )
  36. const (
  37. compressorTypeKey = "compressorTypeKey"
  38. serializerKey = "serializerKey"
  39. defaultUndoLogTableName = " undo_log "
  40. )
  41. func getUndoLogTableName() string {
  42. if undo.UndoConfig.LogTable != "" {
  43. return undo.UndoConfig.LogTable
  44. }
  45. return defaultUndoLogTableName
  46. }
  47. func getCheckUndoLogTableExistSql() string {
  48. return "SELECT 1 FROM " + getUndoLogTableName() + " LIMIT 1"
  49. }
  50. func getInsertUndoLogSql() string {
  51. return "INSERT INTO " + getUndoLogTableName() + "(branch_id,xid,context,rollback_info,log_status,log_created,log_modified) VALUES (?, ?, ?, ?, ?, now(6), now(6))"
  52. }
  53. func getSelectUndoLogSql() string {
  54. return "SELECT `branch_id`,`xid`,`context`,`rollback_info`,`log_status` FROM " + getUndoLogTableName() + " WHERE branch_id = ? AND xid = ? FOR UPDATE"
  55. }
  56. func getDeleteUndoLogSql() string {
  57. return "DELETE FROM " + getUndoLogTableName() + " WHERE branch_id = ? AND xid = ?"
  58. }
  59. // undo log status
  60. const (
  61. // UndoLogStatusNormal This state can be properly rolled back by services
  62. UndoLogStatusNormal = iota
  63. // UndoLogStatusGlobalFinished This state prevents the branch transaction from inserting undo_log after the global transaction is rolled back.
  64. UndoLogStatusGlobalFinished
  65. )
  66. // BaseUndoLogManager
  67. type BaseUndoLogManager struct{}
  68. func NewBaseUndoLogManager() *BaseUndoLogManager {
  69. return &BaseUndoLogManager{}
  70. }
  71. // Init
  72. func (m *BaseUndoLogManager) Init() {
  73. }
  74. // InsertUndoLog
  75. func (m *BaseUndoLogManager) InsertUndoLog(record undo.UndologRecord, conn driver.Conn) error {
  76. log.Infof("begin to insert undo log, xid %v, branch id %v", record.XID, record.BranchID)
  77. stmt, err := conn.Prepare(getInsertUndoLogSql())
  78. if err != nil {
  79. return err
  80. }
  81. _, err = stmt.Exec([]driver.Value{record.BranchID, record.XID, record.Context, record.RollbackInfo, int64(record.LogStatus)})
  82. if err != nil {
  83. return err
  84. }
  85. return nil
  86. }
  87. func (m *BaseUndoLogManager) InsertUndoLogWithSqlConn(ctx context.Context, record undo.UndologRecord, conn *sql.Conn) error {
  88. stmt, err := conn.PrepareContext(ctx, getInsertUndoLogSql())
  89. if err != nil {
  90. return err
  91. }
  92. defer stmt.Close()
  93. _, err = stmt.Exec(record.BranchID, record.XID, record.Context, record.RollbackInfo, int64(record.LogStatus))
  94. if err != nil {
  95. return err
  96. }
  97. return nil
  98. }
  99. // DeleteUndoLog exec delete single undo log operate
  100. func (m *BaseUndoLogManager) DeleteUndoLog(ctx context.Context, xid string, branchID int64, conn *sql.Conn) error {
  101. stmt, err := conn.PrepareContext(ctx, getDeleteUndoLogSql())
  102. if err != nil {
  103. log.Errorf("[DeleteUndoLog] prepare sql fail, err: %v", err)
  104. return err
  105. }
  106. defer stmt.Close()
  107. if _, err = stmt.Exec(branchID, xid); err != nil {
  108. log.Errorf("[DeleteUndoLog] exec delete undo log fail, err: %v", err)
  109. return err
  110. }
  111. return nil
  112. }
  113. // BatchDeleteUndoLog exec delete undo log operate
  114. func (m *BaseUndoLogManager) BatchDeleteUndoLog(xid []string, branchID []int64, conn *sql.Conn) error {
  115. // build delete undo log sql
  116. batchDeleteSql, err := m.getBatchDeleteUndoLogSql(xid, branchID)
  117. if err != nil {
  118. log.Errorf("get undo sql log fail, err: %v", err)
  119. return err
  120. }
  121. ctx := context.Background()
  122. // prepare deal sql
  123. stmt, err := conn.PrepareContext(ctx, batchDeleteSql)
  124. if err != nil {
  125. log.Errorf("prepare sql fail, err: %v", err)
  126. return err
  127. }
  128. defer stmt.Close()
  129. branchIDStr, err := Int64Slice2Str(branchID, ",")
  130. if err != nil {
  131. log.Errorf("slice to string transfer fail, err: %v", err)
  132. return err
  133. }
  134. // exec sql stmt
  135. if _, err = stmt.ExecContext(ctx, branchIDStr, strings.Join(xid, ",")); err != nil {
  136. log.Errorf("exec delete undo log fail, err: %v", err)
  137. return err
  138. }
  139. return nil
  140. }
  141. // FlushUndoLog flush undo log
  142. func (m *BaseUndoLogManager) FlushUndoLog(tranCtx *types.TransactionContext, conn driver.Conn) error {
  143. if tranCtx.RoundImages.IsEmpty() {
  144. return nil
  145. }
  146. sqlUndoLogs := make([]undo.SQLUndoLog, 0)
  147. beforeImages := tranCtx.RoundImages.BeofreImages()
  148. afterImages := tranCtx.RoundImages.AfterImages()
  149. if beforeImages.IsEmptyImage() && afterImages.IsEmptyImage() {
  150. return nil
  151. }
  152. size := len(beforeImages)
  153. if size < len(afterImages) {
  154. size = len(afterImages)
  155. }
  156. for i := 0; i < size; i++ {
  157. var (
  158. tableName string
  159. sqlType types.SQLType
  160. beforeImage *types.RecordImage
  161. afterImage *types.RecordImage
  162. )
  163. if i < len(beforeImages) && beforeImages[i] != nil {
  164. tableName = beforeImages[i].TableName
  165. sqlType = beforeImages[i].SQLType
  166. } else if i < len(afterImages) && afterImages[i] != nil {
  167. tableName = afterImages[i].TableName
  168. sqlType = afterImages[i].SQLType
  169. } else {
  170. continue
  171. }
  172. if i < len(beforeImages) {
  173. beforeImage = beforeImages[i]
  174. }
  175. if i < len(afterImages) {
  176. afterImage = afterImages[i]
  177. }
  178. undoLog := undo.SQLUndoLog{
  179. SQLType: sqlType,
  180. TableName: tableName,
  181. BeforeImage: beforeImage,
  182. AfterImage: afterImage,
  183. }
  184. sqlUndoLogs = append(sqlUndoLogs, undoLog)
  185. }
  186. branchUndoLog := undo.BranchUndoLog{
  187. Xid: tranCtx.XID,
  188. BranchID: tranCtx.BranchID,
  189. Logs: sqlUndoLogs,
  190. }
  191. parseContext := make(map[string]string, 0)
  192. parseContext[serializerKey] = undo.UndoConfig.LogSerialization
  193. parseContext[compressorTypeKey] = undo.UndoConfig.CompressConfig.Type
  194. undoLogContent := m.encodeUndoLogCtx(parseContext)
  195. rollbackInfo, err := m.serializeBranchUndoLog(&branchUndoLog, parseContext[serializerKey])
  196. if err != nil {
  197. return err
  198. }
  199. return m.InsertUndoLog(undo.UndologRecord{
  200. BranchID: tranCtx.BranchID,
  201. XID: tranCtx.XID,
  202. Context: undoLogContent,
  203. RollbackInfo: rollbackInfo,
  204. LogStatus: undo.UndoLogStatueNormnal,
  205. }, conn)
  206. }
  207. // RunUndo undo sql
  208. func (m *BaseUndoLogManager) RunUndo(ctx context.Context, xid string, branchID int64, conn *sql.DB, dbName string) error {
  209. return nil
  210. }
  211. // Undo undo sql
  212. func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid string, branchID int64, db *sql.DB, dbName string) (err error) {
  213. conn, err := db.Conn(ctx)
  214. if err != nil {
  215. return err
  216. }
  217. tx, err := conn.BeginTx(ctx, &sql.TxOptions{})
  218. if err != nil {
  219. return err
  220. }
  221. defer func() {
  222. if err != nil {
  223. if err = tx.Rollback(); err != nil {
  224. log.Errorf("rollback fail, xid: %s, branchID:%s err:%v", xid, branchID, err)
  225. return
  226. }
  227. }
  228. }()
  229. stmt, err := conn.PrepareContext(ctx, getSelectUndoLogSql())
  230. if err != nil {
  231. log.Errorf("prepare sql fail, err: %v", err)
  232. return err
  233. }
  234. defer func() {
  235. if err = stmt.Close(); err != nil {
  236. log.Errorf("stmt close fail, xid: %s, branchID:%s err:%v", xid, branchID, err)
  237. return
  238. }
  239. }()
  240. rows, err := stmt.Query(branchID, xid)
  241. if err != nil {
  242. log.Errorf("query sql fail, err: %v", err)
  243. return err
  244. }
  245. defer func() {
  246. if err = rows.Close(); err != nil {
  247. log.Errorf("rows close fail, xid: %s, branchID:%s err:%v", xid, branchID, err)
  248. return
  249. }
  250. }()
  251. var undoLogRecords []undo.UndologRecord
  252. for rows.Next() {
  253. var record undo.UndologRecord
  254. err = rows.Scan(&record.BranchID, &record.XID, &record.Context, &record.RollbackInfo, &record.LogStatus)
  255. if err != nil {
  256. return err
  257. }
  258. undoLogRecords = append(undoLogRecords, record)
  259. }
  260. if err := rows.Err(); err != nil {
  261. log.Errorf("read rows next fail, xid: %s, branchID:%s err:%v", xid, branchID, err)
  262. return err
  263. }
  264. var exists bool
  265. for _, record := range undoLogRecords {
  266. exists = true
  267. if !record.CanUndo() {
  268. log.Infof("xid %v branch %v, ignore %v undo_log", record.XID, record.BranchID, record.LogStatus)
  269. return nil
  270. }
  271. var logCtx map[string]string
  272. if record.Context != nil && string(record.Context) != "" {
  273. logCtx = m.decodeUndoLogCtx(record.Context)
  274. }
  275. if logCtx == nil {
  276. return fmt.Errorf("undo log context not exist in record %+v", record)
  277. }
  278. rollbackInfo, err := m.getRollbackInfo(record.RollbackInfo, logCtx)
  279. if err != nil {
  280. return err
  281. }
  282. var branchUndoLog *undo.BranchUndoLog
  283. if branchUndoLog, err = m.deserializeBranchUndoLog(rollbackInfo, logCtx); err != nil {
  284. return err
  285. }
  286. sqlUndoLogs := branchUndoLog.Logs
  287. if len(sqlUndoLogs) == 0 {
  288. return nil
  289. }
  290. branchUndoLog.Reverse()
  291. for _, undoLog := range sqlUndoLogs {
  292. tableMeta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, dbName, undoLog.TableName)
  293. if err != nil {
  294. log.Errorf("get table meta fail, err: %v", err)
  295. return err
  296. }
  297. undoLog.SetTableMeta(tableMeta)
  298. undoExecutor, err := factor.GetUndoExecutor(dbType, undoLog)
  299. if err != nil {
  300. log.Errorf("get undo executor, err: %v", err)
  301. return err
  302. }
  303. if err = undoExecutor.ExecuteOn(ctx, dbType, conn); err != nil {
  304. log.Errorf("execute on fail, err: %v", err)
  305. return err
  306. }
  307. }
  308. }
  309. if exists {
  310. if err = m.DeleteUndoLog(ctx, xid, branchID, conn); err != nil {
  311. log.Errorf("[Undo] delete undo fail, err: %v", err)
  312. return err
  313. }
  314. log.Infof("xid %v branch %v, undo_log deleted with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
  315. } else {
  316. if err = m.insertUndoLogWithGlobalFinished(ctx, xid, uint64(branchID), conn); err != nil {
  317. log.Errorf("[Undo] insert undo with global finished fail, err: %v", err)
  318. return err
  319. }
  320. log.Infof("xid %v branch %v, undo_log added with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
  321. }
  322. if err = tx.Commit(); err != nil {
  323. log.Errorf("[Undo] execute on fail, err: %v", err)
  324. return nil
  325. }
  326. return nil
  327. }
  328. func (m *BaseUndoLogManager) insertUndoLogWithGlobalFinished(ctx context.Context, xid string, branchID uint64, conn *sql.Conn) error {
  329. // todo use config to replace
  330. parseContext := make(map[string]string, 0)
  331. parseContext[serializerKey] = undo.UndoConfig.LogSerialization
  332. parseContext[compressorTypeKey] = undo.UndoConfig.CompressConfig.Type
  333. undoLogContent := m.encodeUndoLogCtx(parseContext)
  334. logParse, err := parser.GetCache().Load(parseContext[serializerKey])
  335. if err != nil {
  336. return err
  337. }
  338. rbInfo := logParse.GetDefaultContent()
  339. record := undo.UndologRecord{
  340. BranchID: branchID,
  341. XID: xid,
  342. RollbackInfo: rbInfo,
  343. LogStatus: UndoLogStatusGlobalFinished,
  344. Context: undoLogContent,
  345. }
  346. err = m.InsertUndoLogWithSqlConn(ctx, record, conn)
  347. if err != nil {
  348. log.Errorf("insert undo log fail, err: %v", err)
  349. return err
  350. }
  351. return nil
  352. }
  353. // DBType
  354. func (m *BaseUndoLogManager) DBType() types.DBType {
  355. panic("implement me")
  356. }
  357. // HasUndoLogTable check undo log table if exist
  358. func (m *BaseUndoLogManager) HasUndoLogTable(ctx context.Context, conn *sql.Conn) (res bool, err error) {
  359. if _, err = conn.QueryContext(ctx, getCheckUndoLogTableExistSql()); err != nil { //nolint:rowserrcheck,sqlclosecheck
  360. // 1146 mysql table not exist fault code
  361. if e, ok := err.(*mysql.SQLError); ok && e.Code == mysql.ErrNoSuchTable {
  362. return false, nil
  363. }
  364. log.Errorf("[HasUndoLogTable] query sql fail, err: %v", err)
  365. return
  366. }
  367. return true, nil
  368. }
  369. // getBatchDeleteUndoLogSql build batch delete undo log
  370. func (m *BaseUndoLogManager) getBatchDeleteUndoLogSql(xid []string, branchID []int64) (string, error) {
  371. if len(xid) == 0 || len(branchID) == 0 {
  372. return "", fmt.Errorf("xid or branch_id can't nil")
  373. }
  374. var undoLogDeleteSql strings.Builder
  375. undoLogDeleteSql.WriteString(" DELETE FROM ")
  376. undoLogDeleteSql.WriteString(getUndoLogTableName())
  377. undoLogDeleteSql.WriteString(" WHERE branch_id IN ")
  378. m.appendInParam(len(branchID), &undoLogDeleteSql)
  379. undoLogDeleteSql.WriteString(" AND xid IN ")
  380. m.appendInParam(len(xid), &undoLogDeleteSql)
  381. return undoLogDeleteSql.String(), nil
  382. }
  383. // appendInParam build in param
  384. func (m *BaseUndoLogManager) appendInParam(size int, str *strings.Builder) {
  385. if size <= 0 {
  386. return
  387. }
  388. str.WriteString(" (")
  389. for i := 0; i < size; i++ {
  390. str.WriteString("?")
  391. if i < size-1 {
  392. str.WriteString(",")
  393. }
  394. }
  395. str.WriteString(") ")
  396. }
  397. // Int64Slice2Str
  398. func Int64Slice2Str(values interface{}, sep string) (string, error) {
  399. v, ok := values.([]int64)
  400. if !ok {
  401. return "", fmt.Errorf("param type is fault")
  402. }
  403. var valuesText []string
  404. for i := range v {
  405. text := strconv.FormatInt(v[i], 10)
  406. valuesText = append(valuesText, text)
  407. }
  408. return strings.Join(valuesText, sep), nil
  409. }
  410. // canUndo check if it can undo
  411. func (m *BaseUndoLogManager) canUndo(state int32) bool {
  412. return state == UndoLogStatusNormal
  413. }
  414. func (m *BaseUndoLogManager) UnmarshalContext(undoContext []byte) (map[string]string, error) {
  415. res := make(map[string]string)
  416. if err := json.Unmarshal(undoContext, &res); err != nil {
  417. return nil, err
  418. }
  419. return res, nil
  420. }
  421. // getRollbackInfo parser rollback info
  422. func (m *BaseUndoLogManager) getRollbackInfo(rollbackInfo []byte, undoContext map[string]string) ([]byte, error) {
  423. var err error
  424. res := rollbackInfo
  425. // get compress type
  426. if v, ok := undoContext[compressorTypeKey]; ok {
  427. res, err = compressor.CompressorType(v).GetCompressor().Decompress(rollbackInfo)
  428. if err != nil {
  429. log.Errorf("[getRollbackInfo] decompress fail, err: %+v", err)
  430. return nil, err
  431. }
  432. }
  433. return res, nil
  434. }
  435. // getSerializer get serializer from undo context
  436. func (m *BaseUndoLogManager) getSerializer(undoLogContext map[string]string) (serializer string) {
  437. if undoLogContext == nil {
  438. return
  439. }
  440. serializer, _ = undoLogContext[serializerKey]
  441. return
  442. }
  443. func (m *BaseUndoLogManager) deserializeBranchUndoLog(rbInfo []byte, logCtx map[string]string) (*undo.BranchUndoLog, error) {
  444. var (
  445. err error
  446. logParser parser.UndoLogParser
  447. )
  448. if serialzerType := m.getSerializer(logCtx); serialzerType != "" {
  449. if logParser, err = parser.GetCache().Load(serialzerType); err != nil {
  450. return nil, err
  451. }
  452. }
  453. var branchUndoLog *undo.BranchUndoLog
  454. if branchUndoLog, err = logParser.Decode(rbInfo); err != nil {
  455. return nil, err
  456. }
  457. return branchUndoLog, nil
  458. }
  459. func (m *BaseUndoLogManager) serializeBranchUndoLog(log *undo.BranchUndoLog, serializerType string) ([]byte, error) {
  460. logParser, err := parser.GetCache().Load(serializerType)
  461. if err != nil {
  462. return nil, err
  463. }
  464. return logParser.Encode(log)
  465. }
  466. func (m *BaseUndoLogManager) encodeUndoLogCtx(undoLogCtx map[string]string) []byte {
  467. return collection.EncodeMap(undoLogCtx)
  468. }
  469. func (m *BaseUndoLogManager) decodeUndoLogCtx(undoLogCtx []byte) map[string]string {
  470. return collection.DecodeMap(undoLogCtx)
  471. }