Browse Source

增加http通过rpc代理的功能

master
Sydonian 3 months ago
parent
commit
4d0f6c824b
16 changed files with 581 additions and 45 deletions
  1. +1
    -1
      client/internal/cluster/cluster.go
  2. +2
    -2
      client/internal/cmdline/serve.go
  3. +1
    -1
      client/internal/cmdline/vfstest.go
  4. +14
    -1
      client/internal/http/auth/auth.go
  5. +74
    -0
      client/internal/http/proxy/proxy.go
  6. +62
    -2
      client/internal/http/server.go
  7. +6
    -4
      client/internal/http/v1/server.go
  8. +18
    -0
      client/internal/rpc/http.go
  9. +4
    -1
      client/internal/rpc/rpc.go
  10. +4
    -4
      common/pkgs/rpc/channel.go
  11. +296
    -25
      common/pkgs/rpc/client/client.pb.go
  12. +21
    -0
      common/pkgs/rpc/client/client.proto
  13. +37
    -0
      common/pkgs/rpc/client/client_grpc.pb.go
  14. +36
    -0
      common/pkgs/rpc/client/http.go
  15. +1
    -0
      common/pkgs/rpc/client/server.go
  16. +4
    -4
      common/pkgs/rpc/utils.go

+ 1
- 1
client/internal/cluster/cluster.go View File

@@ -54,7 +54,7 @@ func (c *Cluster) Start() error {
resp, cerr := cli.GetClusterMasterInfo(ctx, &clirpc.GetClusterMasterInfo{})
cancelFn()
if cerr != nil {
log.Warnf("first report: %v, will retry after 3 seconds", err)
log.Warnf("first report: %v, will retry after 3 seconds", cerr.ToError())
time.Sleep(3 * time.Second)
continue
}


+ 2
- 2
client/internal/cmdline/serve.go View File

@@ -248,12 +248,12 @@ func serveHTTP(configPath string, opts serveHTTPOptions) {
logger.Errorf("build http config: %v", err)
os.Exit(1)
}
httpSvr := http.NewServer(httpCfg, svc)
httpSvr := http.NewServer(httpCfg, svc, clster)
httpChan := httpSvr.Start()
defer httpSvr.Stop()

// RPC接口
rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock, clster), nil)
rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock, clster, httpSvr), nil)
rpcChan := rpcSvr.Start()
defer rpcSvr.Stop()



+ 1
- 1
client/internal/cmdline/vfstest.go View File

@@ -227,7 +227,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) {
logger.Errorf("build http config: %v", err)
os.Exit(1)
}
httpSvr := http.NewServer(httpCfg, svc)
httpSvr := http.NewServer(httpCfg, svc, clster)
httpChan := httpSvr.Start()
defer httpSvr.Stop()



+ 14
- 1
client/internal/http/auth/auth.go View File

@@ -10,8 +10,11 @@ import (
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
)

type ClusterProxyRequestKey string

const (
ClientInternalSNI = "client.jcs-pub.internal"
ClientInternalSNI = "client.jcs-pub.internal"
ClusterProxyRequest ClusterProxyRequestKey = "ClusterProxyRequest"
)

type Auth struct {
@@ -44,6 +47,11 @@ func (a *Auth) TLSConfigSelector(hello *tls.ClientHelloInfo) (*tls.Config, error
}

func (a *Auth) RejectNoCertAuth(c *gin.Context) {
if c.Request.Context().Value(ClusterProxyRequest) != nil {
c.Next()
return
}

if c.Request.TLS == nil || c.Request.TLS.ServerName != ClientInternalSNI {
c.AbortWithStatusJSON(401, types.Failed(ecode.Unauthorized, "must provide client certificate"))
return
@@ -55,6 +63,11 @@ func (a *Auth) RejectNoCertAuth(c *gin.Context) {
func (a *Auth) Presigned(c *gin.Context) {
log := logger.WithField("HTTP", "Auth")

if c.Request.Context().Value(ClusterProxyRequest) != nil {
c.Next()
return
}

accID := signer.GetAccessKeyID(c.Request.URL)
if accID == "" {
log.Warn("access key id not found in query string")


+ 74
- 0
client/internal/http/proxy/proxy.go View File

@@ -0,0 +1,74 @@
package proxy

import (
"io"

"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/utils/io2"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types"
"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
)

type ClusterProxy struct {
clster *cluster.Cluster
}

func NewClusterProxy(clster *cluster.Cluster) *ClusterProxy {
return &ClusterProxy{
clster: clster,
}
}

func (p *ClusterProxy) Proxy(c *gin.Context) {
if p.clster.IsMaster() {
c.Next()
return
}

mstCli := p.clster.MasterClient().Get()
defer mstCli.Release()

preq := clirpc.HTTPProxyRequest{
URI: c.Request.RequestURI,
Method: c.Request.Method,
}

for k, v := range c.Request.Header {
for _, vv := range v {
preq.Header = append(preq.Header, &clirpc.HeaderKV{
Key: k,
Value: vv,
})
}
}

if c.Request.Body != nil {
data, err := io.ReadAll(c.Request.Body)
if err != nil {
c.AbortWithStatusJSON(500, types.Failed(ecode.OperationFailed, "read body: %v", err))
return
}

preq.Body = data
}

resp, err := mstCli.HTTPProxy(c.Request.Context(), &preq)
if err != nil {
c.AbortWithStatusJSON(500, types.Failed(ecode.OperationFailed, "proxy: %v", err))
return
}

c.Status(int(resp.StatusCode))

for _, h := range resp.Header {
c.Header(h.Key, h.Value)
}

if len(resp.Body) > 0 {
io2.WriteAll(c.Writer, resp.Body)
}

c.Abort()
}

+ 62
- 2
client/internal/http/server.go View File

@@ -1,6 +1,7 @@
package http

import (
"bytes"
"context"
"crypto/tls"
"net/http"
@@ -8,10 +9,13 @@ import (
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/common/pkgs/async"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/auth"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/proxy"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types"
v1 "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/v1"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/services"
clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
"golang.org/x/net/http2"
)

@@ -30,15 +34,18 @@ type Server struct {
cfg types.Config
httpSrv *http.Server
svc *services.Service
clster *cluster.Cluster
eventChan *ServerEventChan
auth *auth.Auth
proxy *proxy.ClusterProxy
v1Svr *v1.Server
}

func NewServer(cfg types.Config, svc *services.Service) *Server {
func NewServer(cfg types.Config, svc *services.Service, clster *cluster.Cluster) *Server {
return &Server{
cfg: cfg,
svc: svc,
clster: clster,
eventChan: async.NewUnboundChannel[ServerEvent](),
v1Svr: v1.NewServer(&cfg, svc),
}
@@ -56,12 +63,13 @@ func (s *Server) Start() *ServerEventChan {
Handler: engine,
}
s.auth = auth.New(&s.cfg)
s.proxy = proxy.NewClusterProxy(s.clster)
s.httpSrv.TLSConfig = &tls.Config{
GetConfigForClient: s.auth.TLSConfigSelector,
}
http2.ConfigureServer(s.httpSrv, &http2.Server{})

s.v1Svr.InitRouters(engine.Group("/v1"), s.auth)
s.v1Svr.InitRouters(engine.Group("/v1"), s.auth, s.proxy)

logger.Infof("start serving http at: %s", s.cfg.Listen)

@@ -79,3 +87,55 @@ func (s *Server) Stop() {

s.httpSrv.Shutdown(context.Background())
}

func (s *Server) ServeHTTP(ctx context.Context, req *clirpc.HTTPProxyRequest) (*clirpc.HTTPProxyResponse, error) {
ctx2 := context.WithValue(ctx, auth.ClusterProxyRequest, true)

r, err := http.NewRequestWithContext(ctx2, req.Method, req.URI, bytes.NewReader(req.Body))
if err != nil {
return nil, err
}

for _, v := range req.Header {
r.Header.Add(v.Key, v.Value)
}

resp := proxyRespWriter{
header: make(http.Header),
}
s.httpSrv.Handler.ServeHTTP(&resp, r)

var header []*clirpc.HeaderKV
for k, v := range resp.header {
for _, vv := range v {
header = append(header, &clirpc.HeaderKV{
Key: k,
Value: vv,
})
}
}

return &clirpc.HTTPProxyResponse{
StatusCode: int32(resp.statusCode),
Header: header,
Body: resp.body.Bytes(),
}, nil
}

type proxyRespWriter struct {
statusCode int
header http.Header
body bytes.Buffer
}

func (w *proxyRespWriter) Header() http.Header {
return w.header
}

func (w *proxyRespWriter) Write(b []byte) (int, error) {
return w.body.Write(b)
}

func (w *proxyRespWriter) WriteHeader(statusCode int) {
w.statusCode = statusCode
}

+ 6
- 4
client/internal/http/v1/server.go View File

@@ -3,6 +3,7 @@ package http
import (
"github.com/gin-gonic/gin"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/auth"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/proxy"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/services"
cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1"
@@ -20,9 +21,10 @@ func NewServer(cfg *types.Config, svc *services.Service) *Server {
}
}

func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) {
func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth, proxy *proxy.ClusterProxy) {
certAuth := ah.RejectNoCertAuth
signAuth := ah.Presigned
prxy := proxy.Proxy

rt.GET(cliapi.ObjectListPathByPath, certAuth, s.Object().ListByPath)
rt.GET(cliapi.ObjectListByIDsPath, certAuth, s.Object().ListByIDs)
@@ -66,9 +68,9 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) {
rt.POST(cliapi.ObjectUploadPartPath, certAuth, s.Object().UploadPart)
rt.POST(cliapi.ObjectCompleteMultipartUploadPath, certAuth, s.Object().CompleteMultipartUpload)

rt.POST(cliapi.SpaceSyncerCreateTaskPath, certAuth, s.SpaceSyncer().CreateTask)
rt.GET(cliapi.SpaceSyncerGetTaskPath, certAuth, s.SpaceSyncer().GetTask)
rt.POST(cliapi.SpaceSyncerCancelTaskPath, certAuth, s.SpaceSyncer().CancelTask)
rt.POST(cliapi.SpaceSyncerCreateTaskPath, certAuth, prxy, s.SpaceSyncer().CreateTask)
rt.GET(cliapi.SpaceSyncerGetTaskPath, certAuth, prxy, s.SpaceSyncer().GetTask)
rt.POST(cliapi.SpaceSyncerCancelTaskPath, certAuth, prxy, s.SpaceSyncer().CancelTask)

rt.GET(cliapi.PresignedObjectListByPathPath, signAuth, s.Presigned().ObjectListByPath)
rt.GET(cliapi.PresignedObjectDownloadByPathPath, signAuth, s.Presigned().ObjectDownloadByPath)


+ 18
- 0
client/internal/rpc/http.go View File

@@ -0,0 +1,18 @@
package rpc

import (
"context"

"gitlink.org.cn/cloudream/jcs-pub/common/ecode"
"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
)

func (svc *Service) HTTPProxy(ctx context.Context, msg *clirpc.HTTPProxyRequest) (*clirpc.HTTPProxyResponse, *rpc.CodeError) {
resp, err := svc.http.ServeHTTP(ctx, msg)
if err != nil {
return nil, rpc.Failed(ecode.OperationFailed, err.Error())
}

return resp, nil
}

+ 4
- 1
client/internal/rpc/rpc.go View File

@@ -2,6 +2,7 @@ package rpc

import (
"gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/http"
"gitlink.org.cn/cloudream/jcs-pub/client/internal/publock"
clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client"
)
@@ -9,12 +10,14 @@ import (
type Service struct {
pubLock *publock.PubLock
cluster *cluster.Cluster
http *http.Server
}

func NewService(pubLock *publock.PubLock, cluster *cluster.Cluster) *Service {
func NewService(pubLock *publock.PubLock, cluster *cluster.Cluster, http *http.Server) *Service {
return &Service{
pubLock: pubLock,
cluster: cluster,
http: http,
}
}



+ 4
- 4
common/pkgs/rpc/channel.go View File

@@ -83,7 +83,7 @@ func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError {
if err != nil {
c.cancelFn()
if c.lastErr == nil {
c.lastErr = getCodeError(err)
c.lastErr = ExtractCodeError(err)
}
return c.lastErr
}
@@ -107,7 +107,7 @@ func (c *bidChanClient[Recv, Send]) Receive() (Recv, *CodeError) {
if err != nil {
c.cancelFn()

cerr := getCodeError(err)
cerr := ExtractCodeError(err)
if c.lastErr == nil {
c.lastErr = cerr
}
@@ -187,7 +187,7 @@ func (s *bidChanServer[Recv, Send]) Send(val Send) *CodeError {
s.lock.Lock()

if err != nil {
cerr := getCodeError(err)
cerr := ExtractCodeError(err)
if s.lastErr == nil {
s.lastErr = cerr
s.errChan <- cerr
@@ -212,7 +212,7 @@ func (s *bidChanServer[Recv, Send]) Receive() (Recv, *CodeError) {
s.lock.Lock()

if err != nil {
cerr := getCodeError(err)
cerr := ExtractCodeError(err)
if s.lastErr == nil {
s.lastErr = cerr
s.errChan <- cerr


+ 296
- 25
common/pkgs/rpc/client/client.pb.go View File

@@ -11,6 +11,7 @@ import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)

const (
@@ -20,41 +21,272 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

type HeaderKV struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"`
Value string `protobuf:"bytes,2,opt,name=Value,proto3" json:"Value,omitempty"`
}

func (x *HeaderKV) Reset() {
*x = HeaderKV{}
if protoimpl.UnsafeEnabled {
mi := &file_pkgs_rpc_client_client_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *HeaderKV) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*HeaderKV) ProtoMessage() {}

func (x *HeaderKV) ProtoReflect() protoreflect.Message {
mi := &file_pkgs_rpc_client_client_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use HeaderKV.ProtoReflect.Descriptor instead.
func (*HeaderKV) Descriptor() ([]byte, []int) {
return file_pkgs_rpc_client_client_proto_rawDescGZIP(), []int{0}
}

func (x *HeaderKV) GetKey() string {
if x != nil {
return x.Key
}
return ""
}

func (x *HeaderKV) GetValue() string {
if x != nil {
return x.Value
}
return ""
}

type HTTPProxyRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

URI string `protobuf:"bytes,1,opt,name=URI,proto3" json:"URI,omitempty"`
Method string `protobuf:"bytes,2,opt,name=Method,proto3" json:"Method,omitempty"`
Header []*HeaderKV `protobuf:"bytes,3,rep,name=Header,proto3" json:"Header,omitempty"`
Body []byte `protobuf:"bytes,4,opt,name=Body,proto3" json:"Body,omitempty"`
}

func (x *HTTPProxyRequest) Reset() {
*x = HTTPProxyRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_pkgs_rpc_client_client_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *HTTPProxyRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*HTTPProxyRequest) ProtoMessage() {}

func (x *HTTPProxyRequest) ProtoReflect() protoreflect.Message {
mi := &file_pkgs_rpc_client_client_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use HTTPProxyRequest.ProtoReflect.Descriptor instead.
func (*HTTPProxyRequest) Descriptor() ([]byte, []int) {
return file_pkgs_rpc_client_client_proto_rawDescGZIP(), []int{1}
}

func (x *HTTPProxyRequest) GetURI() string {
if x != nil {
return x.URI
}
return ""
}

func (x *HTTPProxyRequest) GetMethod() string {
if x != nil {
return x.Method
}
return ""
}

func (x *HTTPProxyRequest) GetHeader() []*HeaderKV {
if x != nil {
return x.Header
}
return nil
}

func (x *HTTPProxyRequest) GetBody() []byte {
if x != nil {
return x.Body
}
return nil
}

type HTTPProxyResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

StatusCode int32 `protobuf:"varint,1,opt,name=StatusCode,proto3" json:"StatusCode,omitempty"`
Header []*HeaderKV `protobuf:"bytes,2,rep,name=Header,proto3" json:"Header,omitempty"`
Body []byte `protobuf:"bytes,3,opt,name=Body,proto3" json:"Body,omitempty"`
}

func (x *HTTPProxyResponse) Reset() {
*x = HTTPProxyResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_pkgs_rpc_client_client_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}

func (x *HTTPProxyResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}

func (*HTTPProxyResponse) ProtoMessage() {}

func (x *HTTPProxyResponse) ProtoReflect() protoreflect.Message {
mi := &file_pkgs_rpc_client_client_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}

// Deprecated: Use HTTPProxyResponse.ProtoReflect.Descriptor instead.
func (*HTTPProxyResponse) Descriptor() ([]byte, []int) {
return file_pkgs_rpc_client_client_proto_rawDescGZIP(), []int{2}
}

func (x *HTTPProxyResponse) GetStatusCode() int32 {
if x != nil {
return x.StatusCode
}
return 0
}

func (x *HTTPProxyResponse) GetHeader() []*HeaderKV {
if x != nil {
return x.Header
}
return nil
}

func (x *HTTPProxyResponse) GetBody() []byte {
if x != nil {
return x.Body
}
return nil
}

var File_pkgs_rpc_client_client_proto protoreflect.FileDescriptor

var file_pkgs_rpc_client_client_proto_rawDesc = []byte{
0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e,
0x74, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06,
0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63,
0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x70, 0x0a, 0x06, 0x43, 0x6c,
0x69, 0x65, 0x6e, 0x74, 0x12, 0x31, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x4c, 0x6f, 0x63, 0x6b, 0x43,
0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x43, 0x6c,
0x75, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12,
0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e,
0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e,
0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63,
0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62,
0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63,
0x2f, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x32, 0x0a, 0x08, 0x48, 0x65,
0x61, 0x64, 0x65, 0x72, 0x4b, 0x56, 0x12, 0x10, 0x0a, 0x03, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x03, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75,
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7a,
0x0a, 0x10, 0x48, 0x54, 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, 0x49, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x03, 0x55, 0x52, 0x49, 0x12, 0x16, 0x0a, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x02,
0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x28, 0x0a, 0x06,
0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x63,
0x6c, 0x69, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4b, 0x56, 0x52, 0x06,
0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x18, 0x04,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x22, 0x71, 0x0a, 0x11, 0x48, 0x54,
0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x1e, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x05, 0x52, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12,
0x28, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x10, 0x2e, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4b,
0x56, 0x52, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x42, 0x6f, 0x64,
0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x32, 0xb2, 0x01,
0x0a, 0x06, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x12, 0x31, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x4c,
0x6f, 0x63, 0x6b, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63,
0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x33, 0x0a, 0x14, 0x47,
0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x49,
0x6e, 0x66, 0x6f, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x40, 0x0a, 0x09, 0x48, 0x54, 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x18, 0x2e,
0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x54, 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63,
0x2e, 0x48, 0x54, 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72,
0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a,
0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b,
0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6c,
0x69, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}

var (
file_pkgs_rpc_client_client_proto_rawDescOnce sync.Once
file_pkgs_rpc_client_client_proto_rawDescData = file_pkgs_rpc_client_client_proto_rawDesc
)

func file_pkgs_rpc_client_client_proto_rawDescGZIP() []byte {
file_pkgs_rpc_client_client_proto_rawDescOnce.Do(func() {
file_pkgs_rpc_client_client_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkgs_rpc_client_client_proto_rawDescData)
})
return file_pkgs_rpc_client_client_proto_rawDescData
}

var file_pkgs_rpc_client_client_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_pkgs_rpc_client_client_proto_goTypes = []any{
(*rpc.Request)(nil), // 0: rpc.Request
(*rpc.Response)(nil), // 1: rpc.Response
(*HeaderKV)(nil), // 0: clirpc.HeaderKV
(*HTTPProxyRequest)(nil), // 1: clirpc.HTTPProxyRequest
(*HTTPProxyResponse)(nil), // 2: clirpc.HTTPProxyResponse
(*rpc.Request)(nil), // 3: rpc.Request
(*rpc.Response)(nil), // 4: rpc.Response
}
var file_pkgs_rpc_client_client_proto_depIdxs = []int32{
0, // 0: clirpc.Client.PubLockChannel:input_type -> rpc.Request
0, // 1: clirpc.Client.GetClusterMasterInfo:input_type -> rpc.Request
1, // 2: clirpc.Client.PubLockChannel:output_type -> rpc.Response
1, // 3: clirpc.Client.GetClusterMasterInfo:output_type -> rpc.Response
2, // [2:4] is the sub-list for method output_type
0, // [0:2] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
0, // 0: clirpc.HTTPProxyRequest.Header:type_name -> clirpc.HeaderKV
0, // 1: clirpc.HTTPProxyResponse.Header:type_name -> clirpc.HeaderKV
3, // 2: clirpc.Client.PubLockChannel:input_type -> rpc.Request
3, // 3: clirpc.Client.GetClusterMasterInfo:input_type -> rpc.Request
1, // 4: clirpc.Client.HTTPProxy:input_type -> clirpc.HTTPProxyRequest
4, // 5: clirpc.Client.PubLockChannel:output_type -> rpc.Response
4, // 6: clirpc.Client.GetClusterMasterInfo:output_type -> rpc.Response
2, // 7: clirpc.Client.HTTPProxy:output_type -> clirpc.HTTPProxyResponse
5, // [5:8] is the sub-list for method output_type
2, // [2:5] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name
2, // [2:2] is the sub-list for extension extendee
0, // [0:2] is the sub-list for field type_name
}

func init() { file_pkgs_rpc_client_client_proto_init() }
@@ -62,18 +294,57 @@ func file_pkgs_rpc_client_client_proto_init() {
if File_pkgs_rpc_client_client_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_pkgs_rpc_client_client_proto_msgTypes[0].Exporter = func(v any, i int) any {
switch v := v.(*HeaderKV); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkgs_rpc_client_client_proto_msgTypes[1].Exporter = func(v any, i int) any {
switch v := v.(*HTTPProxyRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_pkgs_rpc_client_client_proto_msgTypes[2].Exporter = func(v any, i int) any {
switch v := v.(*HTTPProxyResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkgs_rpc_client_client_proto_rawDesc,
NumEnums: 0,
NumMessages: 0,
NumMessages: 3,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pkgs_rpc_client_client_proto_goTypes,
DependencyIndexes: file_pkgs_rpc_client_client_proto_depIdxs,
MessageInfos: file_pkgs_rpc_client_client_proto_msgTypes,
}.Build()
File_pkgs_rpc_client_client_proto = out.File
file_pkgs_rpc_client_client_proto_rawDesc = nil


+ 21
- 0
common/pkgs/rpc/client/client.proto View File

@@ -6,8 +6,29 @@ package clirpc;

option go_package = "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/clirpc;clirpc";

message HeaderKV {
string Key = 1;
string Value = 2;
}

message HTTPProxyRequest {
string URI = 1;
string Method = 2;
repeated HeaderKV Header = 3;
bytes Body = 4;
}

message HTTPProxyResponse {
int32 StatusCode = 1;
repeated HeaderKV Header = 2;
bytes Body = 3;
}


service Client {
rpc PubLockChannel(stream rpc.Request) returns(stream rpc.Response);

rpc GetClusterMasterInfo(rpc.Request) returns(rpc.Response);

rpc HTTPProxy(HTTPProxyRequest) returns(HTTPProxyResponse);
}

+ 37
- 0
common/pkgs/rpc/client/client_grpc.pb.go View File

@@ -22,6 +22,7 @@ const _ = grpc.SupportPackageIsVersion7
const (
Client_PubLockChannel_FullMethodName = "/clirpc.Client/PubLockChannel"
Client_GetClusterMasterInfo_FullMethodName = "/clirpc.Client/GetClusterMasterInfo"
Client_HTTPProxy_FullMethodName = "/clirpc.Client/HTTPProxy"
)

// ClientClient is the client API for Client service.
@@ -30,6 +31,7 @@ const (
type ClientClient interface {
PubLockChannel(ctx context.Context, opts ...grpc.CallOption) (Client_PubLockChannelClient, error)
GetClusterMasterInfo(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error)
HTTPProxy(ctx context.Context, in *HTTPProxyRequest, opts ...grpc.CallOption) (*HTTPProxyResponse, error)
}

type clientClient struct {
@@ -80,12 +82,22 @@ func (c *clientClient) GetClusterMasterInfo(ctx context.Context, in *rpc.Request
return out, nil
}

func (c *clientClient) HTTPProxy(ctx context.Context, in *HTTPProxyRequest, opts ...grpc.CallOption) (*HTTPProxyResponse, error) {
out := new(HTTPProxyResponse)
err := c.cc.Invoke(ctx, Client_HTTPProxy_FullMethodName, in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}

// ClientServer is the server API for Client service.
// All implementations must embed UnimplementedClientServer
// for forward compatibility
type ClientServer interface {
PubLockChannel(Client_PubLockChannelServer) error
GetClusterMasterInfo(context.Context, *rpc.Request) (*rpc.Response, error)
HTTPProxy(context.Context, *HTTPProxyRequest) (*HTTPProxyResponse, error)
mustEmbedUnimplementedClientServer()
}

@@ -99,6 +111,9 @@ func (UnimplementedClientServer) PubLockChannel(Client_PubLockChannelServer) err
func (UnimplementedClientServer) GetClusterMasterInfo(context.Context, *rpc.Request) (*rpc.Response, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetClusterMasterInfo not implemented")
}
func (UnimplementedClientServer) HTTPProxy(context.Context, *HTTPProxyRequest) (*HTTPProxyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method HTTPProxy not implemented")
}
func (UnimplementedClientServer) mustEmbedUnimplementedClientServer() {}

// UnsafeClientServer may be embedded to opt out of forward compatibility for this service.
@@ -156,6 +171,24 @@ func _Client_GetClusterMasterInfo_Handler(srv interface{}, ctx context.Context,
return interceptor(ctx, in, info, handler)
}

func _Client_HTTPProxy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HTTPProxyRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ClientServer).HTTPProxy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: Client_HTTPProxy_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ClientServer).HTTPProxy(ctx, req.(*HTTPProxyRequest))
}
return interceptor(ctx, in, info, handler)
}

// Client_ServiceDesc is the grpc.ServiceDesc for Client service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -167,6 +200,10 @@ var Client_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetClusterMasterInfo",
Handler: _Client_GetClusterMasterInfo_Handler,
},
{
MethodName: "HTTPProxy",
Handler: _Client_HTTPProxy_Handler,
},
},
Streams: []grpc.StreamDesc{
{


+ 36
- 0
common/pkgs/rpc/client/http.go View File

@@ -0,0 +1,36 @@
package clirpc

import (
"context"

"gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc"
)

type HTTPService interface {
HTTPProxy(ctx context.Context, req *HTTPProxy) (*HTTPProxyResp, *rpc.CodeError)
}

type HTTPProxy = HTTPProxyRequest

type HTTPProxyResp = HTTPProxyResponse

func (c *Client) HTTPProxy(ctx context.Context, msg *HTTPProxy) (*HTTPProxyResp, *rpc.CodeError) {
if c.fusedErr != nil {
return nil, c.fusedErr
}

resp, err := c.cli.HTTPProxy(ctx, msg)
if err != nil {
return nil, rpc.ExtractCodeError(err)
}

return resp, nil
}

func (s *Server) HTTPProxy(ctx context.Context, msg *HTTPProxyRequest) (*HTTPProxyResponse, error) {
resp, err := s.svrImpl.HTTPProxy(ctx, msg)
if err != nil {
return nil, rpc.WrapCodeError(err)
}
return resp, nil
}

+ 1
- 0
common/pkgs/rpc/client/server.go View File

@@ -7,6 +7,7 @@ import (
type ClientAPI interface {
ClusterService
PubLockService
HTTPService
}

type Server struct {


+ 4
- 4
common/pkgs/rpc/utils.go View File

@@ -26,7 +26,7 @@ func UnaryClient[Resp, Req any](apiFn func(context.Context, *Request, ...grpc.Ca
})
if err != nil {
var resp Resp
return resp, getCodeError(err)
return resp, ExtractCodeError(err)
}

ret, err := serder.JSONToObjectEx[Resp](resp.Payload)
@@ -86,7 +86,7 @@ func UploadStreamClient[Resp any, Req UploadStreamReq, APIRet UploadStreamAPICli

cli, err := apiFn(ctx2)
if err != nil {
return ret, getCodeError(err)
return ret, ExtractCodeError(err)
}

cw := NewChunkedWriter(cli)
@@ -178,7 +178,7 @@ func DownloadStreamClient[Resp DownloadStreamResp, Req any, APIRet DownloadStrea
cli, err := apiFn(ctx2, &Request{Payload: data})
if err != nil {
cancelFn()
return ret, getCodeError(err)
return ret, ExtractCodeError(err)
}

cr := NewChunkedReader(cli)
@@ -301,7 +301,7 @@ func (c *CodeError) ToError() error {
return &ErrorCodeError{CE: c}
}

func getCodeError(err error) *CodeError {
func ExtractCodeError(err error) *CodeError {
status, ok := status.FromError(err)

if ok {


Loading…
Cancel
Save