Browse Source

增加Client的RPC接口

master
Sydonian 3 months ago
parent
commit
bac641ab38
10 changed files with 668 additions and 0 deletions
  1. +1
    -0
      common/ecode/ecode.go
  2. +195
    -0
      common/pkgs/rpc/channel.go
  3. +27
    -0
      common/pkgs/rpc/client/client.go
  4. +77
    -0
      common/pkgs/rpc/client/client.pb.go
  5. +11
    -0
      common/pkgs/rpc/client/client.proto
  6. +142
    -0
      common/pkgs/rpc/client/client_grpc.pb.go
  7. +115
    -0
      common/pkgs/rpc/client/pool.go
  8. +27
    -0
      common/pkgs/rpc/client/publock.go
  9. +39
    -0
      common/pkgs/rpc/client/server.go
  10. +34
    -0
      common/pkgs/rpc/utils.go

+ 1
- 0
common/ecode/ecode.go View File

@@ -10,4 +10,5 @@ const (
BadArgument ErrorCode = "BadArgument" BadArgument ErrorCode = "BadArgument"
TaskNotFound ErrorCode = "TaskNotFound" TaskNotFound ErrorCode = "TaskNotFound"
Unauthorized ErrorCode = "Unauthorized" Unauthorized ErrorCode = "Unauthorized"
ChannelClosed ErrorCode = "ChannelClosed"
) )

+ 195
- 0
common/pkgs/rpc/channel.go View File

@@ -0,0 +1,195 @@
package rpc

import (
"gitlink.org.cn/cloudream/common/utils/serder"
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
)

var ErrChannelClosed = Failed(ecode.ChannelClosed, "channel closed")

type ChanSender[T any] interface {
Send(val T) *CodeError
Close()
// 关闭连接,并发送错误码。注意:客户端部分的Channel调用此函数时设置的err不会被送到服务端,因为GRPC没有提供这样的机制。
CloseWithError(err *CodeError)
}

type ChanReceiver[T any] interface {
Receive() (T, *CodeError)
Close()
// 关闭连接,并发送错误码。注意:客户端部分的Channel调用此函数时设置的err不会被送到服务端,因为GRPC没有提供这样的机制。
CloseWithError(err *CodeError)
}

type BidChan[Recv, Send any] interface {
ChanSender[Send]
ChanReceiver[Recv]
}

type fusedChannel[Recv, Send any] struct {
err *CodeError
}

func (f *fusedChannel[Recv, Send]) Receive() (Recv, *CodeError) {
var val Recv
return val, f.err
}
func (f *fusedChannel[Recv, Send]) Send(val Send) *CodeError {
return f.err
}
func (f *fusedChannel[Recv, Send]) Close() {
}
func (f *fusedChannel[Recv, Send]) CloseWithError(err *CodeError) {
}

func NewFusedChan[Recv, Send any](err *CodeError) BidChan[Recv, Send] {
return &fusedChannel[Recv, Send]{err: err}
}

type bidChanClient[Recv, Send any] struct {
cli BidChannelAPIClient
cancelFn func()
lastErr *CodeError
}

func NewBidChanClient[Recv, Send any](cli BidChannelAPIClient, cancelFn func()) BidChan[Recv, Send] {
return &bidChanClient[Recv, Send]{cli: cli, cancelFn: cancelFn}
}

func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError {
if c.lastErr != nil {
return c.lastErr
}

data, err := serder.ObjectToJSONEx(val)
if err != nil {
c.cancelFn()
c.lastErr = Failed(ecode.OperationFailed, err.Error())
return Failed(ecode.OperationFailed, err.Error())
}

err = c.cli.Send(&Request{Payload: data})
if err != nil {
c.cancelFn()
c.lastErr = getCodeError(err)
return c.lastErr
}

return nil
}

func (c *bidChanClient[Recv, Send]) Receive() (Recv, *CodeError) {
if c.lastErr != nil {
var def Recv
return def, c.lastErr
}

resp, err := c.cli.Recv()
if err != nil {
c.cancelFn()
c.lastErr = getCodeError(err)
var def Recv
return def, c.lastErr
}

resp2, err := serder.JSONToObjectEx[Recv](resp.Payload)
if err != nil {
c.cancelFn()
c.lastErr = Failed(ecode.OperationFailed, err.Error())
var def Recv
return def, c.lastErr
}

return resp2, nil
}

func (c *bidChanClient[Recv, Send]) Close() {
if c.lastErr != nil {
return
}

c.cli.CloseSend()
c.lastErr = ErrChannelClosed
}

func (c *bidChanClient[Recv, Send]) CloseWithError(err *CodeError) {
if c.lastErr != nil {
return
}

c.cancelFn()
c.lastErr = err
}

type bidChanServer[Recv, Send any] struct {
svr BidChannelAPIServer
errChan chan *CodeError
lastErr *CodeError
}

func NewBidChanServer[Recv, Send any](svr BidChannelAPIServer, errChan chan *CodeError) BidChan[Recv, Send] {
return &bidChanServer[Recv, Send]{svr: svr, errChan: errChan}
}

func (s *bidChanServer[Recv, Send]) Send(val Send) *CodeError {
if s.lastErr != nil {
return s.lastErr
}

data, err := serder.ObjectToJSONEx(val)
if err != nil {
s.lastErr = Failed(ecode.OperationFailed, err.Error())
s.errChan <- s.lastErr
return Failed(ecode.OperationFailed, err.Error())
}
err = s.svr.Send(&Response{Payload: data})
if err != nil {
s.lastErr = getCodeError(err)
s.errChan <- s.lastErr
return s.lastErr
}

return nil
}

func (s *bidChanServer[Recv, Send]) Receive() (Recv, *CodeError) {
if s.lastErr != nil {
var def Recv
return def, s.lastErr
}

req, err := s.svr.Recv()
if err != nil {
s.lastErr = getCodeError(err)
s.errChan <- s.lastErr
var def Recv
return def, s.lastErr
}

req2, err := serder.JSONToObjectEx[Recv](req.Payload)
if err != nil {
s.lastErr = Failed(ecode.OperationFailed, err.Error())
s.errChan <- s.lastErr
var def Recv
return def, s.lastErr
}

return req2, nil
}
func (s *bidChanServer[Recv, Send]) Close() {
if s.lastErr != nil {
return
}

s.lastErr = ErrChannelClosed
s.errChan <- nil
}

func (s *bidChanServer[Recv, Send]) CloseWithError(err *CodeError) {
if s.lastErr != nil {
return
}

s.lastErr = err
s.errChan <- err
}

+ 27
- 0
common/pkgs/rpc/client/client.go View File

@@ -0,0 +1,27 @@
package clirpc

import (
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
"google.golang.org/grpc"
)

type Client struct {
con *grpc.ClientConn
cli ClientClient
pool *Pool
fusedErr *rpc.CodeError
}

func (c *Client) Release() {
if c.con != nil {
c.pool.connPool.Release(c.pool.cfg.Address)
}
}

type TempClient struct {
Client
}

func (c *TempClient) Release() {
c.con.Close()
}

+ 77
- 0
common/pkgs/rpc/client/client.pb.go View File

@@ -0,0 +1,77 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.34.2
// protoc v4.22.3
// source: pkgs/rpc/client/client.proto

package clirpc

import (
rpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
)

const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

var File_pkgs_rpc_client_client_proto protoreflect.FileDescriptor

var file_pkgs_rpc_client_client_proto_rawDesc = []byte{
0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06,
0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63,
0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x3b, 0x0a, 0x06, 0x43, 0x6c,
0x69, 0x65, 0x6e, 0x74, 0x12, 0x31, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x4c, 0x6f, 0x63, 0x6b, 0x43,
0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69,
0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72,
0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d,
0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x72,
0x70, 0x63, 0x3b, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
}

var file_pkgs_rpc_client_client_proto_goTypes = []any{
(*rpc.Request)(nil), // 0: rpc.Request
(*rpc.Response)(nil), // 1: rpc.Response
}
var file_pkgs_rpc_client_client_proto_depIdxs = []int32{
0, // 0: clirpc.Client.PubLockChannel:input_type -> rpc.Request
1, // 1: clirpc.Client.PubLockChannel:output_type -> rpc.Response
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}

func init() { file_pkgs_rpc_client_client_proto_init() }
func file_pkgs_rpc_client_client_proto_init() {
if File_pkgs_rpc_client_client_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkgs_rpc_client_client_proto_rawDesc,
NumEnums: 0,
NumMessages: 0,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pkgs_rpc_client_client_proto_goTypes,
DependencyIndexes: file_pkgs_rpc_client_client_proto_depIdxs,
}.Build()
File_pkgs_rpc_client_client_proto = out.File
file_pkgs_rpc_client_client_proto_rawDesc = nil
file_pkgs_rpc_client_client_proto_goTypes = nil
file_pkgs_rpc_client_client_proto_depIdxs = nil
}

+ 11
- 0
common/pkgs/rpc/client/client.proto View File

@@ -0,0 +1,11 @@
syntax = "proto3";

import "pkgs/rpc/rpc.proto";

package clirpc;

option go_package = "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/clirpc;clirpc";

service Client {
rpc PubLockChannel(stream rpc.Request) returns(stream rpc.Response);
}

+ 142
- 0
common/pkgs/rpc/client/client_grpc.pb.go View File

@@ -0,0 +1,142 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc v4.22.3
// source: pkgs/rpc/client/client.proto

package clirpc

import (
context "context"
rpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

const (
Client_PubLockChannel_FullMethodName = "/clirpc.Client/PubLockChannel"
)

// ClientClient is the client API for Client service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ClientClient interface {
PubLockChannel(ctx context.Context, opts ...grpc.CallOption) (Client_PubLockChannelClient, error)
}

type clientClient struct {
cc grpc.ClientConnInterface
}

func NewClientClient(cc grpc.ClientConnInterface) ClientClient {
return &clientClient{cc}
}

func (c *clientClient) PubLockChannel(ctx context.Context, opts ...grpc.CallOption) (Client_PubLockChannelClient, error) {
stream, err := c.cc.NewStream(ctx, &Client_ServiceDesc.Streams[0], Client_PubLockChannel_FullMethodName, opts...)
if err != nil {
return nil, err
}
x := &clientPubLockChannelClient{stream}
return x, nil
}

type Client_PubLockChannelClient interface {
Send(*rpc.Request) error
Recv() (*rpc.Response, error)
grpc.ClientStream
}

type clientPubLockChannelClient struct {
grpc.ClientStream
}

func (x *clientPubLockChannelClient) Send(m *rpc.Request) error {
return x.ClientStream.SendMsg(m)
}

func (x *clientPubLockChannelClient) Recv() (*rpc.Response, error) {
m := new(rpc.Response)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

// ClientServer is the server API for Client service.
// All implementations must embed UnimplementedClientServer
// for forward compatibility
type ClientServer interface {
PubLockChannel(Client_PubLockChannelServer) error
mustEmbedUnimplementedClientServer()
}

// UnimplementedClientServer must be embedded to have forward compatible implementations.
type UnimplementedClientServer struct {
}

func (UnimplementedClientServer) PubLockChannel(Client_PubLockChannelServer) error {
return status.Errorf(codes.Unimplemented, "method PubLockChannel not implemented")
}
func (UnimplementedClientServer) mustEmbedUnimplementedClientServer() {}

// UnsafeClientServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ClientServer will
// result in compilation errors.
type UnsafeClientServer interface {
mustEmbedUnimplementedClientServer()
}

func RegisterClientServer(s grpc.ServiceRegistrar, srv ClientServer) {
s.RegisterService(&Client_ServiceDesc, srv)
}

func _Client_PubLockChannel_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ClientServer).PubLockChannel(&clientPubLockChannelServer{stream})
}

type Client_PubLockChannelServer interface {
Send(*rpc.Response) error
Recv() (*rpc.Request, error)
grpc.ServerStream
}

type clientPubLockChannelServer struct {
grpc.ServerStream
}

func (x *clientPubLockChannelServer) Send(m *rpc.Response) error {
return x.ServerStream.SendMsg(m)
}

func (x *clientPubLockChannelServer) Recv() (*rpc.Request, error) {
m := new(rpc.Request)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}

// Client_ServiceDesc is the grpc.ServiceDesc for Client service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Client_ServiceDesc = grpc.ServiceDesc{
ServiceName: "clirpc.Client",
HandlerType: (*ClientServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "PubLockChannel",
Handler: _Client_PubLockChannel_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "pkgs/rpc/client/client.proto",
}

+ 115
- 0
common/pkgs/rpc/client/pool.go View File

@@ -0,0 +1,115 @@
package clirpc

import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"

"gitlink.org.cn/cloudream/common/consts/errorcode"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)

type PoolConfig struct {
Address string
Conn rpc.PoolConfig
}

type PoolConfigJSON struct {
Address string `json:"address"`
RootCA string `json:"rootCA"`
ClientCert string `json:"clientCert"`
ClientKey string `json:"clientKey"`
}

func (c *PoolConfigJSON) Build(tokenProv rpc.AccessTokenProvider) (*PoolConfig, error) {
pc := &PoolConfig{
Address: c.Address,
}
pc.Conn.AccessTokenProvider = tokenProv

rootCA, err := os.ReadFile(c.RootCA)
if err != nil {
return nil, fmt.Errorf("load root ca: %v", err)
}
pc.Conn.RootCA = x509.NewCertPool()
if !pc.Conn.RootCA.AppendCertsFromPEM(rootCA) {
return nil, fmt.Errorf("failed to parse root ca")
}

if c.ClientCert != "" && c.ClientKey != "" {
cert, err := tls.LoadX509KeyPair(c.ClientCert, c.ClientKey)
if err != nil {
return nil, fmt.Errorf("load client cert: %v", err)
}
pc.Conn.ClientCert = &cert
} else if tokenProv == nil {
return nil, fmt.Errorf("must provide client cert or access token provider")
}

return pc, nil
}

func (c *PoolConfigJSON) BuildTempClient() (*TempClient, error) {
rootCA, err := os.ReadFile(c.RootCA)
if err != nil {
return nil, fmt.Errorf("load root ca: %v", err)
}
rootCAs := x509.NewCertPool()
if !rootCAs.AppendCertsFromPEM(rootCA) {
return nil, fmt.Errorf("failed to parse root ca")
}

gcon, err := grpc.NewClient(c.Address,
grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{
RootCAs: rootCAs,
ServerName: rpc.ClientAPISNIV1,
NextProtos: []string{"h2"},
})),
)
if err != nil {
return nil, err
}

return &TempClient{
Client: Client{
con: gcon,
cli: NewClientClient(gcon),
pool: nil,
fusedErr: nil,
},
}, nil
}

type Pool struct {
cfg PoolConfig
connPool *rpc.ConnPool
}

func NewPool(cfg PoolConfig) *Pool {
return &Pool{
cfg: cfg,
connPool: rpc.NewConnPool(cfg.Conn),
}
}

func (p *Pool) Get() *Client {
con, err := p.connPool.GetConnection(p.cfg.Address)
if err != nil {
return &Client{
con: nil,
cli: nil,
pool: p,
fusedErr: rpc.Failed(errorcode.OperationFailed, err.Error()),
}
}

return &Client{
con: con,
cli: NewClientClient(con),
pool: p,
fusedErr: nil,
}
}

+ 27
- 0
common/pkgs/rpc/client/publock.go View File

@@ -0,0 +1,27 @@
package clirpc

import (
"context"

"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
)

type PubLockService interface {
PubLockChannel(tx PubLockMessageChan)
}

type PubLockMessage interface {
IsPubLockMessage() bool
}
type PubLockMessageChan = rpc.BidChan[PubLockMessage, PubLockMessage]

func (c *Client) PubLockChannel(ctx context.Context) PubLockMessageChan {
if c.fusedErr != nil {
return rpc.NewFusedChan[PubLockMessage, PubLockMessage](c.fusedErr)
}

return rpc.BidChannelClient[PubLockMessage, PubLockMessage](c.cli.PubLockChannel, ctx)
}
func (s *Server) PubLockChannel(arg Client_PubLockChannelServer) error {
return rpc.BidChannelServer(s.svrImpl.PubLockChannel, arg)
}

+ 39
- 0
common/pkgs/rpc/client/server.go View File

@@ -0,0 +1,39 @@
package clirpc

import (
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
)

type ClientAPI interface {
PubLockService
}

type Server struct {
UnimplementedClientServer
*rpc.ServerBase
svrImpl ClientAPI
}

func NewServer(cfg rpc.Config, impl ClientAPI, tokenVerifier rpc.AccessTokenVerifier) *Server {
svr := &Server{
svrImpl: impl,
}
svr.ServerBase = rpc.NewServerBase(cfg, svr, &Client_ServiceDesc, tokenAuthAPIs, tokenVerifier, noAuthAPIs)
return svr
}

var _ ClientServer = (*Server)(nil)

var tokenAuthAPIs []string

func TokenAuth(api string) bool {
tokenAuthAPIs = append(tokenAuthAPIs, api)
return true
}

var noAuthAPIs []string

func NoAuth(api string) bool {
noAuthAPIs = append(noAuthAPIs, api)
return true
}

+ 34
- 0
common/pkgs/rpc/utils.go View File

@@ -244,6 +244,40 @@ func DownloadStreamServer[Resp DownloadStreamResp, Req any, APIRet DownloadStrea
return nil return nil
} }


type BidChannelAPIClient interface {
Send(*Request) error
Recv() (*Response, error)
grpc.ClientStream
}

func BidChannelClient[Resp any, Req any, APIRet BidChannelAPIClient](apiFn func(context.Context, ...grpc.CallOption) (APIRet, error), ctx context.Context) BidChan[Resp, Req] {
ctx, cancelFn := context.WithCancel(ctx)

ret, err := apiFn(ctx)
if err != nil {
return NewFusedChan[Resp, Req](Failed(errorcode.OperationFailed, err.Error()))
}

return NewBidChanClient[Resp, Req](ret, cancelFn)
}

type BidChannelAPIServer interface {
Send(*Response) error
Recv() (*Request, error)
grpc.ServerStream
}

func BidChannelServer[Resp any, Req any, APIArg BidChannelAPIServer](apiFn func(BidChan[Req, Resp]), arg APIArg) error {
errCh := make(chan *CodeError, 1)
ch := NewBidChanServer[Req, Resp](arg, errCh)
go apiFn(ch)
cerr := <-errCh
if cerr != nil {
return WrapCodeError(cerr)
}
return nil
}

func Failed(errCode ecode.ErrorCode, format string, args ...any) *CodeError { func Failed(errCode ecode.ErrorCode, format string, args ...any) *CodeError {
return &CodeError{ return &CodeError{
Code: string(errCode), Code: string(errCode),


Loading…
Cancel
Save