Browse Source

feat: add grpc invoker in saga mode (#668)

* feat: add grpc invoker in saga mode

* fix: fix some chores
pull/669/head
Jingliu Xiong GitHub 1 year ago
parent
commit
90a8721983
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
11 changed files with 918 additions and 42 deletions
  1. +1
    -1
      go.mod
  2. +244
    -0
      pkg/saga/statemachine/engine/invoker/grpc_invoker.go
  3. +168
    -0
      pkg/saga/statemachine/engine/invoker/grpc_invoker_test.go
  4. +32
    -1
      pkg/saga/statemachine/engine/invoker/invoker.go
  5. +1
    -1
      pkg/saga/statemachine/statelang/parser/statemachine_parser.go
  6. +11
    -11
      pkg/saga/statemachine/statelang/parser/task_state_json_parser.go
  7. +37
    -28
      pkg/saga/statemachine/statelang/state/task_state.go
  8. +35
    -0
      testdata/saga/engine/invoker/grpc/product.go
  9. +219
    -0
      testdata/saga/engine/invoker/grpc/product.pb.go
  10. +20
    -0
      testdata/saga/engine/invoker/grpc/product.proto
  11. +150
    -0
      testdata/saga/engine/invoker/grpc/product_grpc.pb.go

+ 1
- 1
go.mod View File

@@ -34,6 +34,7 @@ require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/agiledragon/gomonkey/v2 v2.9.0
github.com/mattn/go-sqlite3 v1.14.19
google.golang.org/protobuf v1.30.0
)

require (
@@ -86,7 +87,6 @@ require (
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)


+ 244
- 0
pkg/saga/statemachine/engine/invoker/grpc_invoker.go View File

@@ -0,0 +1,244 @@
package invoker

import (
"context"
"errors"
"fmt"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
"github.com/seata/seata-go/pkg/util/log"
"google.golang.org/grpc"
"reflect"
"strings"
"sync"
"time"
)

type GRPCInvoker struct {
clients map[string]GRPCClient
clientsMapLock sync.Mutex
needClose bool
}

func NewGRPCInvoker() *GRPCInvoker {
return &GRPCInvoker{
clients: make(map[string]GRPCClient),
}
}

func (g *GRPCInvoker) NeedClose() bool {
return g.needClose
}

func (g *GRPCInvoker) SetNeedClose(needClose bool) {
g.needClose = needClose
}

func (g *GRPCInvoker) RegisterClient(serviceName string, client GRPCClient) {
g.clientsMapLock.Lock()
defer g.clientsMapLock.Unlock()

g.clients[serviceName] = client
}

func (g *GRPCInvoker) GetClient(serviceName string) GRPCClient {
g.clientsMapLock.Lock()
defer g.clientsMapLock.Unlock()

if client, ok := g.clients[serviceName]; ok {
return client
}

return nil
}

func (g *GRPCInvoker) Invoke(ctx context.Context, input []any, service state.ServiceTaskState) (output []reflect.Value, err error) {
serviceTaskStateImpl := service.(*state.ServiceTaskStateImpl)
client := g.GetClient(serviceTaskStateImpl.ServiceName())
if client == nil {
return nil, errors.New(fmt.Sprintf("no grpc client %s for service task state", serviceTaskStateImpl.ServiceName()))
}

// context is the first arg in grpc client method
input = append([]any{ctx}, input...)
if serviceTaskStateImpl.IsAsync() {
go func() {
_, err := client.CallMethod(serviceTaskStateImpl, input)
if err != nil {
log.Errorf("invoke Service[%s].%s failed, err is %s", serviceTaskStateImpl.ServiceName(),
serviceTaskStateImpl.ServiceMethod(), err)
}
}()
return nil, nil
} else {
return client.CallMethod(serviceTaskStateImpl, input)
}
}

func (g *GRPCInvoker) Close(ctx context.Context) error {
if g.needClose {
for _, client := range g.clients {
err := client.CloseConnection()
if err != nil {
return err
}
}
}
return nil
}

type GRPCClient interface {
CallMethod(serviceTaskStateImpl *state.ServiceTaskStateImpl, input []any) ([]reflect.Value, error)

CloseConnection() error
}

type GPRCClientImpl struct {
serviceName string
client any
connection *grpc.ClientConn
methodLock sync.Mutex
}

func NewGRPCClient(serviceName string, client any, connection *grpc.ClientConn) *GPRCClientImpl {
return &GPRCClientImpl{
serviceName: serviceName,
client: client,
connection: connection,
}
}

func (g *GPRCClientImpl) CallMethod(serviceTaskStateImpl *state.ServiceTaskStateImpl, input []any) ([]reflect.Value, error) {

if serviceTaskStateImpl.Method() == nil {
err := g.initMethod(serviceTaskStateImpl)
if err != nil {
return nil, err
}
}
method := serviceTaskStateImpl.Method()

args := make([]reflect.Value, 0, len(input))
for _, arg := range input {
args = append(args, reflect.ValueOf(arg))
}

retryCountMap := make(map[state.Retry]int)
for {
res, err, shouldRetry := func() (res []reflect.Value, resErr error, shouldRetry bool) {
defer func() {
// err may happen in the method invoke (panic) and method return, we try to find err and use it to decide retry by
// whether contains exception or not
if r := recover(); r != nil {
errStr := fmt.Sprintf("%v", r)
retry := g.matchRetry(serviceTaskStateImpl, errStr)
res = nil
resErr = errors.New(errStr)

if retry == nil {
return
}
shouldRetry = g.needRetry(serviceTaskStateImpl, retryCountMap, retry, resErr)
return
}
}()

outs := method.Call(args)
// err is the last arg in grpc client method
if err, ok := outs[len(outs)-1].Interface().(error); ok {
errStr := err.Error()
retry := g.matchRetry(serviceTaskStateImpl, errStr)
res = nil
resErr = err

if retry == nil {
return
}
shouldRetry = g.needRetry(serviceTaskStateImpl, retryCountMap, retry, resErr)
return
}

// invoke success
res = outs
resErr = nil
shouldRetry = false
return
}()

if !shouldRetry {
if err != nil {
return nil, errors.New(fmt.Sprintf("invoke Service[%s] failed, not satisfy retry config, the last err is %s",
serviceTaskStateImpl.ServiceName(), err))
}
return res, nil
}
}
}

func (g *GPRCClientImpl) CloseConnection() error {
if g.connection != nil {
err := g.connection.Close()
if err != nil {
return err
}
}
return nil
}

func (g *GPRCClientImpl) initMethod(serviceTaskStateImpl *state.ServiceTaskStateImpl) error {
methodName := serviceTaskStateImpl.ServiceMethod()
g.methodLock.Lock()
defer g.methodLock.Unlock()
clientValue := reflect.ValueOf(g.client)
if clientValue.IsZero() {
return errors.New(fmt.Sprintf("invalid client value when grpc client call, serviceName: %s", g.serviceName))
}
method := clientValue.MethodByName(methodName)
if method.IsZero() {
return errors.New(fmt.Sprintf("invalid client method when grpc client call, serviceName: %s, serviceMethod: %s",
g.serviceName, methodName))
}
serviceTaskStateImpl.SetMethod(&method)
return nil
}

func (g *GPRCClientImpl) matchRetry(impl *state.ServiceTaskStateImpl, str string) state.Retry {
if impl.Retry() != nil {
for _, retry := range impl.Retry() {
if retry.Exceptions() != nil {
for _, exception := range retry.Exceptions() {
if strings.Contains(str, exception) {
return retry
}
}
}
}
}
return nil
}

func (g *GPRCClientImpl) needRetry(impl *state.ServiceTaskStateImpl, countMap map[state.Retry]int, retry state.Retry, err error) bool {
attempt, exist := countMap[retry]
if !exist {
countMap[retry] = 0
}

if attempt >= retry.MaxAttempt() {
return false
}

intervalSecond := retry.IntervalSecond()
backoffRate := retry.BackoffRate()
var currentInterval int64
if attempt == 0 {
currentInterval = int64(intervalSecond * 1000)
} else {
currentInterval = int64(intervalSecond * backoffRate * float64(attempt) * 1000)
}

log.Warnf("invoke service[%s.%s] failed, will retry after %s millis, current retry count: %s, current err: %s",
impl.ServiceName(), impl.ServiceMethod(), currentInterval, attempt, err)

time.Sleep(time.Duration(currentInterval) * time.Millisecond)
countMap[retry] = attempt + 1
return true
}

+ 168
- 0
pkg/saga/statemachine/engine/invoker/grpc_invoker_test.go View File

@@ -0,0 +1,168 @@
package invoker

import (
"context"
"errors"
"fmt"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"testing"
"time"

pb "github.com/seata/seata-go/testdata/saga/engine/invoker/grpc"
)

type MockGPRCClientImpl struct {
GPRCClientImpl
}

type mockClientImpl struct {
invokeCount int
}

func (m *mockClientImpl) SayHelloRight(ctx context.Context, word string) (string, error) {
m.invokeCount++
fmt.Println("invoke right")
return word, nil
}

func (m *mockClientImpl) SayHelloRightLater(ctx context.Context, word string, delay int) (string, error) {
m.invokeCount++
if delay == m.invokeCount {
fmt.Println("invoke right")
return word, nil
}
fmt.Println("invoke fail")
return "", errors.New("invoke failed")
}

func TestGRPCInvokerInvokeSucceedWithOutRetry(t *testing.T) {
ctx := context.Background()
invoker := newGRPCServiceInvoker()
values, err := invoker.Invoke(ctx, []any{"hello"}, newHelloServiceTaskState())
if err != nil {
t.Error(err)
return
}
if values == nil || len(values) == 0 {
t.Error("no value in values")
return
}
if values[0].Interface().(string) != "hello" {
t.Errorf("expect hello, but got %v", values[0].Interface())
}
if _, ok := values[1].Interface().(error); ok {
t.Errorf("expect nil, but got %v", values[1].Interface())
}
}

func TestGRPCInvokerInvokeSucceedInRetry(t *testing.T) {
ctx := context.Background()
invoker := newGRPCServiceInvoker()
values, err := invoker.Invoke(ctx, []any{"hello", 2}, newHelloServiceTaskStateWithRetry())
if err != nil {
t.Error(err)
return
}
if values == nil || len(values) == 0 {
t.Error("no value in values")
return
}
if values[0].Interface().(string) != "hello" {
t.Errorf("expect hello, but got %v", values[0].Interface())
}
if _, ok := values[1].Interface().(error); ok {
t.Errorf("expect nil, but got %v", values[1].Interface())
}
}

func TestGRPCInvokerInvokeFailedInRetry(t *testing.T) {
ctx := context.Background()
invoker := newGRPCServiceInvoker()
_, err := invoker.Invoke(ctx, []any{"hello", 5}, newHelloServiceTaskStateWithRetry())
if err != nil {
assert.Error(t, err)
}
}

func TestGRPCInvokerInvokeE2E(t *testing.T) {
go func() {
pb.StartProductServer()
}()
time.Sleep(3000 * time.Millisecond)
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("did not connect: %v", err)
}
c := pb.NewProductInfoClient(conn)
grpcClient := NewGRPCClient("product", c, conn)

invoker := NewGRPCInvoker()
invoker.RegisterClient("product", grpcClient)
ctx := context.Background()
values, err := invoker.Invoke(ctx, []any{&pb.Product{Id: "123"}}, newProductServiceTaskState())
if err != nil {
t.Error(err)
return
}
t.Log(values)
err = invoker.Close(ctx)
if err != nil {
t.Error(err)
return
}
}

func newGRPCServiceInvoker() ServiceInvoker {
mockGRPCInvoker := NewGRPCInvoker()
mockGRPCClient := &mockClientImpl{}
mockClient := NewGRPCClient("hello", mockGRPCClient, &grpc.ClientConn{})
mockGRPCInvoker.RegisterClient("hello", mockClient)
return mockGRPCInvoker
}

func newProductServiceTaskState() state.ServiceTaskState {
serviceTaskStateImpl := state.NewServiceTaskStateImpl()
serviceTaskStateImpl.SetName("product")
serviceTaskStateImpl.SetIsAsync(false)
serviceTaskStateImpl.SetServiceName("product")
serviceTaskStateImpl.SetServiceType("GRPC")
serviceTaskStateImpl.SetServiceMethod("AddProduct")

retryImpl := &state.RetryImpl{}
retryImpl.SetExceptions([]string{"fail"})
retryImpl.SetIntervalSecond(1)
retryImpl.SetMaxAttempt(3)
retryImpl.SetBackoffRate(0.9)
serviceTaskStateImpl.SetRetry([]state.Retry{retryImpl})
return serviceTaskStateImpl
}

func newHelloServiceTaskState() state.ServiceTaskState {
serviceTaskStateImpl := state.NewServiceTaskStateImpl()
serviceTaskStateImpl.SetName("hello")
serviceTaskStateImpl.SetIsAsync(false)
serviceTaskStateImpl.SetServiceName("hello")
serviceTaskStateImpl.SetServiceType("GRPC")
serviceTaskStateImpl.SetServiceMethod("SayHelloRight")
return serviceTaskStateImpl
}

func newHelloServiceTaskStateWithRetry() state.ServiceTaskState {
serviceTaskStateImpl := state.NewServiceTaskStateImpl()
serviceTaskStateImpl.SetName("hello")
serviceTaskStateImpl.SetIsAsync(false)
serviceTaskStateImpl.SetServiceName("hello")
serviceTaskStateImpl.SetServiceType("GRPC")
serviceTaskStateImpl.SetServiceMethod("SayHelloRightLater")

retryImpl := &state.RetryImpl{}
retryImpl.SetExceptions([]string{"fail"})
retryImpl.SetIntervalSecond(1)
retryImpl.SetMaxAttempt(3)
retryImpl.SetBackoffRate(0.9)
serviceTaskStateImpl.SetRetry([]state.Retry{retryImpl})
return serviceTaskStateImpl
}

+ 32
- 1
pkg/saga/statemachine/engine/invoker/invoker.go View File

@@ -1,5 +1,12 @@
package invoker

import (
"context"
"github.com/seata/seata-go/pkg/saga/statemachine/statelang/state"
"reflect"
"sync"
)

type ScriptInvokerManager interface {
}

@@ -7,8 +14,32 @@ type ScriptInvoker interface {
}

type ServiceInvokerManager interface {
ServiceInvoker(serviceType string) ServiceInvoker
PutServiceInvoker(serviceType string, invoker ServiceInvoker)
}

type ServiceInvoker interface {
Invoke()
Invoke(ctx context.Context, input []any, service state.ServiceTaskState) (output []reflect.Value, err error)
Close(ctx context.Context) error
}

type ServiceInvokerManagerImpl struct {
invokers map[string]ServiceInvoker
mutex sync.Mutex
}

func NewServiceInvokerManagerImpl() *ServiceInvokerManagerImpl {
return &ServiceInvokerManagerImpl{
invokers: make(map[string]ServiceInvoker),
}
}

func (manager *ServiceInvokerManagerImpl) ServiceInvoker(serviceType string) ServiceInvoker {
return manager.invokers[serviceType]
}

func (manager *ServiceInvokerManagerImpl) PutServiceInvoker(serviceType string, invoker ServiceInvoker) {
manager.mutex.Lock()
defer manager.mutex.Unlock()
manager.invokers[serviceType] = invoker
}

+ 1
- 1
pkg/saga/statemachine/statelang/parser/statemachine_parser.go View File

@@ -136,7 +136,7 @@ func (b BaseStateParser) GetBoolOrDefault(stateName string, stateMap map[string]

valueAsBool, ok := value.(bool)
if !ok {
return false, errors.New("State [" + stateName + "] " + key + " illegal, required bool")
return defaultValue, errors.New("State [" + stateName + "] " + key + " illegal, required bool")
}
return valueAsBool, nil
}


+ 11
- 11
pkg/saga/statemachine/statelang/parser/task_state_json_parser.go View File

@@ -157,16 +157,16 @@ func (a *AbstractTaskStateParser) parseRetries(stateName string, retryInterfaces
return nil, errors.New("State [" + stateName + "] " + "Retry illegal, require map[string]interface{}")
}
retry := &state.RetryImpl{}
errorTypes, err := a.GetSliceOrDefault(stateName, retryMap, "Exceptions", nil)
exceptions, err := a.GetSliceOrDefault(stateName, retryMap, "Exceptions", nil)
if err != nil {
return nil, err
}
if errorTypes != nil {
errorTypeNames := make([]string, 0)
for _, errorType := range errorTypes {
errorTypeNames = append(errorTypeNames, errorType.(string))
if exceptions != nil {
errors := make([]string, 0)
for _, errorType := range exceptions {
errors = append(errors, errorType.(string))
}
retry.SetErrorTypeNames(errorTypeNames)
retry.SetExceptions(errors)
}

maxAttempts, err := a.GetIntOrDefault(stateName, retryMap, "MaxAttempts", 0)
@@ -191,14 +191,14 @@ func (a *AbstractTaskStateParser) parseRetries(stateName string, retryInterfaces
return retries, nil
}

func (a *AbstractTaskStateParser) parseCatches(stateName string, catchInterfaces []interface{}) ([]state.ErrorMatch, error) {
errorMatches := make([]state.ErrorMatch, 0, len(catchInterfaces))
func (a *AbstractTaskStateParser) parseCatches(stateName string, catchInterfaces []interface{}) ([]state.ExceptionMatch, error) {
errorMatches := make([]state.ExceptionMatch, 0, len(catchInterfaces))
for _, catchInterface := range catchInterfaces {
catchMap, ok := catchInterface.(map[string]interface{})
if !ok {
return nil, errors.New("State [" + stateName + "] " + "Catch illegal, require map[string]interface{}")
}
errorMatch := &state.ErrorMatchImpl{}
errorMatch := &state.ExceptionMatchImpl{}
errorInterfaces, err := a.GetSliceOrDefault(stateName, catchMap, "Exceptions", nil)
if err != nil {
return nil, err
@@ -208,7 +208,7 @@ func (a *AbstractTaskStateParser) parseCatches(stateName string, catchInterfaces
for _, errorType := range errorInterfaces {
errorNames = append(errorNames, errorType.(string))
}
errorMatch.SetErrors(errorNames)
errorMatch.SetExceptions(errorNames)
}
next, err := a.GetStringOrDefault(stateName, catchMap, "Next", "")
if err != nil {
@@ -242,7 +242,7 @@ func (s ServiceTaskStateParser) Parse(stateName string, stateMap map[string]inte
return nil, err
}

serviceName, err := s.GetString(stateName, stateMap, "ServiceName")
serviceName, err := s.GetString(stateName, stateMap, "serviceName")
if err != nil {
return nil, err
}


+ 37
- 28
pkg/saga/statemachine/statelang/state/task_state.go View File

@@ -17,7 +17,7 @@ type TaskState interface {

Retry() []Retry

Catches() []ErrorMatch
Catches() []ExceptionMatch

Status() map[string]string

@@ -36,18 +36,18 @@ type Loop interface {
CompletionCondition() string
}

type ErrorMatch interface {
Errors() []string
type ExceptionMatch interface {
Exceptions() []string

ErrorTypes() []reflect.Type
ExceptionTypes() []reflect.Type

SetErrorTypes(errorTypes []reflect.Type)
SetExceptionTypes(ExceptionTypes []reflect.Type)

Next() string
}

type Retry interface {
ErrorTypeNames() []string
Exceptions() []string

IntervalSecond() float64

@@ -77,7 +77,7 @@ type ServiceTaskState interface {
type AbstractTaskState struct {
*statelang.BaseState
loop Loop
catches []ErrorMatch
catches []ExceptionMatch
input []interface{}
output map[string]interface{}
compensatePersistModeUpdate bool
@@ -140,7 +140,7 @@ func (a *AbstractTaskState) SetLoop(loop Loop) {
a.loop = loop
}

func (a *AbstractTaskState) SetCatches(catches []ErrorMatch) {
func (a *AbstractTaskState) SetCatches(catches []ExceptionMatch) {
a.catches = catches
}

@@ -172,7 +172,7 @@ func (a *AbstractTaskState) ForUpdate() bool {
return a.forUpdate
}

func (a *AbstractTaskState) Catches() []ErrorMatch {
func (a *AbstractTaskState) Catches() []ExceptionMatch {
return a.catches
}

@@ -198,6 +198,7 @@ type ServiceTaskStateImpl struct {
serviceName string
serviceMethod string
parameterTypes []string
method *reflect.Value
persist bool
retryPersistModeUpdate bool
compensatePersistModeUpdate bool
@@ -210,6 +211,14 @@ func NewServiceTaskStateImpl() *ServiceTaskStateImpl {
}
}

func (s *ServiceTaskStateImpl) Method() *reflect.Value {
return s.method
}

func (s *ServiceTaskStateImpl) SetMethod(method *reflect.Value) {
s.method = method
}

func (s *ServiceTaskStateImpl) IsAsync() bool {
return s.isAsync
}
@@ -327,14 +336,14 @@ func (l *LoopImpl) CompletionCondition() string {
}

type RetryImpl struct {
errorTypeNames []string
exceptions []string
intervalSecond float64
maxAttempt int
backoffRate float64
}

func (r *RetryImpl) SetErrorTypeNames(errorTypeNames []string) {
r.errorTypeNames = errorTypeNames
func (r *RetryImpl) SetExceptions(exceptions []string) {
r.exceptions = exceptions
}

func (r *RetryImpl) SetIntervalSecond(intervalSecond float64) {
@@ -349,8 +358,8 @@ func (r *RetryImpl) SetBackoffRate(backoffRate float64) {
r.backoffRate = backoffRate
}

func (r *RetryImpl) ErrorTypeNames() []string {
return r.errorTypeNames
func (r *RetryImpl) Exceptions() []string {
return r.exceptions
}

func (r *RetryImpl) IntervalSecond() float64 {
@@ -365,33 +374,33 @@ func (r *RetryImpl) BackoffRate() float64 {
return r.backoffRate
}

type ErrorMatchImpl struct {
errors []string
errorTypes []reflect.Type
next string
type ExceptionMatchImpl struct {
exceptions []string
exceptionTypes []reflect.Type
next string
}

func (e *ErrorMatchImpl) SetErrors(errors []string) {
e.errors = errors
func (e *ExceptionMatchImpl) SetExceptions(errors []string) {
e.exceptions = errors
}

func (e *ErrorMatchImpl) SetNext(next string) {
func (e *ExceptionMatchImpl) SetNext(next string) {
e.next = next
}

func (e *ErrorMatchImpl) Errors() []string {
return e.errors
func (e *ExceptionMatchImpl) Exceptions() []string {
return e.exceptions
}

func (e *ErrorMatchImpl) ErrorTypes() []reflect.Type {
return e.errorTypes
func (e *ExceptionMatchImpl) ExceptionTypes() []reflect.Type {
return e.exceptionTypes
}

func (e *ErrorMatchImpl) SetErrorTypes(errorTypes []reflect.Type) {
e.errorTypes = errorTypes
func (e *ExceptionMatchImpl) SetExceptionTypes(exceptionTypes []reflect.Type) {
e.exceptionTypes = exceptionTypes
}

func (e *ErrorMatchImpl) Next() string {
func (e *ExceptionMatchImpl) Next() string {
return e.next
}



+ 35
- 0
testdata/saga/engine/invoker/grpc/product.go View File

@@ -0,0 +1,35 @@
package product

import (
"context"
"log"
"net"

"google.golang.org/grpc"
)

type server struct {
UnimplementedProductInfoServer
}

func (*server) AddProduct(context.Context, *Product) (*ProductId, error) {
log.Println("add product success")
return &ProductId{Value: "1"}, nil
}
func (*server) GetProduct(context.Context, *ProductId) (*Product, error) {
log.Println("get product success")
return &Product{Id: "1"}, nil
}

func StartProductServer() {
lis, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
RegisterProductInfoServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

+ 219
- 0
testdata/saga/engine/invoker/grpc/product.pb.go View File

@@ -0,0 +1,219 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.32.0
// protoc v4.25.3
// source: product.proto

package product

import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)

const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type Product struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
}

func (x *Product) Reset() {
*x = Product{}
if protoimpl.UnsafeEnabled {
mi := &file_product_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *Product) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*Product) ProtoMessage() {}

func (x *Product) ProtoReflect() protoreflect.Message {
mi := &file_product_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use Product.ProtoReflect.Descriptor instead.
func (*Product) Descriptor() ([]byte, []int) {
return file_product_proto_rawDescGZIP(), []int{0}
}

func (x *Product) GetId() string {
if x != nil {
return x.Id
}
return ""
}

type ProductId struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Value string `protobuf:"bytes,1,opt,name=value,proto3" json:"value,omitempty"`
}

func (x *ProductId) Reset() {
*x = ProductId{}
if protoimpl.UnsafeEnabled {
mi := &file_product_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *ProductId) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*ProductId) ProtoMessage() {}

func (x *ProductId) ProtoReflect() protoreflect.Message {
mi := &file_product_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use ProductId.ProtoReflect.Descriptor instead.
func (*ProductId) Descriptor() ([]byte, []int) {
return file_product_proto_rawDescGZIP(), []int{1}
}

func (x *ProductId) GetValue() string {
if x != nil {
return x.Value
}
return ""
}

var File_product_proto protoreflect.FileDescriptor

var file_product_proto_rawDesc = []byte{
0x0a, 0x0d, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12,
0x07, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x22, 0x19, 0x0a, 0x07, 0x50, 0x72, 0x6f, 0x64,
0x75, 0x63, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x02, 0x69, 0x64, 0x22, 0x21, 0x0a, 0x09, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x49, 0x64,
0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x32, 0x75, 0x0a, 0x0b, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63,
0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x32, 0x0a, 0x0a, 0x61, 0x64, 0x64, 0x50, 0x72, 0x6f, 0x64,
0x75, 0x63, 0x74, 0x12, 0x10, 0x2e, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x2e, 0x50, 0x72,
0x6f, 0x64, 0x75, 0x63, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x2e,
0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x49, 0x64, 0x12, 0x32, 0x0a, 0x0a, 0x67, 0x65, 0x74,
0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x64, 0x75, 0x63,
0x74, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x49, 0x64, 0x1a, 0x10, 0x2e, 0x70, 0x72,
0x6f, 0x64, 0x75, 0x63, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x64, 0x75, 0x63, 0x74, 0x42, 0x56, 0x0a,
0x1b, 0x69, 0x6f, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65,
0x73, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x42, 0x0f, 0x48, 0x65,
0x6c, 0x6c, 0x6f, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a,
0x24, 0x74, 0x65, 0x73, 0x74, 0x64, 0x61, 0x74, 0x61, 0x2f, 0x73, 0x61, 0x67, 0x61, 0x2f, 0x65,
0x6e, 0x67, 0x69, 0x6e, 0x65, 0x2f, 0x69, 0x6e, 0x76, 0x6f, 0x6b, 0x65, 0x72, 0x2f, 0x70, 0x72,
0x6f, 0x64, 0x75, 0x63, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
file_product_proto_rawDescOnce sync.Once
file_product_proto_rawDescData = file_product_proto_rawDesc
)

func file_product_proto_rawDescGZIP() []byte {
file_product_proto_rawDescOnce.Do(func() {
file_product_proto_rawDescData = protoimpl.X.CompressGZIP(file_product_proto_rawDescData)
})
return file_product_proto_rawDescData
}

var file_product_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
var file_product_proto_goTypes = []interface{}{
(*Product)(nil), // 0: product.Product
(*ProductId)(nil), // 1: product.ProductId
}
var file_product_proto_depIdxs = []int32{
0, // 0: product.ProductInfo.addProduct:input_type -> product.Product
1, // 1: product.ProductInfo.getProduct:input_type -> product.ProductId
1, // 2: product.ProductInfo.addProduct:output_type -> product.ProductId
0, // 3: product.ProductInfo.getProduct:output_type -> product.Product
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}

func init() { file_product_proto_init() }
func file_product_proto_init() {
if File_product_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_product_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Product); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_product_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ProductId); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_product_proto_rawDesc,
NumEnums: 0,
NumMessages: 2,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_product_proto_goTypes,
DependencyIndexes: file_product_proto_depIdxs,
MessageInfos: file_product_proto_msgTypes,
}.Build()
File_product_proto = out.File
file_product_proto_rawDesc = nil
file_product_proto_goTypes = nil
file_product_proto_depIdxs = nil
}

+ 20
- 0
testdata/saga/engine/invoker/grpc/product.proto View File

@@ -0,0 +1,20 @@
syntax = "proto3";
package product;

option go_package = "testdata/saga/engine/invoker/product";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

service ProductInfo {
rpc addProduct(Product) returns (ProductId);
rpc getProduct(ProductId) returns (Product);
}

message Product {
string id = 1;
}

message ProductId {
string value = 1;
}

+ 150
- 0
testdata/saga/engine/invoker/grpc/product_grpc.pb.go View File

@@ -0,0 +1,150 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.25.3
// source: product.proto

package product

import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

const (
ProductInfo_AddProduct_FullMethodName = "/product.ProductInfo/addProduct"
ProductInfo_GetProduct_FullMethodName = "/product.ProductInfo/getProduct"
)

// ProductInfoClient is the client API for ProductInfo service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ProductInfoClient interface {
// 添加商品
AddProduct(ctx context.Context, in *Product, opts ...grpc.CallOption) (*ProductId, error)
// 获取商品
GetProduct(ctx context.Context, in *ProductId, opts ...grpc.CallOption) (*Product, error)
}

type productInfoClient struct {
cc grpc.ClientConnInterface
}

func NewProductInfoClient(cc grpc.ClientConnInterface) ProductInfoClient {
return &productInfoClient{cc}
}

func (c *productInfoClient) AddProduct(ctx context.Context, in *Product, opts ...grpc.CallOption) (*ProductId, error) {
out := new(ProductId)
err := c.cc.Invoke(ctx, ProductInfo_AddProduct_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

func (c *productInfoClient) GetProduct(ctx context.Context, in *ProductId, opts ...grpc.CallOption) (*Product, error) {
out := new(Product)
err := c.cc.Invoke(ctx, ProductInfo_GetProduct_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

// ProductInfoServer is the server API for ProductInfo service.
// All implementations must embed UnimplementedProductInfoServer
// for forward compatibility
type ProductInfoServer interface {
// 添加商品
AddProduct(context.Context, *Product) (*ProductId, error)
// 获取商品
GetProduct(context.Context, *ProductId) (*Product, error)
mustEmbedUnimplementedProductInfoServer()
}

// UnimplementedProductInfoServer must be embedded to have forward compatible implementations.
type UnimplementedProductInfoServer struct {
}

func (UnimplementedProductInfoServer) AddProduct(context.Context, *Product) (*ProductId, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddProduct not implemented")
}
func (UnimplementedProductInfoServer) GetProduct(context.Context, *ProductId) (*Product, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetProduct not implemented")
}
func (UnimplementedProductInfoServer) mustEmbedUnimplementedProductInfoServer() {}

// UnsafeProductInfoServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ProductInfoServer will
// result in compilation errors.
type UnsafeProductInfoServer interface {
mustEmbedUnimplementedProductInfoServer()
}

func RegisterProductInfoServer(s grpc.ServiceRegistrar, srv ProductInfoServer) {
s.RegisterService(&ProductInfo_ServiceDesc, srv)
}

func _ProductInfo_AddProduct_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Product)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ProductInfoServer).AddProduct(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ProductInfo_AddProduct_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ProductInfoServer).AddProduct(ctx, req.(*Product))
}
return interceptor(ctx, in, info, handler)
}

func _ProductInfo_GetProduct_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ProductId)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ProductInfoServer).GetProduct(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: ProductInfo_GetProduct_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ProductInfoServer).GetProduct(ctx, req.(*ProductId))
}
return interceptor(ctx, in, info, handler)
}

// ProductInfo_ServiceDesc is the grpc.ServiceDesc for ProductInfo service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var ProductInfo_ServiceDesc = grpc.ServiceDesc{
ServiceName: "product.ProductInfo",
HandlerType: (*ProductInfoServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "addProduct",
Handler: _ProductInfo_AddProduct_Handler,
},
{
MethodName: "getProduct",
Handler: _ProductInfo_GetProduct_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "product.proto",
}

Loading…
Cancel
Save