You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

method_handler.h 19 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. /*
  2. *
  3. * Copyright 2015 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #ifndef GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
  19. #define GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H
  20. // IWYU pragma: private, include <grpcpp/support/method_handler.h>
  21. #include <grpcpp/impl/codegen/byte_buffer.h>
  22. #include <grpcpp/impl/codegen/core_codegen_interface.h>
  23. #include <grpcpp/impl/codegen/rpc_service_method.h>
  24. #include <grpcpp/impl/codegen/sync_stream.h>
  25. namespace grpc
  26. {
  27. namespace internal
  28. {
  29. // Invoke the method handler, fill in the status, and
  30. // return whether or not we finished safely (without an exception).
  31. // Note that exception handling is 0-cost in most compiler/library
  32. // implementations (except when an exception is actually thrown),
  33. // so this process doesn't require additional overhead in the common case.
  34. // Additionally, we don't need to return if we caught an exception or not;
  35. // the handling is the same in either case.
  36. template<class Callable>
  37. ::grpc::Status CatchingFunctionHandler(Callable&& handler)
  38. {
  39. #if GRPC_ALLOW_EXCEPTIONS
  40. try
  41. {
  42. return handler();
  43. }
  44. catch (...)
  45. {
  46. return grpc::Status(grpc::StatusCode::UNKNOWN, "Unexpected error in RPC handling");
  47. }
  48. #else // GRPC_ALLOW_EXCEPTIONS
  49. return handler();
  50. #endif // GRPC_ALLOW_EXCEPTIONS
  51. }
  52. /// A helper function with reduced templating to do the common work needed to
  53. /// actually send the server response. Uses non-const parameter for Status since
  54. /// this should only ever be called from the end of the RunHandler method.
  55. template<class ResponseType>
  56. void UnaryRunHandlerHelper(const MethodHandler::HandlerParameter& param, ResponseType* rsp, grpc::Status& status)
  57. {
  58. GPR_CODEGEN_ASSERT(!param.server_context->sent_initial_metadata_);
  59. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpServerSendStatus>
  60. ops;
  61. ops.SendInitialMetadata(&param.server_context->initial_metadata_, param.server_context->initial_metadata_flags());
  62. if (param.server_context->compression_level_set())
  63. {
  64. ops.set_compression_level(param.server_context->compression_level());
  65. }
  66. if (status.ok())
  67. {
  68. status = ops.SendMessagePtr(rsp);
  69. }
  70. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  71. param.call->PerformOps(&ops);
  72. param.call->cq()->Pluck(&ops);
  73. }
  74. /// A helper function with reduced templating to do deserializing.
  75. template<class RequestType>
  76. void* UnaryDeserializeHelper(grpc_byte_buffer* req, grpc::Status* status, RequestType* request)
  77. {
  78. grpc::ByteBuffer buf;
  79. buf.set_buffer(req);
  80. *status = grpc::SerializationTraits<RequestType>::Deserialize(
  81. &buf, static_cast<RequestType*>(request)
  82. );
  83. buf.Release();
  84. if (status->ok())
  85. {
  86. return request;
  87. }
  88. request->~RequestType();
  89. return nullptr;
  90. }
  91. /// A wrapper class of an application provided rpc method handler.
  92. template<class ServiceType, class RequestType, class ResponseType, class BaseRequestType = RequestType, class BaseResponseType = ResponseType>
  93. class RpcMethodHandler : public grpc::internal::MethodHandler
  94. {
  95. public:
  96. RpcMethodHandler(
  97. std::function<grpc::Status(ServiceType*, grpc::ServerContext*, const RequestType*, ResponseType*)>
  98. func,
  99. ServiceType* service
  100. ) :
  101. func_(func),
  102. service_(service)
  103. {
  104. }
  105. void RunHandler(const HandlerParameter& param) final
  106. {
  107. ResponseType rsp;
  108. grpc::Status status = param.status;
  109. if (status.ok())
  110. {
  111. status = CatchingFunctionHandler([this, &param, &rsp]
  112. { return func_(service_, static_cast<grpc::ServerContext*>(param.server_context), static_cast<RequestType*>(param.request), &rsp); });
  113. static_cast<RequestType*>(param.request)->~RequestType();
  114. }
  115. UnaryRunHandlerHelper(param, static_cast<BaseResponseType*>(&rsp), status);
  116. }
  117. void* Deserialize(grpc_call* call, grpc_byte_buffer* req, grpc::Status* status, void** /*handler_data*/) final
  118. {
  119. auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
  120. call, sizeof(RequestType)
  121. )) RequestType;
  122. return UnaryDeserializeHelper(req, status, static_cast<BaseRequestType*>(request));
  123. }
  124. private:
  125. /// Application provided rpc handler function.
  126. std::function<grpc::Status(ServiceType*, grpc::ServerContext*, const RequestType*, ResponseType*)>
  127. func_;
  128. // The class the above handler function lives in.
  129. ServiceType* service_;
  130. };
  131. /// A wrapper class of an application provided client streaming handler.
  132. template<class ServiceType, class RequestType, class ResponseType>
  133. class ClientStreamingHandler : public grpc::internal::MethodHandler
  134. {
  135. public:
  136. ClientStreamingHandler(
  137. std::function<grpc::Status(ServiceType*, grpc::ServerContext*, ServerReader<RequestType>*, ResponseType*)>
  138. func,
  139. ServiceType* service
  140. ) :
  141. func_(func),
  142. service_(service)
  143. {
  144. }
  145. void RunHandler(const HandlerParameter& param) final
  146. {
  147. ServerReader<RequestType> reader(
  148. param.call, static_cast<grpc::ServerContext*>(param.server_context)
  149. );
  150. ResponseType rsp;
  151. grpc::Status status =
  152. CatchingFunctionHandler([this, &param, &reader, &rsp]
  153. { return func_(service_, static_cast<grpc::ServerContext*>(param.server_context), &reader, &rsp); });
  154. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpServerSendStatus>
  155. ops;
  156. if (!param.server_context->sent_initial_metadata_)
  157. {
  158. ops.SendInitialMetadata(&param.server_context->initial_metadata_, param.server_context->initial_metadata_flags());
  159. if (param.server_context->compression_level_set())
  160. {
  161. ops.set_compression_level(param.server_context->compression_level());
  162. }
  163. }
  164. if (status.ok())
  165. {
  166. status = ops.SendMessagePtr(&rsp);
  167. }
  168. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  169. param.call->PerformOps(&ops);
  170. param.call->cq()->Pluck(&ops);
  171. }
  172. private:
  173. std::function<grpc::Status(ServiceType*, grpc::ServerContext*, ServerReader<RequestType>*, ResponseType*)>
  174. func_;
  175. ServiceType* service_;
  176. };
  177. /// A wrapper class of an application provided server streaming handler.
  178. template<class ServiceType, class RequestType, class ResponseType>
  179. class ServerStreamingHandler : public grpc::internal::MethodHandler
  180. {
  181. public:
  182. ServerStreamingHandler(std::function<grpc::Status(ServiceType*, grpc::ServerContext*, const RequestType*, ServerWriter<ResponseType>*)> func, ServiceType* service) :
  183. func_(func),
  184. service_(service)
  185. {
  186. }
  187. void RunHandler(const HandlerParameter& param) final
  188. {
  189. grpc::Status status = param.status;
  190. if (status.ok())
  191. {
  192. ServerWriter<ResponseType> writer(
  193. param.call, static_cast<grpc::ServerContext*>(param.server_context)
  194. );
  195. status = CatchingFunctionHandler([this, &param, &writer]
  196. { return func_(service_, static_cast<grpc::ServerContext*>(param.server_context), static_cast<RequestType*>(param.request), &writer); });
  197. static_cast<RequestType*>(param.request)->~RequestType();
  198. }
  199. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus>
  200. ops;
  201. if (!param.server_context->sent_initial_metadata_)
  202. {
  203. ops.SendInitialMetadata(&param.server_context->initial_metadata_, param.server_context->initial_metadata_flags());
  204. if (param.server_context->compression_level_set())
  205. {
  206. ops.set_compression_level(param.server_context->compression_level());
  207. }
  208. }
  209. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  210. param.call->PerformOps(&ops);
  211. if (param.server_context->has_pending_ops_)
  212. {
  213. param.call->cq()->Pluck(&param.server_context->pending_ops_);
  214. }
  215. param.call->cq()->Pluck(&ops);
  216. }
  217. void* Deserialize(grpc_call* call, grpc_byte_buffer* req, grpc::Status* status, void** /*handler_data*/) final
  218. {
  219. grpc::ByteBuffer buf;
  220. buf.set_buffer(req);
  221. auto* request = new (grpc::g_core_codegen_interface->grpc_call_arena_alloc(
  222. call, sizeof(RequestType)
  223. )) RequestType();
  224. *status =
  225. grpc::SerializationTraits<RequestType>::Deserialize(&buf, request);
  226. buf.Release();
  227. if (status->ok())
  228. {
  229. return request;
  230. }
  231. request->~RequestType();
  232. return nullptr;
  233. }
  234. private:
  235. std::function<grpc::Status(ServiceType*, grpc::ServerContext*, const RequestType*, ServerWriter<ResponseType>*)>
  236. func_;
  237. ServiceType* service_;
  238. };
  239. /// A wrapper class of an application provided bidi-streaming handler.
  240. /// This also applies to server-streamed implementation of a unary method
  241. /// with the additional requirement that such methods must have done a
  242. /// write for status to be ok
  243. /// Since this is used by more than 1 class, the service is not passed in.
  244. /// Instead, it is expected to be an implicitly-captured argument of func
  245. /// (through bind or something along those lines)
  246. template<class Streamer, bool WriteNeeded>
  247. class TemplatedBidiStreamingHandler : public grpc::internal::MethodHandler
  248. {
  249. public:
  250. explicit TemplatedBidiStreamingHandler(
  251. std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func
  252. ) :
  253. func_(func),
  254. write_needed_(WriteNeeded)
  255. {
  256. }
  257. void RunHandler(const HandlerParameter& param) final
  258. {
  259. Streamer stream(param.call, static_cast<grpc::ServerContext*>(param.server_context));
  260. grpc::Status status = CatchingFunctionHandler([this, &param, &stream]
  261. { return func_(static_cast<grpc::ServerContext*>(param.server_context), &stream); });
  262. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus>
  263. ops;
  264. if (!param.server_context->sent_initial_metadata_)
  265. {
  266. ops.SendInitialMetadata(&param.server_context->initial_metadata_, param.server_context->initial_metadata_flags());
  267. if (param.server_context->compression_level_set())
  268. {
  269. ops.set_compression_level(param.server_context->compression_level());
  270. }
  271. if (write_needed_ && status.ok())
  272. {
  273. // If we needed a write but never did one, we need to mark the
  274. // status as a fail
  275. status = grpc::Status(grpc::StatusCode::INTERNAL, "Service did not provide response message");
  276. }
  277. }
  278. ops.ServerSendStatus(&param.server_context->trailing_metadata_, status);
  279. param.call->PerformOps(&ops);
  280. if (param.server_context->has_pending_ops_)
  281. {
  282. param.call->cq()->Pluck(&param.server_context->pending_ops_);
  283. }
  284. param.call->cq()->Pluck(&ops);
  285. }
  286. private:
  287. std::function<grpc::Status(grpc::ServerContext*, Streamer*)> func_;
  288. const bool write_needed_;
  289. };
  290. template<class ServiceType, class RequestType, class ResponseType>
  291. class BidiStreamingHandler : public TemplatedBidiStreamingHandler<ServerReaderWriter<ResponseType, RequestType>, false>
  292. {
  293. public:
  294. BidiStreamingHandler(std::function<grpc::Status(ServiceType*, grpc::ServerContext*, ServerReaderWriter<ResponseType, RequestType>*)> func, ServiceType* service)
  295. // TODO(vjpai): When gRPC supports C++14, move-capture func in the below
  296. :
  297. TemplatedBidiStreamingHandler<
  298. ServerReaderWriter<ResponseType, RequestType>,
  299. false>(
  300. [func, service](
  301. grpc::ServerContext* ctx,
  302. ServerReaderWriter<ResponseType, RequestType>* streamer
  303. )
  304. {
  305. return func(service, ctx, streamer);
  306. }
  307. )
  308. {
  309. }
  310. };
  311. template<class RequestType, class ResponseType>
  312. class StreamedUnaryHandler : public TemplatedBidiStreamingHandler<ServerUnaryStreamer<RequestType, ResponseType>, true>
  313. {
  314. public:
  315. explicit StreamedUnaryHandler(
  316. std::function<
  317. grpc::Status(grpc::ServerContext*, ServerUnaryStreamer<RequestType, ResponseType>*)>
  318. func
  319. ) :
  320. TemplatedBidiStreamingHandler<
  321. ServerUnaryStreamer<RequestType, ResponseType>,
  322. true>(
  323. std::move(func)
  324. )
  325. {
  326. }
  327. };
  328. template<class RequestType, class ResponseType>
  329. class SplitServerStreamingHandler : public TemplatedBidiStreamingHandler<ServerSplitStreamer<RequestType, ResponseType>, false>
  330. {
  331. public:
  332. explicit SplitServerStreamingHandler(
  333. std::function<
  334. grpc::Status(grpc::ServerContext*, ServerSplitStreamer<RequestType, ResponseType>*)>
  335. func
  336. ) :
  337. TemplatedBidiStreamingHandler<
  338. ServerSplitStreamer<RequestType, ResponseType>,
  339. false>(
  340. std::move(func)
  341. )
  342. {
  343. }
  344. };
  345. /// General method handler class for errors that prevent real method use
  346. /// e.g., handle unknown method by returning UNIMPLEMENTED error.
  347. template<grpc::StatusCode code>
  348. class ErrorMethodHandler : public grpc::internal::MethodHandler
  349. {
  350. public:
  351. explicit ErrorMethodHandler(const std::string& message) :
  352. message_(message)
  353. {
  354. }
  355. template<class T>
  356. static void FillOps(grpc::ServerContextBase* context, const std::string& message, T* ops)
  357. {
  358. grpc::Status status(code, message);
  359. if (!context->sent_initial_metadata_)
  360. {
  361. ops->SendInitialMetadata(&context->initial_metadata_, context->initial_metadata_flags());
  362. if (context->compression_level_set())
  363. {
  364. ops->set_compression_level(context->compression_level());
  365. }
  366. context->sent_initial_metadata_ = true;
  367. }
  368. ops->ServerSendStatus(&context->trailing_metadata_, status);
  369. }
  370. void RunHandler(const HandlerParameter& param) final
  371. {
  372. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpServerSendStatus>
  373. ops;
  374. FillOps(param.server_context, message_, &ops);
  375. param.call->PerformOps(&ops);
  376. param.call->cq()->Pluck(&ops);
  377. }
  378. void* Deserialize(grpc_call* /*call*/, grpc_byte_buffer* req, grpc::Status* /*status*/, void** /*handler_data*/) final
  379. {
  380. // We have to destroy any request payload
  381. if (req != nullptr)
  382. {
  383. grpc::g_core_codegen_interface->grpc_byte_buffer_destroy(req);
  384. }
  385. return nullptr;
  386. }
  387. private:
  388. const std::string message_;
  389. };
  390. typedef ErrorMethodHandler<grpc::StatusCode::UNIMPLEMENTED>
  391. UnknownMethodHandler;
  392. typedef ErrorMethodHandler<grpc::StatusCode::RESOURCE_EXHAUSTED>
  393. ResourceExhaustedHandler;
  394. } // namespace internal
  395. } // namespace grpc
  396. #endif // GRPCPP_IMPL_CODEGEN_METHOD_HANDLER_H