You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

basic_undo_log_builder.go 5.0 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one or more
  3. * contributor license agreements. See the NOTICE file distributed with
  4. * this work for additional information regarding copyright ownership.
  5. * The ASF licenses this file to You under the Apache License, Version 2.0
  6. * (the "License"); you may not use this file except in compliance with
  7. * the License. You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. package builder
  18. import (
  19. "database/sql"
  20. "database/sql/driver"
  21. "io"
  22. "github.com/arana-db/parser/ast"
  23. "github.com/arana-db/parser/test_driver"
  24. gxsort "github.com/dubbogo/gost/sort"
  25. "github.com/seata/seata-go/pkg/datasource/sql/types"
  26. )
  27. type BasicUndoLogBuilder struct {
  28. }
  29. // getScanSlice get the column type for scann
  30. func (*BasicUndoLogBuilder) getScanSlice(columnNames []string, tableMeta types.TableMeta) []driver.Value {
  31. scanSlice := make([]driver.Value, 0, len(columnNames))
  32. for _, columnNmae := range columnNames {
  33. var (
  34. scanVal interface{}
  35. // 从metData获取该列的元信息
  36. columnMeta = tableMeta.Columns[columnNmae]
  37. )
  38. switch columnMeta.Info.ScanType() {
  39. case types.ScanTypeFloat32:
  40. scanVal = float32(0)
  41. break
  42. case types.ScanTypeFloat64:
  43. scanVal = float64(0)
  44. break
  45. case types.ScanTypeInt8:
  46. scanVal = int8(0)
  47. break
  48. case types.ScanTypeInt16:
  49. scanVal = int16(0)
  50. break
  51. case types.ScanTypeInt32:
  52. scanVal = int32(0)
  53. break
  54. case types.ScanTypeInt64:
  55. scanVal = int64(0)
  56. break
  57. case types.ScanTypeNullFloat:
  58. scanVal = sql.NullFloat64{}
  59. break
  60. case types.ScanTypeNullInt:
  61. scanVal = sql.NullInt64{}
  62. break
  63. case types.ScanTypeNullTime:
  64. scanVal = sql.NullTime{}
  65. break
  66. case types.ScanTypeUint8:
  67. scanVal = uint8(0)
  68. break
  69. case types.ScanTypeUint16:
  70. scanVal = uint16(0)
  71. break
  72. case types.ScanTypeUint32:
  73. scanVal = uint32(0)
  74. break
  75. case types.ScanTypeUint64:
  76. scanVal = uint64(0)
  77. break
  78. case types.ScanTypeRawBytes:
  79. scanVal = sql.RawBytes{}
  80. break
  81. case types.ScanTypeUnknown:
  82. scanVal = new(interface{})
  83. break
  84. }
  85. scanSlice = append(scanSlice, &scanVal)
  86. }
  87. return scanSlice
  88. }
  89. func (b *BasicUndoLogBuilder) buildSelectArgs(stmt *ast.SelectStmt, args []driver.Value) []driver.Value {
  90. var (
  91. selectArgsIndexs = make([]int32, 0)
  92. selectArgs = make([]driver.Value, 0)
  93. )
  94. b.traversalArgs(stmt.Where, &selectArgsIndexs)
  95. if stmt.OrderBy != nil {
  96. for _, item := range stmt.OrderBy.Items {
  97. b.traversalArgs(item, &selectArgsIndexs)
  98. }
  99. }
  100. if stmt.Limit != nil {
  101. if stmt.Limit.Offset != nil {
  102. b.traversalArgs(stmt.Limit.Offset, &selectArgsIndexs)
  103. }
  104. if stmt.Limit.Count != nil {
  105. b.traversalArgs(stmt.Limit.Count, &selectArgsIndexs)
  106. }
  107. }
  108. // sort selectArgs index array
  109. gxsort.Int32(selectArgsIndexs)
  110. for _, index := range selectArgsIndexs {
  111. selectArgs = append(selectArgs, args[index])
  112. }
  113. return selectArgs
  114. }
  115. // todo perfect all sql operation
  116. func (b *BasicUndoLogBuilder) traversalArgs(node ast.Node, argsIndex *[]int32) {
  117. if node == nil {
  118. return
  119. }
  120. switch node.(type) {
  121. case *ast.BinaryOperationExpr:
  122. expr := node.(*ast.BinaryOperationExpr)
  123. b.traversalArgs(expr.L, argsIndex)
  124. b.traversalArgs(expr.R, argsIndex)
  125. break
  126. case *ast.BetweenExpr:
  127. expr := node.(*ast.BetweenExpr)
  128. b.traversalArgs(expr.Left, argsIndex)
  129. b.traversalArgs(expr.Right, argsIndex)
  130. break
  131. case *ast.PatternInExpr:
  132. exprs := node.(*ast.PatternInExpr).List
  133. for i := 0; i < len(exprs); i++ {
  134. b.traversalArgs(exprs[i], argsIndex)
  135. }
  136. break
  137. case *test_driver.ParamMarkerExpr:
  138. *argsIndex = append(*argsIndex, int32(node.(*test_driver.ParamMarkerExpr).Order))
  139. break
  140. }
  141. }
  142. func (u *BasicUndoLogBuilder) buildRecordImages(rowsi driver.Rows, tableMetaData types.TableMeta) (*types.RecordImage, error) {
  143. // select column names
  144. columnNames := rowsi.Columns()
  145. rowImages := make([]types.RowImage, 0)
  146. ss := u.getScanSlice(columnNames, tableMetaData)
  147. for {
  148. err := rowsi.Next(ss)
  149. if err == io.EOF {
  150. break
  151. }
  152. columns := make([]types.ColumnImage, 0)
  153. // build record image
  154. for i, name := range columnNames {
  155. columnMeta := tableMetaData.Columns[name]
  156. keyType := types.IndexTypeNull
  157. if data, ok := tableMetaData.Indexs[name]; ok {
  158. keyType = data.IType
  159. }
  160. jdbcType := types.GetJDBCTypeByTypeName(columnMeta.Info.DatabaseTypeName())
  161. columns = append(columns, types.ColumnImage{
  162. KeyType: keyType,
  163. Name: name,
  164. Type: int16(jdbcType),
  165. Value: ss[i],
  166. })
  167. }
  168. rowImages = append(rowImages, types.RowImage{Columns: columns})
  169. }
  170. return &types.RecordImage{TableName: tableMetaData.Name, Rows: rowImages}, nil
  171. }