You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

sync_stream.h 42 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044
  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. #ifndef GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
  18. #define GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H
  19. // IWYU pragma: private, include <grpcpp/support/sync_stream.h>
  20. #include <grpcpp/impl/codegen/call.h>
  21. #include <grpcpp/impl/codegen/channel_interface.h>
  22. #include <grpcpp/impl/codegen/client_context.h>
  23. #include <grpcpp/impl/codegen/completion_queue.h>
  24. #include <grpcpp/impl/codegen/core_codegen_interface.h>
  25. #include <grpcpp/impl/codegen/server_context.h>
  26. #include <grpcpp/impl/codegen/service_type.h>
  27. #include <grpcpp/impl/codegen/status.h>
  28. namespace grpc
  29. {
  30. namespace internal
  31. {
  32. /// Common interface for all synchronous client side streaming.
  33. class ClientStreamingInterface
  34. {
  35. public:
  36. virtual ~ClientStreamingInterface()
  37. {
  38. }
  39. /// Block waiting until the stream finishes and a final status of the call is
  40. /// available.
  41. ///
  42. /// It is appropriate to call this method exactly once when both:
  43. /// * the calling code (client-side) has no more message to send
  44. /// (this can be declared implicitly by calling this method, or
  45. /// explicitly through an earlier call to <i>WritesDone</i> method of the
  46. /// class in use, e.g. \a ClientWriterInterface::WritesDone or
  47. /// \a ClientReaderWriterInterface::WritesDone).
  48. /// * there are no more messages to be received from the server (which can
  49. /// be known implicitly, or explicitly from an earlier call to \a
  50. /// ReaderInterface::Read that returned "false").
  51. ///
  52. /// This function will return either:
  53. /// - when all incoming messages have been read and the server has
  54. /// returned status.
  55. /// - when the server has returned a non-OK status.
  56. /// - OR when the call failed for some reason and the library generated a
  57. /// status.
  58. ///
  59. /// Return values:
  60. /// - \a Status contains the status code, message and details for the call
  61. /// - the \a ClientContext associated with this call is updated with
  62. /// possible trailing metadata sent from the server.
  63. virtual grpc::Status Finish() = 0;
  64. };
  65. /// Common interface for all synchronous server side streaming.
  66. class ServerStreamingInterface
  67. {
  68. public:
  69. virtual ~ServerStreamingInterface()
  70. {
  71. }
  72. /// Block to send initial metadata to client.
  73. /// This call is optional, but if it is used, it cannot be used concurrently
  74. /// with or after the \a Finish method.
  75. ///
  76. /// The initial metadata that will be sent to the client will be
  77. /// taken from the \a ServerContext associated with the call.
  78. virtual void SendInitialMetadata() = 0;
  79. };
  80. /// An interface that yields a sequence of messages of type \a R.
  81. template<class R>
  82. class ReaderInterface
  83. {
  84. public:
  85. virtual ~ReaderInterface()
  86. {
  87. }
  88. /// Get an upper bound on the next message size available for reading on this
  89. /// stream.
  90. virtual bool NextMessageSize(uint32_t* sz) = 0;
  91. /// Block to read a message and parse to \a msg. Returns \a true on success.
  92. /// This is thread-safe with respect to \a Write or \WritesDone methods on
  93. /// the same stream. It should not be called concurrently with another \a
  94. /// Read on the same stream as the order of delivery will not be defined.
  95. ///
  96. /// \param[out] msg The read message.
  97. ///
  98. /// \return \a false when there will be no more incoming messages, either
  99. /// because the other side has called \a WritesDone() or the stream has failed
  100. /// (or been cancelled).
  101. virtual bool Read(R* msg) = 0;
  102. };
  103. /// An interface that can be fed a sequence of messages of type \a W.
  104. template<class W>
  105. class WriterInterface
  106. {
  107. public:
  108. virtual ~WriterInterface()
  109. {
  110. }
  111. /// Block to write \a msg to the stream with WriteOptions \a options.
  112. /// This is thread-safe with respect to \a ReaderInterface::Read
  113. ///
  114. /// \param msg The message to be written to the stream.
  115. /// \param options The WriteOptions affecting the write operation.
  116. ///
  117. /// \return \a true on success, \a false when the stream has been closed.
  118. virtual bool Write(const W& msg, grpc::WriteOptions options) = 0;
  119. /// Block to write \a msg to the stream with default write options.
  120. /// This is thread-safe with respect to \a ReaderInterface::Read
  121. ///
  122. /// \param msg The message to be written to the stream.
  123. ///
  124. /// \return \a true on success, \a false when the stream has been closed.
  125. inline bool Write(const W& msg)
  126. {
  127. return Write(msg, grpc::WriteOptions());
  128. }
  129. /// Write \a msg and coalesce it with the writing of trailing metadata, using
  130. /// WriteOptions \a options.
  131. ///
  132. /// For client, WriteLast is equivalent of performing Write and WritesDone in
  133. /// a single step. \a msg and trailing metadata are coalesced and sent on wire
  134. /// by calling this function. For server, WriteLast buffers the \a msg.
  135. /// The writing of \a msg is held until the service handler returns,
  136. /// where \a msg and trailing metadata are coalesced and sent on wire.
  137. /// Note that WriteLast can only buffer \a msg up to the flow control window
  138. /// size. If \a msg size is larger than the window size, it will be sent on
  139. /// wire without buffering.
  140. ///
  141. /// \param[in] msg The message to be written to the stream.
  142. /// \param[in] options The WriteOptions to be used to write this message.
  143. void WriteLast(const W& msg, grpc::WriteOptions options)
  144. {
  145. Write(msg, options.set_last_message());
  146. }
  147. };
  148. } // namespace internal
  149. /// Client-side interface for streaming reads of message of type \a R.
  150. template<class R>
  151. class ClientReaderInterface : public internal::ClientStreamingInterface, public internal::ReaderInterface<R>
  152. {
  153. public:
  154. /// Block to wait for initial metadata from server. The received metadata
  155. /// can only be accessed after this call returns. Should only be called before
  156. /// the first read. Calling this method is optional, and if it is not called
  157. /// the metadata will be available in ClientContext after the first read.
  158. virtual void WaitForInitialMetadata() = 0;
  159. };
  160. namespace internal
  161. {
  162. template<class R>
  163. class ClientReaderFactory
  164. {
  165. public:
  166. template<class W>
  167. static ClientReader<R>* Create(grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, grpc::ClientContext* context, const W& request)
  168. {
  169. return new ClientReader<R>(channel, method, context, request);
  170. }
  171. };
  172. } // namespace internal
  173. /// Synchronous (blocking) client-side API for doing server-streaming RPCs,
  174. /// where the stream of messages coming from the server has messages
  175. /// of type \a R.
  176. template<class R>
  177. class ClientReader final : public ClientReaderInterface<R>
  178. {
  179. public:
  180. /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
  181. /// semantics.
  182. ///
  183. // Side effect:
  184. /// Once complete, the initial metadata read from
  185. /// the server will be accessible through the \a ClientContext used to
  186. /// construct this object.
  187. void WaitForInitialMetadata() override
  188. {
  189. GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
  190. grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
  191. ops.RecvInitialMetadata(context_);
  192. call_.PerformOps(&ops);
  193. cq_.Pluck(&ops); /// status ignored
  194. }
  195. bool NextMessageSize(uint32_t* sz) override
  196. {
  197. int result = call_.max_receive_message_size();
  198. *sz = (result > 0) ? result : UINT32_MAX;
  199. return true;
  200. }
  201. /// See the \a ReaderInterface.Read method for semantics.
  202. /// Side effect:
  203. /// This also receives initial metadata from the server, if not
  204. /// already received (if initial metadata is received, it can be then
  205. /// accessed through the \a ClientContext associated with this call).
  206. bool Read(R* msg) override
  207. {
  208. grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, grpc::internal::CallOpRecvMessage<R>>
  209. ops;
  210. if (!context_->initial_metadata_received_)
  211. {
  212. ops.RecvInitialMetadata(context_);
  213. }
  214. ops.RecvMessage(msg);
  215. call_.PerformOps(&ops);
  216. return cq_.Pluck(&ops) && ops.got_message;
  217. }
  218. /// See the \a ClientStreamingInterface.Finish method for semantics.
  219. ///
  220. /// Side effect:
  221. /// The \a ClientContext associated with this call is updated with
  222. /// possible metadata received from the server.
  223. grpc::Status Finish() override
  224. {
  225. grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, grpc::internal::CallOpClientRecvStatus>
  226. ops;
  227. if (!context_->initial_metadata_received_)
  228. {
  229. ops.RecvInitialMetadata(context_);
  230. }
  231. grpc::Status status;
  232. ops.ClientRecvStatus(context_, &status);
  233. call_.PerformOps(&ops);
  234. GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
  235. return status;
  236. }
  237. private:
  238. friend class internal::ClientReaderFactory<R>;
  239. grpc::ClientContext* context_;
  240. grpc::CompletionQueue cq_;
  241. grpc::internal::Call call_;
  242. /// Block to create a stream and write the initial metadata and \a request
  243. /// out. Note that \a context will be used to fill in custom initial
  244. /// metadata used to send to the server when starting the call.
  245. template<class W>
  246. ClientReader(grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, grpc::ClientContext* context, const W& request) :
  247. context_(context),
  248. cq_(grpc_completion_queue_attributes{
  249. GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, nullptr}), // Pluckable cq
  250. call_(channel->CreateCall(method, context, &cq_))
  251. {
  252. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpClientSendClose>
  253. ops;
  254. ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags());
  255. // TODO(ctiller): don't assert
  256. GPR_CODEGEN_ASSERT(ops.SendMessagePtr(&request).ok());
  257. ops.ClientSendClose();
  258. call_.PerformOps(&ops);
  259. cq_.Pluck(&ops);
  260. }
  261. };
  262. /// Client-side interface for streaming writes of message type \a W.
  263. template<class W>
  264. class ClientWriterInterface : public internal::ClientStreamingInterface, public internal::WriterInterface<W>
  265. {
  266. public:
  267. /// Half close writing from the client. (signal that the stream of messages
  268. /// coming from the client is complete).
  269. /// Blocks until currently-pending writes are completed.
  270. /// Thread safe with respect to \a ReaderInterface::Read operations only
  271. ///
  272. /// \return Whether the writes were successful.
  273. virtual bool WritesDone() = 0;
  274. };
  275. namespace internal
  276. {
  277. template<class W>
  278. class ClientWriterFactory
  279. {
  280. public:
  281. template<class R>
  282. static ClientWriter<W>* Create(grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, grpc::ClientContext* context, R* response)
  283. {
  284. return new ClientWriter<W>(channel, method, context, response);
  285. }
  286. };
  287. } // namespace internal
  288. /// Synchronous (blocking) client-side API for doing client-streaming RPCs,
  289. /// where the outgoing message stream coming from the client has messages of
  290. /// type \a W.
  291. template<class W>
  292. class ClientWriter : public ClientWriterInterface<W>
  293. {
  294. public:
  295. /// See the \a ClientStreamingInterface.WaitForInitialMetadata method for
  296. /// semantics.
  297. ///
  298. // Side effect:
  299. /// Once complete, the initial metadata read from the server will be
  300. /// accessible through the \a ClientContext used to construct this object.
  301. void WaitForInitialMetadata()
  302. {
  303. GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
  304. grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
  305. ops.RecvInitialMetadata(context_);
  306. call_.PerformOps(&ops);
  307. cq_.Pluck(&ops); // status ignored
  308. }
  309. /// See the WriterInterface.Write(const W& msg, WriteOptions options) method
  310. /// for semantics.
  311. ///
  312. /// Side effect:
  313. /// Also sends initial metadata if not already sent (using the
  314. /// \a ClientContext associated with this call).
  315. using internal::WriterInterface<W>::Write;
  316. bool Write(const W& msg, grpc::WriteOptions options) override
  317. {
  318. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpClientSendClose>
  319. ops;
  320. if (options.is_last_message())
  321. {
  322. options.set_buffer_hint();
  323. ops.ClientSendClose();
  324. }
  325. if (context_->initial_metadata_corked_)
  326. {
  327. ops.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags());
  328. context_->set_initial_metadata_corked(false);
  329. }
  330. if (!ops.SendMessagePtr(&msg, options).ok())
  331. {
  332. return false;
  333. }
  334. call_.PerformOps(&ops);
  335. return cq_.Pluck(&ops);
  336. }
  337. bool WritesDone() override
  338. {
  339. grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops;
  340. ops.ClientSendClose();
  341. call_.PerformOps(&ops);
  342. return cq_.Pluck(&ops);
  343. }
  344. /// See the ClientStreamingInterface.Finish method for semantics.
  345. /// Side effects:
  346. /// - Also receives initial metadata if not already received.
  347. /// - Attempts to fill in the \a response parameter passed
  348. /// to the constructor of this instance with the response
  349. /// message from the server.
  350. grpc::Status Finish() override
  351. {
  352. grpc::Status status;
  353. if (!context_->initial_metadata_received_)
  354. {
  355. finish_ops_.RecvInitialMetadata(context_);
  356. }
  357. finish_ops_.ClientRecvStatus(context_, &status);
  358. call_.PerformOps(&finish_ops_);
  359. GPR_CODEGEN_ASSERT(cq_.Pluck(&finish_ops_));
  360. return status;
  361. }
  362. private:
  363. friend class internal::ClientWriterFactory<W>;
  364. /// Block to create a stream (i.e. send request headers and other initial
  365. /// metadata to the server). Note that \a context will be used to fill
  366. /// in custom initial metadata. \a response will be filled in with the
  367. /// single expected response message from the server upon a successful
  368. /// call to the \a Finish method of this instance.
  369. template<class R>
  370. ClientWriter(grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, grpc::ClientContext* context, R* response) :
  371. context_(context),
  372. cq_(grpc_completion_queue_attributes{
  373. GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, nullptr}), // Pluckable cq
  374. call_(channel->CreateCall(method, context, &cq_))
  375. {
  376. finish_ops_.RecvMessage(response);
  377. finish_ops_.AllowNoMessage();
  378. if (!context_->initial_metadata_corked_)
  379. {
  380. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
  381. ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags());
  382. call_.PerformOps(&ops);
  383. cq_.Pluck(&ops);
  384. }
  385. }
  386. grpc::ClientContext* context_;
  387. grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, grpc::internal::CallOpGenericRecvMessage, grpc::internal::CallOpClientRecvStatus>
  388. finish_ops_;
  389. grpc::CompletionQueue cq_;
  390. grpc::internal::Call call_;
  391. };
  392. /// Client-side interface for bi-directional streaming with
  393. /// client-to-server stream messages of type \a W and
  394. /// server-to-client stream messages of type \a R.
  395. template<class W, class R>
  396. class ClientReaderWriterInterface : public internal::ClientStreamingInterface, public internal::WriterInterface<W>, public internal::ReaderInterface<R>
  397. {
  398. public:
  399. /// Block to wait for initial metadata from server. The received metadata
  400. /// can only be accessed after this call returns. Should only be called before
  401. /// the first read. Calling this method is optional, and if it is not called
  402. /// the metadata will be available in ClientContext after the first read.
  403. virtual void WaitForInitialMetadata() = 0;
  404. /// Half close writing from the client. (signal that the stream of messages
  405. /// coming from the client is complete).
  406. /// Blocks until currently-pending writes are completed.
  407. /// Thread-safe with respect to \a ReaderInterface::Read
  408. ///
  409. /// \return Whether the writes were successful.
  410. virtual bool WritesDone() = 0;
  411. };
  412. namespace internal
  413. {
  414. template<class W, class R>
  415. class ClientReaderWriterFactory
  416. {
  417. public:
  418. static ClientReaderWriter<W, R>* Create(
  419. grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, grpc::ClientContext* context
  420. )
  421. {
  422. return new ClientReaderWriter<W, R>(channel, method, context);
  423. }
  424. };
  425. } // namespace internal
  426. /// Synchronous (blocking) client-side API for bi-directional streaming RPCs,
  427. /// where the outgoing message stream coming from the client has messages of
  428. /// type \a W, and the incoming messages stream coming from the server has
  429. /// messages of type \a R.
  430. template<class W, class R>
  431. class ClientReaderWriter final : public ClientReaderWriterInterface<W, R>
  432. {
  433. public:
  434. /// Block waiting to read initial metadata from the server.
  435. /// This call is optional, but if it is used, it cannot be used concurrently
  436. /// with or after the \a Finish method.
  437. ///
  438. /// Once complete, the initial metadata read from the server will be
  439. /// accessible through the \a ClientContext used to construct this object.
  440. void WaitForInitialMetadata() override
  441. {
  442. GPR_CODEGEN_ASSERT(!context_->initial_metadata_received_);
  443. grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata> ops;
  444. ops.RecvInitialMetadata(context_);
  445. call_.PerformOps(&ops);
  446. cq_.Pluck(&ops); // status ignored
  447. }
  448. bool NextMessageSize(uint32_t* sz) override
  449. {
  450. int result = call_.max_receive_message_size();
  451. *sz = (result > 0) ? result : UINT32_MAX;
  452. return true;
  453. }
  454. /// See the \a ReaderInterface.Read method for semantics.
  455. /// Side effect:
  456. /// Also receives initial metadata if not already received (updates the \a
  457. /// ClientContext associated with this call in that case).
  458. bool Read(R* msg) override
  459. {
  460. grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, grpc::internal::CallOpRecvMessage<R>>
  461. ops;
  462. if (!context_->initial_metadata_received_)
  463. {
  464. ops.RecvInitialMetadata(context_);
  465. }
  466. ops.RecvMessage(msg);
  467. call_.PerformOps(&ops);
  468. return cq_.Pluck(&ops) && ops.got_message;
  469. }
  470. /// See the \a WriterInterface.Write method for semantics.
  471. ///
  472. /// Side effect:
  473. /// Also sends initial metadata if not already sent (using the
  474. /// \a ClientContext associated with this call to fill in values).
  475. using internal::WriterInterface<W>::Write;
  476. bool Write(const W& msg, grpc::WriteOptions options) override
  477. {
  478. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata, grpc::internal::CallOpSendMessage, grpc::internal::CallOpClientSendClose>
  479. ops;
  480. if (options.is_last_message())
  481. {
  482. options.set_buffer_hint();
  483. ops.ClientSendClose();
  484. }
  485. if (context_->initial_metadata_corked_)
  486. {
  487. ops.SendInitialMetadata(&context_->send_initial_metadata_, context_->initial_metadata_flags());
  488. context_->set_initial_metadata_corked(false);
  489. }
  490. if (!ops.SendMessagePtr(&msg, options).ok())
  491. {
  492. return false;
  493. }
  494. call_.PerformOps(&ops);
  495. return cq_.Pluck(&ops);
  496. }
  497. bool WritesDone() override
  498. {
  499. grpc::internal::CallOpSet<grpc::internal::CallOpClientSendClose> ops;
  500. ops.ClientSendClose();
  501. call_.PerformOps(&ops);
  502. return cq_.Pluck(&ops);
  503. }
  504. /// See the ClientStreamingInterface.Finish method for semantics.
  505. ///
  506. /// Side effect:
  507. /// - the \a ClientContext associated with this call is updated with
  508. /// possible trailing metadata sent from the server.
  509. grpc::Status Finish() override
  510. {
  511. grpc::internal::CallOpSet<grpc::internal::CallOpRecvInitialMetadata, grpc::internal::CallOpClientRecvStatus>
  512. ops;
  513. if (!context_->initial_metadata_received_)
  514. {
  515. ops.RecvInitialMetadata(context_);
  516. }
  517. grpc::Status status;
  518. ops.ClientRecvStatus(context_, &status);
  519. call_.PerformOps(&ops);
  520. GPR_CODEGEN_ASSERT(cq_.Pluck(&ops));
  521. return status;
  522. }
  523. private:
  524. friend class internal::ClientReaderWriterFactory<W, R>;
  525. grpc::ClientContext* context_;
  526. grpc::CompletionQueue cq_;
  527. grpc::internal::Call call_;
  528. /// Block to create a stream and write the initial metadata and \a request
  529. /// out. Note that \a context will be used to fill in custom initial metadata
  530. /// used to send to the server when starting the call.
  531. ClientReaderWriter(grpc::ChannelInterface* channel, const grpc::internal::RpcMethod& method, grpc::ClientContext* context) :
  532. context_(context),
  533. cq_(grpc_completion_queue_attributes{
  534. GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK, GRPC_CQ_DEFAULT_POLLING, nullptr}), // Pluckable cq
  535. call_(channel->CreateCall(method, context, &cq_))
  536. {
  537. if (!context_->initial_metadata_corked_)
  538. {
  539. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
  540. ops.SendInitialMetadata(&context->send_initial_metadata_, context->initial_metadata_flags());
  541. call_.PerformOps(&ops);
  542. cq_.Pluck(&ops);
  543. }
  544. }
  545. };
  546. /// Server-side interface for streaming reads of message of type \a R.
  547. template<class R>
  548. class ServerReaderInterface : public internal::ServerStreamingInterface, public internal::ReaderInterface<R>
  549. {
  550. };
  551. /// Synchronous (blocking) server-side API for doing client-streaming RPCs,
  552. /// where the incoming message stream coming from the client has messages of
  553. /// type \a R.
  554. template<class R>
  555. class ServerReader final : public ServerReaderInterface<R>
  556. {
  557. public:
  558. /// See the \a ServerStreamingInterface.SendInitialMetadata method
  559. /// for semantics. Note that initial metadata will be affected by the
  560. /// \a ServerContext associated with this call.
  561. void SendInitialMetadata() override
  562. {
  563. GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
  564. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
  565. ops.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags());
  566. if (ctx_->compression_level_set())
  567. {
  568. ops.set_compression_level(ctx_->compression_level());
  569. }
  570. ctx_->sent_initial_metadata_ = true;
  571. call_->PerformOps(&ops);
  572. call_->cq()->Pluck(&ops);
  573. }
  574. bool NextMessageSize(uint32_t* sz) override
  575. {
  576. int result = call_->max_receive_message_size();
  577. *sz = (result > 0) ? result : UINT32_MAX;
  578. return true;
  579. }
  580. bool Read(R* msg) override
  581. {
  582. grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops;
  583. ops.RecvMessage(msg);
  584. call_->PerformOps(&ops);
  585. bool ok = call_->cq()->Pluck(&ops) && ops.got_message;
  586. if (!ok)
  587. {
  588. ctx_->MaybeMarkCancelledOnRead();
  589. }
  590. return ok;
  591. }
  592. private:
  593. grpc::internal::Call* const call_;
  594. ServerContext* const ctx_;
  595. template<class ServiceType, class RequestType, class ResponseType>
  596. friend class internal::ClientStreamingHandler;
  597. ServerReader(grpc::internal::Call* call, grpc::ServerContext* ctx) :
  598. call_(call),
  599. ctx_(ctx)
  600. {
  601. }
  602. };
  603. /// Server-side interface for streaming writes of message of type \a W.
  604. template<class W>
  605. class ServerWriterInterface : public internal::ServerStreamingInterface, public internal::WriterInterface<W>
  606. {
  607. };
  608. /// Synchronous (blocking) server-side API for doing for doing a
  609. /// server-streaming RPCs, where the outgoing message stream coming from the
  610. /// server has messages of type \a W.
  611. template<class W>
  612. class ServerWriter final : public ServerWriterInterface<W>
  613. {
  614. public:
  615. /// See the \a ServerStreamingInterface.SendInitialMetadata method
  616. /// for semantics.
  617. /// Note that initial metadata will be affected by the
  618. /// \a ServerContext associated with this call.
  619. void SendInitialMetadata() override
  620. {
  621. GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
  622. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
  623. ops.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags());
  624. if (ctx_->compression_level_set())
  625. {
  626. ops.set_compression_level(ctx_->compression_level());
  627. }
  628. ctx_->sent_initial_metadata_ = true;
  629. call_->PerformOps(&ops);
  630. call_->cq()->Pluck(&ops);
  631. }
  632. /// See the \a WriterInterface.Write method for semantics.
  633. ///
  634. /// Side effect:
  635. /// Also sends initial metadata if not already sent (using the
  636. /// \a ClientContext associated with this call to fill in values).
  637. using internal::WriterInterface<W>::Write;
  638. bool Write(const W& msg, grpc::WriteOptions options) override
  639. {
  640. if (options.is_last_message())
  641. {
  642. options.set_buffer_hint();
  643. }
  644. if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok())
  645. {
  646. return false;
  647. }
  648. if (!ctx_->sent_initial_metadata_)
  649. {
  650. ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags());
  651. if (ctx_->compression_level_set())
  652. {
  653. ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
  654. }
  655. ctx_->sent_initial_metadata_ = true;
  656. }
  657. call_->PerformOps(&ctx_->pending_ops_);
  658. // if this is the last message we defer the pluck until AFTER we start
  659. // the trailing md op. This prevents hangs. See
  660. // https://github.com/grpc/grpc/issues/11546
  661. if (options.is_last_message())
  662. {
  663. ctx_->has_pending_ops_ = true;
  664. return true;
  665. }
  666. ctx_->has_pending_ops_ = false;
  667. return call_->cq()->Pluck(&ctx_->pending_ops_);
  668. }
  669. private:
  670. grpc::internal::Call* const call_;
  671. grpc::ServerContext* const ctx_;
  672. template<class ServiceType, class RequestType, class ResponseType>
  673. friend class internal::ServerStreamingHandler;
  674. ServerWriter(grpc::internal::Call* call, grpc::ServerContext* ctx) :
  675. call_(call),
  676. ctx_(ctx)
  677. {
  678. }
  679. };
  680. /// Server-side interface for bi-directional streaming.
  681. template<class W, class R>
  682. class ServerReaderWriterInterface : public internal::ServerStreamingInterface, public internal::WriterInterface<W>, public internal::ReaderInterface<R>
  683. {
  684. };
  685. /// Actual implementation of bi-directional streaming
  686. namespace internal
  687. {
  688. template<class W, class R>
  689. class ServerReaderWriterBody final
  690. {
  691. public:
  692. ServerReaderWriterBody(grpc::internal::Call* call, grpc::ServerContext* ctx) :
  693. call_(call),
  694. ctx_(ctx)
  695. {
  696. }
  697. void SendInitialMetadata()
  698. {
  699. GPR_CODEGEN_ASSERT(!ctx_->sent_initial_metadata_);
  700. grpc::internal::CallOpSet<grpc::internal::CallOpSendInitialMetadata> ops;
  701. ops.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags());
  702. if (ctx_->compression_level_set())
  703. {
  704. ops.set_compression_level(ctx_->compression_level());
  705. }
  706. ctx_->sent_initial_metadata_ = true;
  707. call_->PerformOps(&ops);
  708. call_->cq()->Pluck(&ops);
  709. }
  710. bool NextMessageSize(uint32_t* sz)
  711. {
  712. int result = call_->max_receive_message_size();
  713. *sz = (result > 0) ? result : UINT32_MAX;
  714. return true;
  715. }
  716. bool Read(R* msg)
  717. {
  718. grpc::internal::CallOpSet<grpc::internal::CallOpRecvMessage<R>> ops;
  719. ops.RecvMessage(msg);
  720. call_->PerformOps(&ops);
  721. bool ok = call_->cq()->Pluck(&ops) && ops.got_message;
  722. if (!ok)
  723. {
  724. ctx_->MaybeMarkCancelledOnRead();
  725. }
  726. return ok;
  727. }
  728. bool Write(const W& msg, grpc::WriteOptions options)
  729. {
  730. if (options.is_last_message())
  731. {
  732. options.set_buffer_hint();
  733. }
  734. if (!ctx_->pending_ops_.SendMessagePtr(&msg, options).ok())
  735. {
  736. return false;
  737. }
  738. if (!ctx_->sent_initial_metadata_)
  739. {
  740. ctx_->pending_ops_.SendInitialMetadata(&ctx_->initial_metadata_, ctx_->initial_metadata_flags());
  741. if (ctx_->compression_level_set())
  742. {
  743. ctx_->pending_ops_.set_compression_level(ctx_->compression_level());
  744. }
  745. ctx_->sent_initial_metadata_ = true;
  746. }
  747. call_->PerformOps(&ctx_->pending_ops_);
  748. // if this is the last message we defer the pluck until AFTER we start
  749. // the trailing md op. This prevents hangs. See
  750. // https://github.com/grpc/grpc/issues/11546
  751. if (options.is_last_message())
  752. {
  753. ctx_->has_pending_ops_ = true;
  754. return true;
  755. }
  756. ctx_->has_pending_ops_ = false;
  757. return call_->cq()->Pluck(&ctx_->pending_ops_);
  758. }
  759. private:
  760. grpc::internal::Call* const call_;
  761. grpc::ServerContext* const ctx_;
  762. };
  763. } // namespace internal
  764. /// Synchronous (blocking) server-side API for a bidirectional
  765. /// streaming call, where the incoming message stream coming from the client has
  766. /// messages of type \a R, and the outgoing message streaming coming from
  767. /// the server has messages of type \a W.
  768. template<class W, class R>
  769. class ServerReaderWriter final : public ServerReaderWriterInterface<W, R>
  770. {
  771. public:
  772. /// See the \a ServerStreamingInterface.SendInitialMetadata method
  773. /// for semantics. Note that initial metadata will be affected by the
  774. /// \a ServerContext associated with this call.
  775. void SendInitialMetadata() override
  776. {
  777. body_.SendInitialMetadata();
  778. }
  779. bool NextMessageSize(uint32_t* sz) override
  780. {
  781. return body_.NextMessageSize(sz);
  782. }
  783. bool Read(R* msg) override
  784. {
  785. return body_.Read(msg);
  786. }
  787. /// See the \a WriterInterface.Write(const W& msg, WriteOptions options)
  788. /// method for semantics.
  789. /// Side effect:
  790. /// Also sends initial metadata if not already sent (using the \a
  791. /// ServerContext associated with this call).
  792. using internal::WriterInterface<W>::Write;
  793. bool Write(const W& msg, grpc::WriteOptions options) override
  794. {
  795. return body_.Write(msg, options);
  796. }
  797. private:
  798. internal::ServerReaderWriterBody<W, R> body_;
  799. friend class internal::TemplatedBidiStreamingHandler<ServerReaderWriter<W, R>, false>;
  800. ServerReaderWriter(grpc::internal::Call* call, grpc::ServerContext* ctx) :
  801. body_(call, ctx)
  802. {
  803. }
  804. };
  805. /// A class to represent a flow-controlled unary call. This is something
  806. /// of a hybrid between conventional unary and streaming. This is invoked
  807. /// through a unary call on the client side, but the server responds to it
  808. /// as though it were a single-ping-pong streaming call. The server can use
  809. /// the \a NextMessageSize method to determine an upper-bound on the size of
  810. /// the message. A key difference relative to streaming: ServerUnaryStreamer
  811. /// must have exactly 1 Read and exactly 1 Write, in that order, to function
  812. /// correctly. Otherwise, the RPC is in error.
  813. template<class RequestType, class ResponseType>
  814. class ServerUnaryStreamer final : public ServerReaderWriterInterface<ResponseType, RequestType>
  815. {
  816. public:
  817. /// Block to send initial metadata to client.
  818. /// Implicit input parameter:
  819. /// - the \a ServerContext associated with this call will be used for
  820. /// sending initial metadata.
  821. void SendInitialMetadata() override
  822. {
  823. body_.SendInitialMetadata();
  824. }
  825. /// Get an upper bound on the request message size from the client.
  826. bool NextMessageSize(uint32_t* sz) override
  827. {
  828. return body_.NextMessageSize(sz);
  829. }
  830. /// Read a message of type \a R into \a msg. Completion will be notified by \a
  831. /// tag on the associated completion queue.
  832. /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
  833. /// should not be called concurrently with other streaming APIs
  834. /// on the same stream. It is not meaningful to call it concurrently
  835. /// with another \a ReaderInterface::Read on the same stream since reads on
  836. /// the same stream are delivered in order.
  837. ///
  838. /// \param[out] msg Where to eventually store the read message.
  839. /// \param[in] tag The tag identifying the operation.
  840. bool Read(RequestType* request) override
  841. {
  842. if (read_done_)
  843. {
  844. return false;
  845. }
  846. read_done_ = true;
  847. return body_.Read(request);
  848. }
  849. /// Block to write \a msg to the stream with WriteOptions \a options.
  850. /// This is thread-safe with respect to \a ReaderInterface::Read
  851. ///
  852. /// \param msg The message to be written to the stream.
  853. /// \param options The WriteOptions affecting the write operation.
  854. ///
  855. /// \return \a true on success, \a false when the stream has been closed.
  856. using internal::WriterInterface<ResponseType>::Write;
  857. bool Write(const ResponseType& response, grpc::WriteOptions options) override
  858. {
  859. if (write_done_ || !read_done_)
  860. {
  861. return false;
  862. }
  863. write_done_ = true;
  864. return body_.Write(response, options);
  865. }
  866. private:
  867. internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
  868. bool read_done_;
  869. bool write_done_;
  870. friend class internal::TemplatedBidiStreamingHandler<
  871. ServerUnaryStreamer<RequestType, ResponseType>,
  872. true>;
  873. ServerUnaryStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx) :
  874. body_(call, ctx),
  875. read_done_(false),
  876. write_done_(false)
  877. {
  878. }
  879. };
  880. /// A class to represent a flow-controlled server-side streaming call.
  881. /// This is something of a hybrid between server-side and bidi streaming.
  882. /// This is invoked through a server-side streaming call on the client side,
  883. /// but the server responds to it as though it were a bidi streaming call that
  884. /// must first have exactly 1 Read and then any number of Writes.
  885. template<class RequestType, class ResponseType>
  886. class ServerSplitStreamer final : public ServerReaderWriterInterface<ResponseType, RequestType>
  887. {
  888. public:
  889. /// Block to send initial metadata to client.
  890. /// Implicit input parameter:
  891. /// - the \a ServerContext associated with this call will be used for
  892. /// sending initial metadata.
  893. void SendInitialMetadata() override
  894. {
  895. body_.SendInitialMetadata();
  896. }
  897. /// Get an upper bound on the request message size from the client.
  898. bool NextMessageSize(uint32_t* sz) override
  899. {
  900. return body_.NextMessageSize(sz);
  901. }
  902. /// Read a message of type \a R into \a msg. Completion will be notified by \a
  903. /// tag on the associated completion queue.
  904. /// This is thread-safe with respect to \a Write or \a WritesDone methods. It
  905. /// should not be called concurrently with other streaming APIs
  906. /// on the same stream. It is not meaningful to call it concurrently
  907. /// with another \a ReaderInterface::Read on the same stream since reads on
  908. /// the same stream are delivered in order.
  909. ///
  910. /// \param[out] msg Where to eventually store the read message.
  911. /// \param[in] tag The tag identifying the operation.
  912. bool Read(RequestType* request) override
  913. {
  914. if (read_done_)
  915. {
  916. return false;
  917. }
  918. read_done_ = true;
  919. return body_.Read(request);
  920. }
  921. /// Block to write \a msg to the stream with WriteOptions \a options.
  922. /// This is thread-safe with respect to \a ReaderInterface::Read
  923. ///
  924. /// \param msg The message to be written to the stream.
  925. /// \param options The WriteOptions affecting the write operation.
  926. ///
  927. /// \return \a true on success, \a false when the stream has been closed.
  928. using internal::WriterInterface<ResponseType>::Write;
  929. bool Write(const ResponseType& response, grpc::WriteOptions options) override
  930. {
  931. return read_done_ && body_.Write(response, options);
  932. }
  933. private:
  934. internal::ServerReaderWriterBody<ResponseType, RequestType> body_;
  935. bool read_done_;
  936. friend class internal::TemplatedBidiStreamingHandler<
  937. ServerSplitStreamer<RequestType, ResponseType>,
  938. false>;
  939. ServerSplitStreamer(grpc::internal::Call* call, grpc::ServerContext* ctx) :
  940. body_(call, ctx),
  941. read_done_(false)
  942. {
  943. }
  944. };
  945. } // namespace grpc
  946. #endif // GRPCPP_IMPL_CODEGEN_SYNC_STREAM_H