diff --git a/client/internal/cluster/cluster.go b/client/internal/cluster/cluster.go index 493259e..f7d9fae 100644 --- a/client/internal/cluster/cluster.go +++ b/client/internal/cluster/cluster.go @@ -54,7 +54,7 @@ func (c *Cluster) Start() error { resp, cerr := cli.GetClusterMasterInfo(ctx, &clirpc.GetClusterMasterInfo{}) cancelFn() if cerr != nil { - log.Warnf("first report: %v, will retry after 3 seconds", err) + log.Warnf("first report: %v, will retry after 3 seconds", cerr.ToError()) time.Sleep(3 * time.Second) continue } diff --git a/client/internal/cmdline/serve.go b/client/internal/cmdline/serve.go index 9f6a6bd..e0808b4 100644 --- a/client/internal/cmdline/serve.go +++ b/client/internal/cmdline/serve.go @@ -248,12 +248,12 @@ func serveHTTP(configPath string, opts serveHTTPOptions) { logger.Errorf("build http config: %v", err) os.Exit(1) } - httpSvr := http.NewServer(httpCfg, svc) + httpSvr := http.NewServer(httpCfg, svc, clster) httpChan := httpSvr.Start() defer httpSvr.Stop() // RPC接口 - rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock, clster), nil) + rpcSvr := clirpc.NewServer(config.Cfg().RPC, myrpc.NewService(publock, clster, httpSvr), nil) rpcChan := rpcSvr.Start() defer rpcSvr.Stop() diff --git a/client/internal/cmdline/vfstest.go b/client/internal/cmdline/vfstest.go index 238c89f..88f2237 100644 --- a/client/internal/cmdline/vfstest.go +++ b/client/internal/cmdline/vfstest.go @@ -227,7 +227,7 @@ func vfsTest(configPath string, opts serveHTTPOptions) { logger.Errorf("build http config: %v", err) os.Exit(1) } - httpSvr := http.NewServer(httpCfg, svc) + httpSvr := http.NewServer(httpCfg, svc, clster) httpChan := httpSvr.Start() defer httpSvr.Stop() diff --git a/client/internal/http/auth/auth.go b/client/internal/http/auth/auth.go index a65a053..faee49d 100644 --- a/client/internal/http/auth/auth.go +++ b/client/internal/http/auth/auth.go @@ -10,8 +10,11 @@ import ( "gitlink.org.cn/cloudream/jcs-pub/common/ecode" ) +type ClusterProxyRequestKey string + const ( - ClientInternalSNI = "client.jcs-pub.internal" + ClientInternalSNI = "client.jcs-pub.internal" + ClusterProxyRequest ClusterProxyRequestKey = "ClusterProxyRequest" ) type Auth struct { @@ -44,6 +47,11 @@ func (a *Auth) TLSConfigSelector(hello *tls.ClientHelloInfo) (*tls.Config, error } func (a *Auth) RejectNoCertAuth(c *gin.Context) { + if c.Request.Context().Value(ClusterProxyRequest) != nil { + c.Next() + return + } + if c.Request.TLS == nil || c.Request.TLS.ServerName != ClientInternalSNI { c.AbortWithStatusJSON(401, types.Failed(ecode.Unauthorized, "must provide client certificate")) return @@ -55,6 +63,11 @@ func (a *Auth) RejectNoCertAuth(c *gin.Context) { func (a *Auth) Presigned(c *gin.Context) { log := logger.WithField("HTTP", "Auth") + if c.Request.Context().Value(ClusterProxyRequest) != nil { + c.Next() + return + } + accID := signer.GetAccessKeyID(c.Request.URL) if accID == "" { log.Warn("access key id not found in query string") diff --git a/client/internal/http/proxy/proxy.go b/client/internal/http/proxy/proxy.go new file mode 100644 index 0000000..d9a485a --- /dev/null +++ b/client/internal/http/proxy/proxy.go @@ -0,0 +1,74 @@ +package proxy + +import ( + "io" + + "github.com/gin-gonic/gin" + "gitlink.org.cn/cloudream/common/utils/io2" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" + "gitlink.org.cn/cloudream/jcs-pub/common/ecode" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" +) + +type ClusterProxy struct { + clster *cluster.Cluster +} + +func NewClusterProxy(clster *cluster.Cluster) *ClusterProxy { + return &ClusterProxy{ + clster: clster, + } +} + +func (p *ClusterProxy) Proxy(c *gin.Context) { + if p.clster.IsMaster() { + c.Next() + return + } + + mstCli := p.clster.MasterClient().Get() + defer mstCli.Release() + + preq := clirpc.HTTPProxyRequest{ + URI: c.Request.RequestURI, + Method: c.Request.Method, + } + + for k, v := range c.Request.Header { + for _, vv := range v { + preq.Header = append(preq.Header, &clirpc.HeaderKV{ + Key: k, + Value: vv, + }) + } + } + + if c.Request.Body != nil { + data, err := io.ReadAll(c.Request.Body) + if err != nil { + c.AbortWithStatusJSON(500, types.Failed(ecode.OperationFailed, "read body: %v", err)) + return + } + + preq.Body = data + } + + resp, err := mstCli.HTTPProxy(c.Request.Context(), &preq) + if err != nil { + c.AbortWithStatusJSON(500, types.Failed(ecode.OperationFailed, "proxy: %v", err)) + return + } + + c.Status(int(resp.StatusCode)) + + for _, h := range resp.Header { + c.Header(h.Key, h.Value) + } + + if len(resp.Body) > 0 { + io2.WriteAll(c.Writer, resp.Body) + } + + c.Abort() +} diff --git a/client/internal/http/server.go b/client/internal/http/server.go index ef8d3c1..00b4fde 100644 --- a/client/internal/http/server.go +++ b/client/internal/http/server.go @@ -1,6 +1,7 @@ package http import ( + "bytes" "context" "crypto/tls" "net/http" @@ -8,10 +9,13 @@ import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/common/pkgs/async" "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/auth" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/proxy" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" v1 "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/v1" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" "golang.org/x/net/http2" ) @@ -30,15 +34,18 @@ type Server struct { cfg types.Config httpSrv *http.Server svc *services.Service + clster *cluster.Cluster eventChan *ServerEventChan auth *auth.Auth + proxy *proxy.ClusterProxy v1Svr *v1.Server } -func NewServer(cfg types.Config, svc *services.Service) *Server { +func NewServer(cfg types.Config, svc *services.Service, clster *cluster.Cluster) *Server { return &Server{ cfg: cfg, svc: svc, + clster: clster, eventChan: async.NewUnboundChannel[ServerEvent](), v1Svr: v1.NewServer(&cfg, svc), } @@ -56,12 +63,13 @@ func (s *Server) Start() *ServerEventChan { Handler: engine, } s.auth = auth.New(&s.cfg) + s.proxy = proxy.NewClusterProxy(s.clster) s.httpSrv.TLSConfig = &tls.Config{ GetConfigForClient: s.auth.TLSConfigSelector, } http2.ConfigureServer(s.httpSrv, &http2.Server{}) - s.v1Svr.InitRouters(engine.Group("/v1"), s.auth) + s.v1Svr.InitRouters(engine.Group("/v1"), s.auth, s.proxy) logger.Infof("start serving http at: %s", s.cfg.Listen) @@ -79,3 +87,55 @@ func (s *Server) Stop() { s.httpSrv.Shutdown(context.Background()) } + +func (s *Server) ServeHTTP(ctx context.Context, req *clirpc.HTTPProxyRequest) (*clirpc.HTTPProxyResponse, error) { + ctx2 := context.WithValue(ctx, auth.ClusterProxyRequest, true) + + r, err := http.NewRequestWithContext(ctx2, req.Method, req.URI, bytes.NewReader(req.Body)) + if err != nil { + return nil, err + } + + for _, v := range req.Header { + r.Header.Add(v.Key, v.Value) + } + + resp := proxyRespWriter{ + header: make(http.Header), + } + s.httpSrv.Handler.ServeHTTP(&resp, r) + + var header []*clirpc.HeaderKV + for k, v := range resp.header { + for _, vv := range v { + header = append(header, &clirpc.HeaderKV{ + Key: k, + Value: vv, + }) + } + } + + return &clirpc.HTTPProxyResponse{ + StatusCode: int32(resp.statusCode), + Header: header, + Body: resp.body.Bytes(), + }, nil +} + +type proxyRespWriter struct { + statusCode int + header http.Header + body bytes.Buffer +} + +func (w *proxyRespWriter) Header() http.Header { + return w.header +} + +func (w *proxyRespWriter) Write(b []byte) (int, error) { + return w.body.Write(b) +} + +func (w *proxyRespWriter) WriteHeader(statusCode int) { + w.statusCode = statusCode +} diff --git a/client/internal/http/v1/server.go b/client/internal/http/v1/server.go index 5f2c81d..1b6f28a 100644 --- a/client/internal/http/v1/server.go +++ b/client/internal/http/v1/server.go @@ -3,6 +3,7 @@ package http import ( "github.com/gin-gonic/gin" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/auth" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/proxy" "gitlink.org.cn/cloudream/jcs-pub/client/internal/http/types" "gitlink.org.cn/cloudream/jcs-pub/client/internal/services" cliapi "gitlink.org.cn/cloudream/jcs-pub/client/sdk/api/v1" @@ -20,9 +21,10 @@ func NewServer(cfg *types.Config, svc *services.Service) *Server { } } -func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) { +func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth, proxy *proxy.ClusterProxy) { certAuth := ah.RejectNoCertAuth signAuth := ah.Presigned + prxy := proxy.Proxy rt.GET(cliapi.ObjectListPathByPath, certAuth, s.Object().ListByPath) rt.GET(cliapi.ObjectListByIDsPath, certAuth, s.Object().ListByIDs) @@ -66,9 +68,9 @@ func (s *Server) InitRouters(rt gin.IRoutes, ah *auth.Auth) { rt.POST(cliapi.ObjectUploadPartPath, certAuth, s.Object().UploadPart) rt.POST(cliapi.ObjectCompleteMultipartUploadPath, certAuth, s.Object().CompleteMultipartUpload) - rt.POST(cliapi.SpaceSyncerCreateTaskPath, certAuth, s.SpaceSyncer().CreateTask) - rt.GET(cliapi.SpaceSyncerGetTaskPath, certAuth, s.SpaceSyncer().GetTask) - rt.POST(cliapi.SpaceSyncerCancelTaskPath, certAuth, s.SpaceSyncer().CancelTask) + rt.POST(cliapi.SpaceSyncerCreateTaskPath, certAuth, prxy, s.SpaceSyncer().CreateTask) + rt.GET(cliapi.SpaceSyncerGetTaskPath, certAuth, prxy, s.SpaceSyncer().GetTask) + rt.POST(cliapi.SpaceSyncerCancelTaskPath, certAuth, prxy, s.SpaceSyncer().CancelTask) rt.GET(cliapi.PresignedObjectListByPathPath, signAuth, s.Presigned().ObjectListByPath) rt.GET(cliapi.PresignedObjectDownloadByPathPath, signAuth, s.Presigned().ObjectDownloadByPath) diff --git a/client/internal/rpc/http.go b/client/internal/rpc/http.go new file mode 100644 index 0000000..e142dca --- /dev/null +++ b/client/internal/rpc/http.go @@ -0,0 +1,18 @@ +package rpc + +import ( + "context" + + "gitlink.org.cn/cloudream/jcs-pub/common/ecode" + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" + clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" +) + +func (svc *Service) HTTPProxy(ctx context.Context, msg *clirpc.HTTPProxyRequest) (*clirpc.HTTPProxyResponse, *rpc.CodeError) { + resp, err := svc.http.ServeHTTP(ctx, msg) + if err != nil { + return nil, rpc.Failed(ecode.OperationFailed, err.Error()) + } + + return resp, nil +} diff --git a/client/internal/rpc/rpc.go b/client/internal/rpc/rpc.go index 280b9f3..c2fceff 100644 --- a/client/internal/rpc/rpc.go +++ b/client/internal/rpc/rpc.go @@ -2,6 +2,7 @@ package rpc import ( "gitlink.org.cn/cloudream/jcs-pub/client/internal/cluster" + "gitlink.org.cn/cloudream/jcs-pub/client/internal/http" "gitlink.org.cn/cloudream/jcs-pub/client/internal/publock" clirpc "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/client" ) @@ -9,12 +10,14 @@ import ( type Service struct { pubLock *publock.PubLock cluster *cluster.Cluster + http *http.Server } -func NewService(pubLock *publock.PubLock, cluster *cluster.Cluster) *Service { +func NewService(pubLock *publock.PubLock, cluster *cluster.Cluster, http *http.Server) *Service { return &Service{ pubLock: pubLock, cluster: cluster, + http: http, } } diff --git a/common/pkgs/rpc/channel.go b/common/pkgs/rpc/channel.go index 2424c92..5c6fad6 100644 --- a/common/pkgs/rpc/channel.go +++ b/common/pkgs/rpc/channel.go @@ -83,7 +83,7 @@ func (c *bidChanClient[Recv, Send]) Send(val Send) *CodeError { if err != nil { c.cancelFn() if c.lastErr == nil { - c.lastErr = getCodeError(err) + c.lastErr = ExtractCodeError(err) } return c.lastErr } @@ -107,7 +107,7 @@ func (c *bidChanClient[Recv, Send]) Receive() (Recv, *CodeError) { if err != nil { c.cancelFn() - cerr := getCodeError(err) + cerr := ExtractCodeError(err) if c.lastErr == nil { c.lastErr = cerr } @@ -187,7 +187,7 @@ func (s *bidChanServer[Recv, Send]) Send(val Send) *CodeError { s.lock.Lock() if err != nil { - cerr := getCodeError(err) + cerr := ExtractCodeError(err) if s.lastErr == nil { s.lastErr = cerr s.errChan <- cerr @@ -212,7 +212,7 @@ func (s *bidChanServer[Recv, Send]) Receive() (Recv, *CodeError) { s.lock.Lock() if err != nil { - cerr := getCodeError(err) + cerr := ExtractCodeError(err) if s.lastErr == nil { s.lastErr = cerr s.errChan <- cerr diff --git a/common/pkgs/rpc/client/client.pb.go b/common/pkgs/rpc/client/client.pb.go index d51a97b..8a761a9 100644 --- a/common/pkgs/rpc/client/client.pb.go +++ b/common/pkgs/rpc/client/client.pb.go @@ -11,6 +11,7 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" + sync "sync" ) const ( @@ -20,41 +21,272 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type HeaderKV struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=Key,proto3" json:"Key,omitempty"` + Value string `protobuf:"bytes,2,opt,name=Value,proto3" json:"Value,omitempty"` +} + +func (x *HeaderKV) Reset() { + *x = HeaderKV{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_rpc_client_client_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HeaderKV) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HeaderKV) ProtoMessage() {} + +func (x *HeaderKV) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_rpc_client_client_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HeaderKV.ProtoReflect.Descriptor instead. +func (*HeaderKV) Descriptor() ([]byte, []int) { + return file_pkgs_rpc_client_client_proto_rawDescGZIP(), []int{0} +} + +func (x *HeaderKV) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +func (x *HeaderKV) GetValue() string { + if x != nil { + return x.Value + } + return "" +} + +type HTTPProxyRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + URI string `protobuf:"bytes,1,opt,name=URI,proto3" json:"URI,omitempty"` + Method string `protobuf:"bytes,2,opt,name=Method,proto3" json:"Method,omitempty"` + Header []*HeaderKV `protobuf:"bytes,3,rep,name=Header,proto3" json:"Header,omitempty"` + Body []byte `protobuf:"bytes,4,opt,name=Body,proto3" json:"Body,omitempty"` +} + +func (x *HTTPProxyRequest) Reset() { + *x = HTTPProxyRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_rpc_client_client_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HTTPProxyRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HTTPProxyRequest) ProtoMessage() {} + +func (x *HTTPProxyRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_rpc_client_client_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HTTPProxyRequest.ProtoReflect.Descriptor instead. +func (*HTTPProxyRequest) Descriptor() ([]byte, []int) { + return file_pkgs_rpc_client_client_proto_rawDescGZIP(), []int{1} +} + +func (x *HTTPProxyRequest) GetURI() string { + if x != nil { + return x.URI + } + return "" +} + +func (x *HTTPProxyRequest) GetMethod() string { + if x != nil { + return x.Method + } + return "" +} + +func (x *HTTPProxyRequest) GetHeader() []*HeaderKV { + if x != nil { + return x.Header + } + return nil +} + +func (x *HTTPProxyRequest) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + +type HTTPProxyResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StatusCode int32 `protobuf:"varint,1,opt,name=StatusCode,proto3" json:"StatusCode,omitempty"` + Header []*HeaderKV `protobuf:"bytes,2,rep,name=Header,proto3" json:"Header,omitempty"` + Body []byte `protobuf:"bytes,3,opt,name=Body,proto3" json:"Body,omitempty"` +} + +func (x *HTTPProxyResponse) Reset() { + *x = HTTPProxyResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkgs_rpc_client_client_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HTTPProxyResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HTTPProxyResponse) ProtoMessage() {} + +func (x *HTTPProxyResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkgs_rpc_client_client_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HTTPProxyResponse.ProtoReflect.Descriptor instead. +func (*HTTPProxyResponse) Descriptor() ([]byte, []int) { + return file_pkgs_rpc_client_client_proto_rawDescGZIP(), []int{2} +} + +func (x *HTTPProxyResponse) GetStatusCode() int32 { + if x != nil { + return x.StatusCode + } + return 0 +} + +func (x *HTTPProxyResponse) GetHeader() []*HeaderKV { + if x != nil { + return x.Header + } + return nil +} + +func (x *HTTPProxyResponse) GetBody() []byte { + if x != nil { + return x.Body + } + return nil +} + var File_pkgs_rpc_client_client_proto protoreflect.FileDescriptor var file_pkgs_rpc_client_client_proto_rawDesc = []byte{ 0x0a, 0x1c, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2f, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x1a, 0x12, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, - 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x70, 0x0a, 0x06, 0x43, 0x6c, - 0x69, 0x65, 0x6e, 0x74, 0x12, 0x31, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x4c, 0x6f, 0x63, 0x6b, 0x43, - 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x33, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x43, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, - 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, - 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, - 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, - 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, - 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, - 0x2f, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2f, 0x72, 0x70, 0x63, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x32, 0x0a, 0x08, 0x48, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x4b, 0x56, 0x12, 0x10, 0x0a, 0x03, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x03, 0x4b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x22, 0x7a, + 0x0a, 0x10, 0x48, 0x54, 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x55, 0x52, 0x49, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x55, 0x52, 0x49, 0x12, 0x16, 0x0a, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x28, 0x0a, 0x06, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x63, + 0x6c, 0x69, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4b, 0x56, 0x52, 0x06, + 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x22, 0x71, 0x0a, 0x11, 0x48, 0x54, + 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x1e, 0x0a, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x0a, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x43, 0x6f, 0x64, 0x65, 0x12, + 0x28, 0x0a, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x10, 0x2e, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x4b, + 0x56, 0x52, 0x06, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x42, 0x6f, 0x64, + 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x32, 0xb2, 0x01, + 0x0a, 0x06, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x12, 0x31, 0x0a, 0x0e, 0x50, 0x75, 0x62, 0x4c, + 0x6f, 0x63, 0x6b, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x33, 0x0a, 0x14, 0x47, + 0x65, 0x74, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x49, + 0x6e, 0x66, 0x6f, 0x12, 0x0c, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x0d, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x40, 0x0a, 0x09, 0x48, 0x54, 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x18, 0x2e, + 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x2e, 0x48, 0x54, 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, + 0x2e, 0x48, 0x54, 0x54, 0x50, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x6c, 0x69, 0x6e, 0x6b, 0x2e, 0x6f, 0x72, + 0x67, 0x2e, 0x63, 0x6e, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x6a, + 0x63, 0x73, 0x2d, 0x70, 0x75, 0x62, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x70, 0x6b, + 0x67, 0x73, 0x2f, 0x72, 0x70, 0x63, 0x2f, 0x63, 0x6c, 0x69, 0x72, 0x70, 0x63, 0x3b, 0x63, 0x6c, + 0x69, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } +var ( + file_pkgs_rpc_client_client_proto_rawDescOnce sync.Once + file_pkgs_rpc_client_client_proto_rawDescData = file_pkgs_rpc_client_client_proto_rawDesc +) + +func file_pkgs_rpc_client_client_proto_rawDescGZIP() []byte { + file_pkgs_rpc_client_client_proto_rawDescOnce.Do(func() { + file_pkgs_rpc_client_client_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkgs_rpc_client_client_proto_rawDescData) + }) + return file_pkgs_rpc_client_client_proto_rawDescData +} + +var file_pkgs_rpc_client_client_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_pkgs_rpc_client_client_proto_goTypes = []any{ - (*rpc.Request)(nil), // 0: rpc.Request - (*rpc.Response)(nil), // 1: rpc.Response + (*HeaderKV)(nil), // 0: clirpc.HeaderKV + (*HTTPProxyRequest)(nil), // 1: clirpc.HTTPProxyRequest + (*HTTPProxyResponse)(nil), // 2: clirpc.HTTPProxyResponse + (*rpc.Request)(nil), // 3: rpc.Request + (*rpc.Response)(nil), // 4: rpc.Response } var file_pkgs_rpc_client_client_proto_depIdxs = []int32{ - 0, // 0: clirpc.Client.PubLockChannel:input_type -> rpc.Request - 0, // 1: clirpc.Client.GetClusterMasterInfo:input_type -> rpc.Request - 1, // 2: clirpc.Client.PubLockChannel:output_type -> rpc.Response - 1, // 3: clirpc.Client.GetClusterMasterInfo:output_type -> rpc.Response - 2, // [2:4] is the sub-list for method output_type - 0, // [0:2] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 0, // 0: clirpc.HTTPProxyRequest.Header:type_name -> clirpc.HeaderKV + 0, // 1: clirpc.HTTPProxyResponse.Header:type_name -> clirpc.HeaderKV + 3, // 2: clirpc.Client.PubLockChannel:input_type -> rpc.Request + 3, // 3: clirpc.Client.GetClusterMasterInfo:input_type -> rpc.Request + 1, // 4: clirpc.Client.HTTPProxy:input_type -> clirpc.HTTPProxyRequest + 4, // 5: clirpc.Client.PubLockChannel:output_type -> rpc.Response + 4, // 6: clirpc.Client.GetClusterMasterInfo:output_type -> rpc.Response + 2, // 7: clirpc.Client.HTTPProxy:output_type -> clirpc.HTTPProxyResponse + 5, // [5:8] is the sub-list for method output_type + 2, // [2:5] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_pkgs_rpc_client_client_proto_init() } @@ -62,18 +294,57 @@ func file_pkgs_rpc_client_client_proto_init() { if File_pkgs_rpc_client_client_proto != nil { return } + if !protoimpl.UnsafeEnabled { + file_pkgs_rpc_client_client_proto_msgTypes[0].Exporter = func(v any, i int) any { + switch v := v.(*HeaderKV); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_rpc_client_client_proto_msgTypes[1].Exporter = func(v any, i int) any { + switch v := v.(*HTTPProxyRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkgs_rpc_client_client_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*HTTPProxyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkgs_rpc_client_client_proto_rawDesc, NumEnums: 0, - NumMessages: 0, + NumMessages: 3, NumExtensions: 0, NumServices: 1, }, GoTypes: file_pkgs_rpc_client_client_proto_goTypes, DependencyIndexes: file_pkgs_rpc_client_client_proto_depIdxs, + MessageInfos: file_pkgs_rpc_client_client_proto_msgTypes, }.Build() File_pkgs_rpc_client_client_proto = out.File file_pkgs_rpc_client_client_proto_rawDesc = nil diff --git a/common/pkgs/rpc/client/client.proto b/common/pkgs/rpc/client/client.proto index 3091b0d..d6e35d6 100644 --- a/common/pkgs/rpc/client/client.proto +++ b/common/pkgs/rpc/client/client.proto @@ -6,8 +6,29 @@ package clirpc; option go_package = "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc/clirpc;clirpc"; +message HeaderKV { + string Key = 1; + string Value = 2; +} + +message HTTPProxyRequest { + string URI = 1; + string Method = 2; + repeated HeaderKV Header = 3; + bytes Body = 4; +} + +message HTTPProxyResponse { + int32 StatusCode = 1; + repeated HeaderKV Header = 2; + bytes Body = 3; +} + + service Client { rpc PubLockChannel(stream rpc.Request) returns(stream rpc.Response); rpc GetClusterMasterInfo(rpc.Request) returns(rpc.Response); + + rpc HTTPProxy(HTTPProxyRequest) returns(HTTPProxyResponse); } \ No newline at end of file diff --git a/common/pkgs/rpc/client/client_grpc.pb.go b/common/pkgs/rpc/client/client_grpc.pb.go index 11bca54..9f942bc 100644 --- a/common/pkgs/rpc/client/client_grpc.pb.go +++ b/common/pkgs/rpc/client/client_grpc.pb.go @@ -22,6 +22,7 @@ const _ = grpc.SupportPackageIsVersion7 const ( Client_PubLockChannel_FullMethodName = "/clirpc.Client/PubLockChannel" Client_GetClusterMasterInfo_FullMethodName = "/clirpc.Client/GetClusterMasterInfo" + Client_HTTPProxy_FullMethodName = "/clirpc.Client/HTTPProxy" ) // ClientClient is the client API for Client service. @@ -30,6 +31,7 @@ const ( type ClientClient interface { PubLockChannel(ctx context.Context, opts ...grpc.CallOption) (Client_PubLockChannelClient, error) GetClusterMasterInfo(ctx context.Context, in *rpc.Request, opts ...grpc.CallOption) (*rpc.Response, error) + HTTPProxy(ctx context.Context, in *HTTPProxyRequest, opts ...grpc.CallOption) (*HTTPProxyResponse, error) } type clientClient struct { @@ -80,12 +82,22 @@ func (c *clientClient) GetClusterMasterInfo(ctx context.Context, in *rpc.Request return out, nil } +func (c *clientClient) HTTPProxy(ctx context.Context, in *HTTPProxyRequest, opts ...grpc.CallOption) (*HTTPProxyResponse, error) { + out := new(HTTPProxyResponse) + err := c.cc.Invoke(ctx, Client_HTTPProxy_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ClientServer is the server API for Client service. // All implementations must embed UnimplementedClientServer // for forward compatibility type ClientServer interface { PubLockChannel(Client_PubLockChannelServer) error GetClusterMasterInfo(context.Context, *rpc.Request) (*rpc.Response, error) + HTTPProxy(context.Context, *HTTPProxyRequest) (*HTTPProxyResponse, error) mustEmbedUnimplementedClientServer() } @@ -99,6 +111,9 @@ func (UnimplementedClientServer) PubLockChannel(Client_PubLockChannelServer) err func (UnimplementedClientServer) GetClusterMasterInfo(context.Context, *rpc.Request) (*rpc.Response, error) { return nil, status.Errorf(codes.Unimplemented, "method GetClusterMasterInfo not implemented") } +func (UnimplementedClientServer) HTTPProxy(context.Context, *HTTPProxyRequest) (*HTTPProxyResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method HTTPProxy not implemented") +} func (UnimplementedClientServer) mustEmbedUnimplementedClientServer() {} // UnsafeClientServer may be embedded to opt out of forward compatibility for this service. @@ -156,6 +171,24 @@ func _Client_GetClusterMasterInfo_Handler(srv interface{}, ctx context.Context, return interceptor(ctx, in, info, handler) } +func _Client_HTTPProxy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HTTPProxyRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ClientServer).HTTPProxy(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Client_HTTPProxy_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ClientServer).HTTPProxy(ctx, req.(*HTTPProxyRequest)) + } + return interceptor(ctx, in, info, handler) +} + // Client_ServiceDesc is the grpc.ServiceDesc for Client service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -167,6 +200,10 @@ var Client_ServiceDesc = grpc.ServiceDesc{ MethodName: "GetClusterMasterInfo", Handler: _Client_GetClusterMasterInfo_Handler, }, + { + MethodName: "HTTPProxy", + Handler: _Client_HTTPProxy_Handler, + }, }, Streams: []grpc.StreamDesc{ { diff --git a/common/pkgs/rpc/client/http.go b/common/pkgs/rpc/client/http.go new file mode 100644 index 0000000..0b7c9dc --- /dev/null +++ b/common/pkgs/rpc/client/http.go @@ -0,0 +1,36 @@ +package clirpc + +import ( + "context" + + "gitlink.org.cn/cloudream/jcs-pub/common/pkgs/rpc" +) + +type HTTPService interface { + HTTPProxy(ctx context.Context, req *HTTPProxy) (*HTTPProxyResp, *rpc.CodeError) +} + +type HTTPProxy = HTTPProxyRequest + +type HTTPProxyResp = HTTPProxyResponse + +func (c *Client) HTTPProxy(ctx context.Context, msg *HTTPProxy) (*HTTPProxyResp, *rpc.CodeError) { + if c.fusedErr != nil { + return nil, c.fusedErr + } + + resp, err := c.cli.HTTPProxy(ctx, msg) + if err != nil { + return nil, rpc.ExtractCodeError(err) + } + + return resp, nil +} + +func (s *Server) HTTPProxy(ctx context.Context, msg *HTTPProxyRequest) (*HTTPProxyResponse, error) { + resp, err := s.svrImpl.HTTPProxy(ctx, msg) + if err != nil { + return nil, rpc.WrapCodeError(err) + } + return resp, nil +} diff --git a/common/pkgs/rpc/client/server.go b/common/pkgs/rpc/client/server.go index 9bfaf64..f05fb30 100644 --- a/common/pkgs/rpc/client/server.go +++ b/common/pkgs/rpc/client/server.go @@ -7,6 +7,7 @@ import ( type ClientAPI interface { ClusterService PubLockService + HTTPService } type Server struct { diff --git a/common/pkgs/rpc/utils.go b/common/pkgs/rpc/utils.go index 72d28d9..834f128 100644 --- a/common/pkgs/rpc/utils.go +++ b/common/pkgs/rpc/utils.go @@ -26,7 +26,7 @@ func UnaryClient[Resp, Req any](apiFn func(context.Context, *Request, ...grpc.Ca }) if err != nil { var resp Resp - return resp, getCodeError(err) + return resp, ExtractCodeError(err) } ret, err := serder.JSONToObjectEx[Resp](resp.Payload) @@ -86,7 +86,7 @@ func UploadStreamClient[Resp any, Req UploadStreamReq, APIRet UploadStreamAPICli cli, err := apiFn(ctx2) if err != nil { - return ret, getCodeError(err) + return ret, ExtractCodeError(err) } cw := NewChunkedWriter(cli) @@ -178,7 +178,7 @@ func DownloadStreamClient[Resp DownloadStreamResp, Req any, APIRet DownloadStrea cli, err := apiFn(ctx2, &Request{Payload: data}) if err != nil { cancelFn() - return ret, getCodeError(err) + return ret, ExtractCodeError(err) } cr := NewChunkedReader(cli) @@ -301,7 +301,7 @@ func (c *CodeError) ToError() error { return &ErrorCodeError{CE: c} } -func getCodeError(err error) *CodeError { +func ExtractCodeError(err error) *CodeError { status, ok := status.FromError(err) if ok {