From 380ba38572d3dc8931d77c43717521cccbaef21b Mon Sep 17 00:00:00 2001 From: Jason Deng <76831112+jasondeng1997@users.noreply.github.com> Date: Sat, 4 Feb 2023 21:47:16 +0800 Subject: [PATCH] [WIP]feat: support xa oracle connection (#391) * doc: v1.0.2-RC1 release file (#338) --------- Co-authored-by: Shaozhou Hu <1094091844@qq.com> --- go.mod | 1 + go.sum | 2 + pkg/datasource/sql/conn/oracle.go | 111 +++++++++++++++++++++++++ pkg/datasource/sql/conn/oracle_test.go | 105 +++++++++++++++++++++++ pkg/datasource/sql/conn/resource_xa.go | 91 ++++++++++++++++++++ 5 files changed, 310 insertions(+) create mode 100644 pkg/datasource/sql/conn/oracle.go create mode 100644 pkg/datasource/sql/conn/oracle_test.go create mode 100644 pkg/datasource/sql/conn/resource_xa.go diff --git a/go.mod b/go.mod index 761f506e..cea1ba7b 100644 --- a/go.mod +++ b/go.mod @@ -22,6 +22,7 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.2 github.com/prometheus/common v0.32.1 + github.com/sijms/go-ora/v2 v2.5.17 github.com/stretchr/testify v1.8.0 go.uber.org/atomic v1.9.0 go.uber.org/zap v1.21.0 diff --git a/go.sum b/go.sum index 55640110..362e3d18 100644 --- a/go.sum +++ b/go.sum @@ -773,6 +773,8 @@ github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+ github.com/shirou/gopsutil/v3 v3.22.2 h1:wCrArWFkHYIdDxx/FSfF5RB4dpJYW6t7rcp3+zL8uks= github.com/shirou/gopsutil/v3 v3.22.2/go.mod h1:WapW1AOOPlHyXr+yOyw3uYx36enocrtSoSBy0L5vUHY= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/sijms/go-ora/v2 v2.5.17 h1:7FS8vswmAHint/r/fmgpKEczBnLZH64PNSkTiVradhY= +github.com/sijms/go-ora/v2 v2.5.17/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= diff --git a/pkg/datasource/sql/conn/oracle.go b/pkg/datasource/sql/conn/oracle.go new file mode 100644 index 00000000..78820965 --- /dev/null +++ b/pkg/datasource/sql/conn/oracle.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package conn + +import ( + "context" + "database/sql/driver" + "strings" + "time" + + _ "github.com/sijms/go-ora/v2" +) + +type OracleXAConn struct { + driver.Conn +} + +func (c *OracleXAConn) Commit(xid string, onePhase bool) error { + var sb strings.Builder + sb.WriteString("XA COMMIT ") + sb.WriteString(xid) + if onePhase { + sb.WriteString(" ONE PHASE") + } + + conn, _ := c.Conn.(driver.ExecerContext) + _, err := conn.ExecContext(context.TODO(), sb.String(), nil) + return err +} + +func (c *OracleXAConn) End(xid string, flags int) error { + var sb strings.Builder + sb.WriteString("XA END ") + sb.WriteString(xid) + + conn, _ := c.Conn.(driver.ExecerContext) + _, err := conn.ExecContext(context.TODO(), sb.String(), nil) + return err +} + +func (c *OracleXAConn) Forget(xid string) error { + // TODO implement me + panic("implement me") +} + +func (c *OracleXAConn) GetTransactionTimeout() time.Duration { + // TODO implement me + panic("implement me") +} + +func (c *OracleXAConn) IsSameRM(resource XAResource) bool { + // TODO implement me + panic("implement me") +} + +func (c *OracleXAConn) XAPrepare(xid string) (int, error) { + var sb strings.Builder + sb.WriteString("XA PREPARE ") + sb.WriteString(xid) + + conn, _ := c.Conn.(driver.ExecerContext) + if _, err := conn.ExecContext(context.TODO(), sb.String(), nil); err != nil { + return -1, err + } + return 0, nil +} + +func (c *OracleXAConn) Recover(flag int) []string { + // TODO implement me + panic("implement me") +} + +func (c *OracleXAConn) Rollback(xid string) error { + var sb strings.Builder + sb.WriteString("XA ROLLBACK ") + sb.WriteString(xid) + + conn, _ := c.Conn.(driver.ExecerContext) + _, err := conn.ExecContext(context.TODO(), sb.String(), nil) + return err +} + +func (c *OracleXAConn) SetTransactionTimeout(duration time.Duration) bool { + // TODO implement me + panic("implement me") +} + +func (c *OracleXAConn) Start(xid string, flags int) error { + var sb strings.Builder + sb.WriteString("XA START") + sb.WriteString(xid) + + conn, _ := c.Conn.(driver.ExecerContext) + _, err := conn.ExecContext(context.TODO(), sb.String(), nil) + return err +} diff --git a/pkg/datasource/sql/conn/oracle_test.go b/pkg/datasource/sql/conn/oracle_test.go new file mode 100644 index 00000000..ef8e4eac --- /dev/null +++ b/pkg/datasource/sql/conn/oracle_test.go @@ -0,0 +1,105 @@ +package conn + +import ( + "database/sql/driver" + "testing" + "time" +) + +func TestOracleXAConn_Commit(t *testing.T) { + type fields struct { + Conn driver.Conn + } + type args struct { + xid string + onePhase bool + } + var tests []struct { + name string + fields fields + args args + wantErr bool + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &OracleXAConn{ + Conn: tt.fields.Conn, + } + if err := c.Commit(tt.args.xid, tt.args.onePhase); (err != nil) != tt.wantErr { + t.Errorf("Commit() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestOracleXAConn_End(t *testing.T) { + type fields struct { + Conn driver.Conn + } + type args struct { + xid string + flags int + } + var tests []struct { + name string + fields fields + args args + wantErr bool + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &OracleXAConn{ + Conn: tt.fields.Conn, + } + if err := c.End(tt.args.xid, tt.args.flags); (err != nil) != tt.wantErr { + t.Errorf("End() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestOracleXAConn_Forget(t *testing.T) { + type fields struct { + Conn driver.Conn + } + type args struct { + xid string + } + var tests []struct { + name string + fields fields + args args + wantErr bool + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &OracleXAConn{ + Conn: tt.fields.Conn, + } + if err := c.Forget(tt.args.xid); (err != nil) != tt.wantErr { + t.Errorf("Forget() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + +func TestOracleXAConn_GetTransactionTimeout(t *testing.T) { + type fields struct { + Conn driver.Conn + } + var tests []struct { + name string + fields fields + want time.Duration + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &OracleXAConn{ + Conn: tt.fields.Conn, + } + if got := c.GetTransactionTimeout(); got != tt.want { + t.Errorf("GetTransactionTimeout() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/datasource/sql/conn/resource_xa.go b/pkg/datasource/sql/conn/resource_xa.go new file mode 100644 index 00000000..52bbced5 --- /dev/null +++ b/pkg/datasource/sql/conn/resource_xa.go @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package conn + +import "time" + +const ( + // TMENDICANT Ends a recovery scan. + TMENDRSCAN = 0x00800000 + + /** + * Disassociates the caller and marks the transaction branch + * rollback-only. + */ + TMFAIL = 0x20000000 + + /** + * Caller is joining existing transaction branch. + */ + TMJOIN = 0x00200000 + + /** + * Use TMNOFLAGS to indicate no flags value is selected. + */ + TMNOFLAGS = 0x00000000 + + /** + * Caller is using one-phase optimization. + */ + TMONEPHASE = 0x40000000 + + /** + * Caller is resuming association with a suspended + * transaction branch. + */ + TMRESUME = 0x08000000 + + /** + * Starts a recovery scan. + */ + TMSTARTRSCAN = 0x01000000 + + /** + * Disassociates caller from a transaction branch. + */ + TMSUCCESS = 0x04000000 + + /** + * Caller is suspending (not ending) its association with + * a transaction branch. + */ + TMSUSPEND = 0x02000000 + + /** + * The transaction branch has been read-only and has been committed. + */ + XA_RDONLY = 0x00000003 + + /** + * The transaction work has been prepared normally. + */ + XA_OK = 0 +) + +type XAResource interface { + Commit(xid string, onePhase bool) error + End(xid string, flags int) error + Forget(xid string) error + GetTransactionTimeout() time.Duration + IsSameRM(resource XAResource) bool + XAPrepare(xid string) (int, error) + Recover(flag int) []string + Rollback(xid string) error + SetTransactionTimeout(duration time.Duration) bool + Start(xid string, flags int) error +}