Browse Source

feature add integration for grpc (#158)

* finish ut for interceptor

* add client interceptor

* add server interceptor

* adjust location for test file

* format in goimports

* goimformat

* adjust package struct of grpc interceptor, move constant of grpc to common package.

* adjust gprc interceptor file struct

* refact directory, optimize grpc test case

* optimize proto

* delte test file

* delte test file

* fix npe for server integration

* go mod tidy

* remove duplicate constant
tags/1.0.2-RC1
juzimao GitHub 3 years ago
parent
commit
cd2a5b05e8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 95 additions and 9 deletions
  1. +1
    -1
      CONTRIBUTING.md
  2. +1
    -1
      CONTRIBUTING_CN.md
  3. +1
    -0
      go.mod
  4. +8
    -7
      pkg/common/constants.go
  5. +84
    -0
      pkg/integration/grpc/grpc_transaction_interceptor.go

+ 1
- 1
CONTRIBUTING.md View File

@@ -117,7 +117,7 @@ Commit message could help reviewers better understand what is the purpose of sub
* docs: xxxx. For example, "docs: add docs about seata-go cluster installation".
* feature: xxxx.For example, "feature: support oracle in AT mode".
* bugfix: xxxx. For example, "bugfix: fix panic when input nil parameter".
* refactor: xxxx. For example, "refactor: simplify to make codes more readable".
* optimize: xxxx. For example, "optimize: simplify to make codes more readable".
* test: xxx. For example, "test: add unit test case for func InsertIntoArray".
* other readable and explicit expression ways.



+ 1
- 1
CONTRIBUTING_CN.md View File

@@ -117,7 +117,7 @@ upstream no-pushing (push)
* docs: xxxx. For example, "docs: add docs about seata-go cluster installation".
* feature: xxxx.For example, "feature: support oracle in AT mode".
* bugfix: xxxx. For example, "bugfix: fix panic when input nil parameter".
* refactor: xxxx. For example, "refactor: simplify to make codes more readable".
* optimize: xxxx. For example, "optimize: simplify to make codes more readable".
* test: xxx. For example, "test: add unit test case for func InsertIntoArray".
* 其他可读和显式的表达方式。



+ 1
- 0
go.mod View File

@@ -16,6 +16,7 @@ require (
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
google.golang.org/grpc v1.45.0
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)

+ 8
- 7
pkg/common/constants.go View File

@@ -22,11 +22,12 @@ const (
HostName = "host-name"
ActionContext = "actionContext"

SeataXidKey = "SEATA_XID"
XidKey = "TX_XID"
MdcXidKey = "X-TX-XID"
MdcBranchIDKey = "X-TX-BRANCH-ID"
BranchTypeKey = "TX_BRANCH_TYPE"
GlobalLockKey = "TX_LOCK"
SeataFilterKey = "seataDubboFilter"
SeataXidKey = "SEATA_XID"
XidKey = "TX_XID"
XidKeyLowercase = "tx_xid"
MdcXidKey = "X-TX-XID"
MdcBranchIDKey = "X-TX-BRANCH-ID"
BranchTypeKey = "TX_BRANCH_TYPE"
GlobalLockKey = "TX_LOCK"
SeataFilterKey = "seataDubboFilter"
)

+ 84
- 0
pkg/integration/grpc/grpc_transaction_interceptor.go View File

@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package grpc

import (
"context"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/tm"
)

// ClientTransactionInterceptor is client interceptor of grpc,
// it's function is obtain xid in SeataContext,
// and put it in the http header.
func ClientTransactionInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
//set the XID when intercepting a client request and release it directly when intercepting a response
if tm.IsSeataContext(ctx) {
xid := tm.GetXID(ctx)
header := make(map[string]string)
header[common.XidKey] = xid
ctx = metadata.NewOutgoingContext(ctx, metadata.New(header))
}

start := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
end := time.Now()
log.Infof("RPC: %s, start time: %s, end time: %s, err: %v", method,
start.Format("Basic"), end.Format(time.RFC3339), err)
return err
}

// ServerTransactionInterceptor is server interceptor of grpc
// it's function is get xid from grpc http header ,and put it
// into the context.
func ServerTransactionInterceptor(ctx context.Context, req interface{},
_ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Errorf("missing grpc metadata")
}
var xid string
if slice := md.Get(common.XidKey); slice != nil && len(slice) > 0 {
xid = slice[0]
}
if xid == "" {
if slice := md.Get(common.XidKeyLowercase); slice != nil && len(slice) > 0 {
xid = slice[0]
}
}
if xid != "" {
ctx = tm.InitSeataContext(ctx)
tm.SetXID(ctx, xid)
log.Infof("global transaction xid is :%s")
} else {
log.Info("global transaction xid is empty")
}

m, err := handler(ctx, req)
if err != nil {
log.Errorf("RPC failed with error %v", err)
}
return m, err
}

Loading…
Cancel
Save