diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 52052aba..9f5f2bc1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -117,7 +117,7 @@ Commit message could help reviewers better understand what is the purpose of sub * docs: xxxx. For example, "docs: add docs about seata-go cluster installation". * feature: xxxx.For example, "feature: support oracle in AT mode". * bugfix: xxxx. For example, "bugfix: fix panic when input nil parameter". -* refactor: xxxx. For example, "refactor: simplify to make codes more readable". +* optimize: xxxx. For example, "optimize: simplify to make codes more readable". * test: xxx. For example, "test: add unit test case for func InsertIntoArray". * other readable and explicit expression ways. diff --git a/CONTRIBUTING_CN.md b/CONTRIBUTING_CN.md index c1d46622..cf774c9d 100644 --- a/CONTRIBUTING_CN.md +++ b/CONTRIBUTING_CN.md @@ -117,7 +117,7 @@ upstream no-pushing (push) * docs: xxxx. For example, "docs: add docs about seata-go cluster installation". * feature: xxxx.For example, "feature: support oracle in AT mode". * bugfix: xxxx. For example, "bugfix: fix panic when input nil parameter". -* refactor: xxxx. For example, "refactor: simplify to make codes more readable". +* optimize: xxxx. For example, "optimize: simplify to make codes more readable". * test: xxx. For example, "test: add unit test case for func InsertIntoArray". * 其他可读和显式的表达方式。 diff --git a/go.mod b/go.mod index e04c1322..d491d698 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + google.golang.org/grpc v1.45.0 vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10 vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect ) diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 500d1d27..5a92e0ab 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -22,11 +22,12 @@ const ( HostName = "host-name" ActionContext = "actionContext" - SeataXidKey = "SEATA_XID" - XidKey = "TX_XID" - MdcXidKey = "X-TX-XID" - MdcBranchIDKey = "X-TX-BRANCH-ID" - BranchTypeKey = "TX_BRANCH_TYPE" - GlobalLockKey = "TX_LOCK" - SeataFilterKey = "seataDubboFilter" + SeataXidKey = "SEATA_XID" + XidKey = "TX_XID" + XidKeyLowercase = "tx_xid" + MdcXidKey = "X-TX-XID" + MdcBranchIDKey = "X-TX-BRANCH-ID" + BranchTypeKey = "TX_BRANCH_TYPE" + GlobalLockKey = "TX_LOCK" + SeataFilterKey = "seataDubboFilter" ) diff --git a/pkg/integration/grpc/grpc_transaction_interceptor.go b/pkg/integration/grpc/grpc_transaction_interceptor.go new file mode 100644 index 00000000..34764df9 --- /dev/null +++ b/pkg/integration/grpc/grpc_transaction_interceptor.go @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package grpc + +import ( + "context" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + "github.com/seata/seata-go/pkg/common" + "github.com/seata/seata-go/pkg/common/log" + "github.com/seata/seata-go/pkg/tm" +) + +// ClientTransactionInterceptor is client interceptor of grpc, +// it's function is obtain xid in SeataContext, +// and put it in the http header. +func ClientTransactionInterceptor(ctx context.Context, method string, req, reply interface{}, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + //set the XID when intercepting a client request and release it directly when intercepting a response + if tm.IsSeataContext(ctx) { + xid := tm.GetXID(ctx) + header := make(map[string]string) + header[common.XidKey] = xid + ctx = metadata.NewOutgoingContext(ctx, metadata.New(header)) + } + + start := time.Now() + err := invoker(ctx, method, req, reply, cc, opts...) + end := time.Now() + log.Infof("RPC: %s, start time: %s, end time: %s, err: %v", method, + start.Format("Basic"), end.Format(time.RFC3339), err) + return err +} + +// ServerTransactionInterceptor is server interceptor of grpc +// it's function is get xid from grpc http header ,and put it +// into the context. +func ServerTransactionInterceptor(ctx context.Context, req interface{}, + _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + log.Errorf("missing grpc metadata") + } + var xid string + if slice := md.Get(common.XidKey); slice != nil && len(slice) > 0 { + xid = slice[0] + } + if xid == "" { + if slice := md.Get(common.XidKeyLowercase); slice != nil && len(slice) > 0 { + xid = slice[0] + } + } + if xid != "" { + ctx = tm.InitSeataContext(ctx) + tm.SetXID(ctx, xid) + log.Infof("global transaction xid is :%s") + } else { + log.Info("global transaction xid is empty") + } + + m, err := handler(ctx, req) + if err != nil { + log.Errorf("RPC failed with error %v", err) + } + return m, err +}