You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

server_callback.h 36 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953
  1. /*
  2. *
  3. * Copyright 2019 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
  18. #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H
  19. // IWYU pragma: private, include <grpcpp/support/server_callback.h>
  20. #include <atomic>
  21. #include <functional>
  22. #include <type_traits>
  23. #include <grpcpp/impl/codegen/call.h>
  24. #include <grpcpp/impl/codegen/call_op_set.h>
  25. #include <grpcpp/impl/codegen/callback_common.h>
  26. #include <grpcpp/impl/codegen/config.h>
  27. #include <grpcpp/impl/codegen/core_codegen_interface.h>
  28. #include <grpcpp/impl/codegen/message_allocator.h>
  29. #include <grpcpp/impl/codegen/status.h>
  30. #include <grpcpp/impl/codegen/sync.h>
  31. namespace grpc
  32. {
  33. // Declare base class of all reactors as internal
  34. namespace internal
  35. {
  36. // Forward declarations
  37. template<class Request, class Response>
  38. class CallbackUnaryHandler;
  39. template<class Request, class Response>
  40. class CallbackClientStreamingHandler;
  41. template<class Request, class Response>
  42. class CallbackServerStreamingHandler;
  43. template<class Request, class Response>
  44. class CallbackBidiHandler;
  45. class ServerReactor
  46. {
  47. public:
  48. virtual ~ServerReactor() = default;
  49. virtual void OnDone() = 0;
  50. virtual void OnCancel() = 0;
  51. // The following is not API. It is for internal use only and specifies whether
  52. // all reactions of this Reactor can be run without an extra executor
  53. // scheduling. This should only be used for internally-defined reactors with
  54. // trivial reactions.
  55. virtual bool InternalInlineable()
  56. {
  57. return false;
  58. }
  59. private:
  60. template<class Request, class Response>
  61. friend class CallbackUnaryHandler;
  62. template<class Request, class Response>
  63. friend class CallbackClientStreamingHandler;
  64. template<class Request, class Response>
  65. friend class CallbackServerStreamingHandler;
  66. template<class Request, class Response>
  67. friend class CallbackBidiHandler;
  68. };
  69. /// The base class of ServerCallbackUnary etc.
  70. class ServerCallbackCall
  71. {
  72. public:
  73. virtual ~ServerCallbackCall()
  74. {
  75. }
  76. // This object is responsible for tracking when it is safe to call OnDone and
  77. // OnCancel. OnDone should not be called until the method handler is complete,
  78. // Finish has been called, the ServerContext CompletionOp (which tracks
  79. // cancellation or successful completion) has completed, and all outstanding
  80. // Read/Write actions have seen their reactions. OnCancel should not be called
  81. // until after the method handler is done and the RPC has completed with a
  82. // cancellation. This is tracked by counting how many of these conditions have
  83. // been met and calling OnCancel when none remain unmet.
  84. // Public versions of MaybeDone: one where we don't know the reactor in
  85. // advance (used for the ServerContext CompletionOp), and one for where we
  86. // know the inlineability of the OnDone reaction. You should set the inline
  87. // flag to true if either the Reactor is InternalInlineable() or if this
  88. // callback is already being forced to run dispatched to an executor
  89. // (typically because it contains additional work than just the MaybeDone).
  90. void MaybeDone()
  91. {
  92. if (GPR_UNLIKELY(Unref() == 1))
  93. {
  94. ScheduleOnDone(reactor()->InternalInlineable());
  95. }
  96. }
  97. void MaybeDone(bool inline_ondone)
  98. {
  99. if (GPR_UNLIKELY(Unref() == 1))
  100. {
  101. ScheduleOnDone(inline_ondone);
  102. }
  103. }
  104. // Fast version called with known reactor passed in, used from derived
  105. // classes, typically in non-cancel case
  106. void MaybeCallOnCancel(ServerReactor* reactor)
  107. {
  108. if (GPR_UNLIKELY(UnblockCancellation()))
  109. {
  110. CallOnCancel(reactor);
  111. }
  112. }
  113. // Slower version called from object that doesn't know the reactor a priori
  114. // (such as the ServerContext CompletionOp which is formed before the
  115. // reactor). This is used in cancel cases only, so it's ok to be slower and
  116. // invoke a virtual function.
  117. void MaybeCallOnCancel()
  118. {
  119. if (GPR_UNLIKELY(UnblockCancellation()))
  120. {
  121. CallOnCancel(reactor());
  122. }
  123. }
  124. protected:
  125. /// Increases the reference count
  126. void Ref()
  127. {
  128. callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed);
  129. }
  130. private:
  131. virtual ServerReactor* reactor() = 0;
  132. // CallOnDone performs the work required at completion of the RPC: invoking
  133. // the OnDone function and doing all necessary cleanup. This function is only
  134. // ever invoked on a fully-Unref'fed ServerCallbackCall.
  135. virtual void CallOnDone() = 0;
  136. // If the OnDone reaction is inlineable, execute it inline. Otherwise send it
  137. // to an executor.
  138. void ScheduleOnDone(bool inline_ondone);
  139. // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
  140. // it to an executor.
  141. void CallOnCancel(ServerReactor* reactor);
  142. // Implement the cancellation constraint counter. Return true if OnCancel
  143. // should be called, false otherwise.
  144. bool UnblockCancellation()
  145. {
  146. return on_cancel_conditions_remaining_.fetch_sub(
  147. 1, std::memory_order_acq_rel
  148. ) == 1;
  149. }
  150. /// Decreases the reference count and returns the previous value
  151. int Unref()
  152. {
  153. return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
  154. }
  155. std::atomic_int on_cancel_conditions_remaining_{2};
  156. std::atomic_int callbacks_outstanding_{
  157. 3}; // reserve for start, Finish, and CompletionOp
  158. };
  159. template<class Request, class Response>
  160. class DefaultMessageHolder : public MessageHolder<Request, Response>
  161. {
  162. public:
  163. DefaultMessageHolder()
  164. {
  165. this->set_request(&request_obj_);
  166. this->set_response(&response_obj_);
  167. }
  168. void Release() override
  169. {
  170. // the object is allocated in the call arena.
  171. this->~DefaultMessageHolder<Request, Response>();
  172. }
  173. private:
  174. Request request_obj_;
  175. Response response_obj_;
  176. };
  177. } // namespace internal
  178. // Forward declarations
  179. class ServerUnaryReactor;
  180. template<class Request>
  181. class ServerReadReactor;
  182. template<class Response>
  183. class ServerWriteReactor;
  184. template<class Request, class Response>
  185. class ServerBidiReactor;
  186. // NOTE: The actual call/stream object classes are provided as API only to
  187. // support mocking. There are no implementations of these class interfaces in
  188. // the API.
  189. class ServerCallbackUnary : public internal::ServerCallbackCall
  190. {
  191. public:
  192. ~ServerCallbackUnary() override
  193. {
  194. }
  195. virtual void Finish(grpc::Status s) = 0;
  196. virtual void SendInitialMetadata() = 0;
  197. protected:
  198. // Use a template rather than explicitly specifying ServerUnaryReactor to
  199. // delay binding and avoid a circular forward declaration issue
  200. template<class Reactor>
  201. void BindReactor(Reactor* reactor)
  202. {
  203. reactor->InternalBindCall(this);
  204. }
  205. };
  206. template<class Request>
  207. class ServerCallbackReader : public internal::ServerCallbackCall
  208. {
  209. public:
  210. ~ServerCallbackReader() override
  211. {
  212. }
  213. virtual void Finish(grpc::Status s) = 0;
  214. virtual void SendInitialMetadata() = 0;
  215. virtual void Read(Request* msg) = 0;
  216. protected:
  217. void BindReactor(ServerReadReactor<Request>* reactor)
  218. {
  219. reactor->InternalBindReader(this);
  220. }
  221. };
  222. template<class Response>
  223. class ServerCallbackWriter : public internal::ServerCallbackCall
  224. {
  225. public:
  226. ~ServerCallbackWriter() override
  227. {
  228. }
  229. virtual void Finish(grpc::Status s) = 0;
  230. virtual void SendInitialMetadata() = 0;
  231. virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
  232. virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, grpc::Status s) = 0;
  233. protected:
  234. void BindReactor(ServerWriteReactor<Response>* reactor)
  235. {
  236. reactor->InternalBindWriter(this);
  237. }
  238. };
  239. template<class Request, class Response>
  240. class ServerCallbackReaderWriter : public internal::ServerCallbackCall
  241. {
  242. public:
  243. ~ServerCallbackReaderWriter() override
  244. {
  245. }
  246. virtual void Finish(grpc::Status s) = 0;
  247. virtual void SendInitialMetadata() = 0;
  248. virtual void Read(Request* msg) = 0;
  249. virtual void Write(const Response* msg, grpc::WriteOptions options) = 0;
  250. virtual void WriteAndFinish(const Response* msg, grpc::WriteOptions options, grpc::Status s) = 0;
  251. protected:
  252. void BindReactor(ServerBidiReactor<Request, Response>* reactor)
  253. {
  254. reactor->InternalBindStream(this);
  255. }
  256. };
  257. // The following classes are the reactor interfaces that are to be implemented
  258. // by the user, returned as the output parameter of the method handler for a
  259. // callback method. Note that none of the classes are pure; all reactions have a
  260. // default empty reaction so that the user class only needs to override those
  261. // reactions that it cares about. The reaction methods will be invoked by the
  262. // library in response to the completion of various operations. Reactions must
  263. // not include blocking operations (such as blocking I/O, starting synchronous
  264. // RPCs, or waiting on condition variables). Reactions may be invoked
  265. // concurrently, except that OnDone is called after all others (assuming proper
  266. // API usage). The reactor may not be deleted until OnDone is called.
  267. /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
  268. template<class Request, class Response>
  269. class ServerBidiReactor : public internal::ServerReactor
  270. {
  271. public:
  272. // NOTE: Initializing stream_ as a constructor initializer rather than a
  273. // default initializer because gcc-4.x requires a copy constructor for
  274. // default initializing a templated member, which isn't ok for atomic.
  275. // TODO(vjpai): Switch to default constructor and default initializer when
  276. // gcc-4.x is no longer supported
  277. ServerBidiReactor() :
  278. stream_(nullptr)
  279. {
  280. }
  281. ~ServerBidiReactor() override = default;
  282. /// Send any initial metadata stored in the RPC context. If not invoked,
  283. /// any initial metadata will be passed along with the first Write or the
  284. /// Finish (if there are no writes).
  285. void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(stream_mu_)
  286. {
  287. ServerCallbackReaderWriter<Request, Response>* stream =
  288. stream_.load(std::memory_order_acquire);
  289. if (stream == nullptr)
  290. {
  291. grpc::internal::MutexLock l(&stream_mu_);
  292. stream = stream_.load(std::memory_order_relaxed);
  293. if (stream == nullptr)
  294. {
  295. backlog_.send_initial_metadata_wanted = true;
  296. return;
  297. }
  298. }
  299. stream->SendInitialMetadata();
  300. }
  301. /// Initiate a read operation.
  302. ///
  303. /// \param[out] req Where to eventually store the read message. Valid when
  304. /// the library calls OnReadDone
  305. void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(stream_mu_)
  306. {
  307. ServerCallbackReaderWriter<Request, Response>* stream =
  308. stream_.load(std::memory_order_acquire);
  309. if (stream == nullptr)
  310. {
  311. grpc::internal::MutexLock l(&stream_mu_);
  312. stream = stream_.load(std::memory_order_relaxed);
  313. if (stream == nullptr)
  314. {
  315. backlog_.read_wanted = req;
  316. return;
  317. }
  318. }
  319. stream->Read(req);
  320. }
  321. /// Initiate a write operation.
  322. ///
  323. /// \param[in] resp The message to be written. The library does not take
  324. /// ownership but the caller must ensure that the message is
  325. /// not deleted or modified until OnWriteDone is called.
  326. void StartWrite(const Response* resp)
  327. {
  328. StartWrite(resp, grpc::WriteOptions());
  329. }
  330. /// Initiate a write operation with specified options.
  331. ///
  332. /// \param[in] resp The message to be written. The library does not take
  333. /// ownership but the caller must ensure that the message is
  334. /// not deleted or modified until OnWriteDone is called.
  335. /// \param[in] options The WriteOptions to use for writing this message
  336. void StartWrite(const Response* resp, grpc::WriteOptions options)
  337. ABSL_LOCKS_EXCLUDED(stream_mu_)
  338. {
  339. ServerCallbackReaderWriter<Request, Response>* stream =
  340. stream_.load(std::memory_order_acquire);
  341. if (stream == nullptr)
  342. {
  343. grpc::internal::MutexLock l(&stream_mu_);
  344. stream = stream_.load(std::memory_order_relaxed);
  345. if (stream == nullptr)
  346. {
  347. backlog_.write_wanted = resp;
  348. backlog_.write_options_wanted = options;
  349. return;
  350. }
  351. }
  352. stream->Write(resp, options);
  353. }
  354. /// Initiate a write operation with specified options and final RPC Status,
  355. /// which also causes any trailing metadata for this RPC to be sent out.
  356. /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
  357. /// single step. A key difference, though, is that this operation doesn't have
  358. /// an OnWriteDone reaction - it is considered complete only when OnDone is
  359. /// available. An RPC can either have StartWriteAndFinish or Finish, but not
  360. /// both.
  361. ///
  362. /// \param[in] resp The message to be written. The library does not take
  363. /// ownership but the caller must ensure that the message is
  364. /// not deleted or modified until OnDone is called.
  365. /// \param[in] options The WriteOptions to use for writing this message
  366. /// \param[in] s The status outcome of this RPC
  367. void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
  368. {
  369. ServerCallbackReaderWriter<Request, Response>* stream =
  370. stream_.load(std::memory_order_acquire);
  371. if (stream == nullptr)
  372. {
  373. grpc::internal::MutexLock l(&stream_mu_);
  374. stream = stream_.load(std::memory_order_relaxed);
  375. if (stream == nullptr)
  376. {
  377. backlog_.write_and_finish_wanted = true;
  378. backlog_.write_wanted = resp;
  379. backlog_.write_options_wanted = options;
  380. backlog_.status_wanted = std::move(s);
  381. return;
  382. }
  383. }
  384. stream->WriteAndFinish(resp, options, std::move(s));
  385. }
  386. /// Inform system of a planned write operation with specified options, but
  387. /// allow the library to schedule the actual write coalesced with the writing
  388. /// of trailing metadata (which takes place on a Finish call).
  389. ///
  390. /// \param[in] resp The message to be written. The library does not take
  391. /// ownership but the caller must ensure that the message is
  392. /// not deleted or modified until OnWriteDone is called.
  393. /// \param[in] options The WriteOptions to use for writing this message
  394. void StartWriteLast(const Response* resp, grpc::WriteOptions options)
  395. {
  396. StartWrite(resp, options.set_last_message());
  397. }
  398. /// Indicate that the stream is to be finished and the trailing metadata and
  399. /// RPC status are to be sent. Every RPC MUST be finished using either Finish
  400. /// or StartWriteAndFinish (but not both), even if the RPC is already
  401. /// cancelled.
  402. ///
  403. /// \param[in] s The status outcome of this RPC
  404. void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(stream_mu_)
  405. {
  406. ServerCallbackReaderWriter<Request, Response>* stream =
  407. stream_.load(std::memory_order_acquire);
  408. if (stream == nullptr)
  409. {
  410. grpc::internal::MutexLock l(&stream_mu_);
  411. stream = stream_.load(std::memory_order_relaxed);
  412. if (stream == nullptr)
  413. {
  414. backlog_.finish_wanted = true;
  415. backlog_.status_wanted = std::move(s);
  416. return;
  417. }
  418. }
  419. stream->Finish(std::move(s));
  420. }
  421. /// Notifies the application that an explicit StartSendInitialMetadata
  422. /// operation completed. Not used when the sending of initial metadata
  423. /// piggybacks onto the first write.
  424. ///
  425. /// \param[in] ok Was it successful? If false, no further write-side operation
  426. /// will succeed.
  427. virtual void OnSendInitialMetadataDone(bool /*ok*/)
  428. {
  429. }
  430. /// Notifies the application that a StartRead operation completed.
  431. ///
  432. /// \param[in] ok Was it successful? If false, no further read-side operation
  433. /// will succeed.
  434. virtual void OnReadDone(bool /*ok*/)
  435. {
  436. }
  437. /// Notifies the application that a StartWrite (or StartWriteLast) operation
  438. /// completed.
  439. ///
  440. /// \param[in] ok Was it successful? If false, no further write-side operation
  441. /// will succeed.
  442. virtual void OnWriteDone(bool /*ok*/)
  443. {
  444. }
  445. /// Notifies the application that all operations associated with this RPC
  446. /// have completed. This is an override (from the internal base class) but
  447. /// still abstract, so derived classes MUST override it to be instantiated.
  448. void OnDone() override = 0;
  449. /// Notifies the application that this RPC has been cancelled. This is an
  450. /// override (from the internal base class) but not final, so derived classes
  451. /// should override it if they want to take action.
  452. void OnCancel() override
  453. {
  454. }
  455. private:
  456. friend class ServerCallbackReaderWriter<Request, Response>;
  457. // May be overridden by internal implementation details. This is not a public
  458. // customization point.
  459. virtual void InternalBindStream(
  460. ServerCallbackReaderWriter<Request, Response>* stream
  461. )
  462. {
  463. grpc::internal::MutexLock l(&stream_mu_);
  464. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted))
  465. {
  466. stream->SendInitialMetadata();
  467. }
  468. if (GPR_UNLIKELY(backlog_.read_wanted != nullptr))
  469. {
  470. stream->Read(backlog_.read_wanted);
  471. }
  472. if (GPR_UNLIKELY(backlog_.write_and_finish_wanted))
  473. {
  474. stream->WriteAndFinish(backlog_.write_wanted, std::move(backlog_.write_options_wanted), std::move(backlog_.status_wanted));
  475. }
  476. else
  477. {
  478. if (GPR_UNLIKELY(backlog_.write_wanted != nullptr))
  479. {
  480. stream->Write(backlog_.write_wanted, std::move(backlog_.write_options_wanted));
  481. }
  482. if (GPR_UNLIKELY(backlog_.finish_wanted))
  483. {
  484. stream->Finish(std::move(backlog_.status_wanted));
  485. }
  486. }
  487. // Set stream_ last so that other functions can use it lock-free
  488. stream_.store(stream, std::memory_order_release);
  489. }
  490. grpc::internal::Mutex stream_mu_;
  491. // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
  492. // once C++17 or ABSL is supported since stream and backlog are
  493. // mutually exclusive in this class. Do likewise with the
  494. // remaining reactor classes and their backlogs as well.
  495. std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
  496. struct PreBindBacklog
  497. {
  498. bool send_initial_metadata_wanted = false;
  499. bool write_and_finish_wanted = false;
  500. bool finish_wanted = false;
  501. Request* read_wanted = nullptr;
  502. const Response* write_wanted = nullptr;
  503. grpc::WriteOptions write_options_wanted;
  504. grpc::Status status_wanted;
  505. };
  506. PreBindBacklog backlog_ ABSL_GUARDED_BY(stream_mu_);
  507. };
  508. /// \a ServerReadReactor is the interface for a client-streaming RPC.
  509. template<class Request>
  510. class ServerReadReactor : public internal::ServerReactor
  511. {
  512. public:
  513. ServerReadReactor() :
  514. reader_(nullptr)
  515. {
  516. }
  517. ~ServerReadReactor() override = default;
  518. /// The following operation initiations are exactly like ServerBidiReactor.
  519. void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(reader_mu_)
  520. {
  521. ServerCallbackReader<Request>* reader =
  522. reader_.load(std::memory_order_acquire);
  523. if (reader == nullptr)
  524. {
  525. grpc::internal::MutexLock l(&reader_mu_);
  526. reader = reader_.load(std::memory_order_relaxed);
  527. if (reader == nullptr)
  528. {
  529. backlog_.send_initial_metadata_wanted = true;
  530. return;
  531. }
  532. }
  533. reader->SendInitialMetadata();
  534. }
  535. void StartRead(Request* req) ABSL_LOCKS_EXCLUDED(reader_mu_)
  536. {
  537. ServerCallbackReader<Request>* reader =
  538. reader_.load(std::memory_order_acquire);
  539. if (reader == nullptr)
  540. {
  541. grpc::internal::MutexLock l(&reader_mu_);
  542. reader = reader_.load(std::memory_order_relaxed);
  543. if (reader == nullptr)
  544. {
  545. backlog_.read_wanted = req;
  546. return;
  547. }
  548. }
  549. reader->Read(req);
  550. }
  551. void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(reader_mu_)
  552. {
  553. ServerCallbackReader<Request>* reader =
  554. reader_.load(std::memory_order_acquire);
  555. if (reader == nullptr)
  556. {
  557. grpc::internal::MutexLock l(&reader_mu_);
  558. reader = reader_.load(std::memory_order_relaxed);
  559. if (reader == nullptr)
  560. {
  561. backlog_.finish_wanted = true;
  562. backlog_.status_wanted = std::move(s);
  563. return;
  564. }
  565. }
  566. reader->Finish(std::move(s));
  567. }
  568. /// The following notifications are exactly like ServerBidiReactor.
  569. virtual void OnSendInitialMetadataDone(bool /*ok*/)
  570. {
  571. }
  572. virtual void OnReadDone(bool /*ok*/)
  573. {
  574. }
  575. void OnDone() override = 0;
  576. void OnCancel() override
  577. {
  578. }
  579. private:
  580. friend class ServerCallbackReader<Request>;
  581. // May be overridden by internal implementation details. This is not a public
  582. // customization point.
  583. virtual void InternalBindReader(ServerCallbackReader<Request>* reader)
  584. ABSL_LOCKS_EXCLUDED(reader_mu_)
  585. {
  586. grpc::internal::MutexLock l(&reader_mu_);
  587. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted))
  588. {
  589. reader->SendInitialMetadata();
  590. }
  591. if (GPR_UNLIKELY(backlog_.read_wanted != nullptr))
  592. {
  593. reader->Read(backlog_.read_wanted);
  594. }
  595. if (GPR_UNLIKELY(backlog_.finish_wanted))
  596. {
  597. reader->Finish(std::move(backlog_.status_wanted));
  598. }
  599. // Set reader_ last so that other functions can use it lock-free
  600. reader_.store(reader, std::memory_order_release);
  601. }
  602. grpc::internal::Mutex reader_mu_;
  603. std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
  604. struct PreBindBacklog
  605. {
  606. bool send_initial_metadata_wanted = false;
  607. bool finish_wanted = false;
  608. Request* read_wanted = nullptr;
  609. grpc::Status status_wanted;
  610. };
  611. PreBindBacklog backlog_ ABSL_GUARDED_BY(reader_mu_);
  612. };
  613. /// \a ServerWriteReactor is the interface for a server-streaming RPC.
  614. template<class Response>
  615. class ServerWriteReactor : public internal::ServerReactor
  616. {
  617. public:
  618. ServerWriteReactor() :
  619. writer_(nullptr)
  620. {
  621. }
  622. ~ServerWriteReactor() override = default;
  623. /// The following operation initiations are exactly like ServerBidiReactor.
  624. void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(writer_mu_)
  625. {
  626. ServerCallbackWriter<Response>* writer =
  627. writer_.load(std::memory_order_acquire);
  628. if (writer == nullptr)
  629. {
  630. grpc::internal::MutexLock l(&writer_mu_);
  631. writer = writer_.load(std::memory_order_relaxed);
  632. if (writer == nullptr)
  633. {
  634. backlog_.send_initial_metadata_wanted = true;
  635. return;
  636. }
  637. }
  638. writer->SendInitialMetadata();
  639. }
  640. void StartWrite(const Response* resp)
  641. {
  642. StartWrite(resp, grpc::WriteOptions());
  643. }
  644. void StartWrite(const Response* resp, grpc::WriteOptions options)
  645. ABSL_LOCKS_EXCLUDED(writer_mu_)
  646. {
  647. ServerCallbackWriter<Response>* writer =
  648. writer_.load(std::memory_order_acquire);
  649. if (writer == nullptr)
  650. {
  651. grpc::internal::MutexLock l(&writer_mu_);
  652. writer = writer_.load(std::memory_order_relaxed);
  653. if (writer == nullptr)
  654. {
  655. backlog_.write_wanted = resp;
  656. backlog_.write_options_wanted = options;
  657. return;
  658. }
  659. }
  660. writer->Write(resp, options);
  661. }
  662. void StartWriteAndFinish(const Response* resp, grpc::WriteOptions options, grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
  663. {
  664. ServerCallbackWriter<Response>* writer =
  665. writer_.load(std::memory_order_acquire);
  666. if (writer == nullptr)
  667. {
  668. grpc::internal::MutexLock l(&writer_mu_);
  669. writer = writer_.load(std::memory_order_relaxed);
  670. if (writer == nullptr)
  671. {
  672. backlog_.write_and_finish_wanted = true;
  673. backlog_.write_wanted = resp;
  674. backlog_.write_options_wanted = options;
  675. backlog_.status_wanted = std::move(s);
  676. return;
  677. }
  678. }
  679. writer->WriteAndFinish(resp, options, std::move(s));
  680. }
  681. void StartWriteLast(const Response* resp, grpc::WriteOptions options)
  682. {
  683. StartWrite(resp, options.set_last_message());
  684. }
  685. void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(writer_mu_)
  686. {
  687. ServerCallbackWriter<Response>* writer =
  688. writer_.load(std::memory_order_acquire);
  689. if (writer == nullptr)
  690. {
  691. grpc::internal::MutexLock l(&writer_mu_);
  692. writer = writer_.load(std::memory_order_relaxed);
  693. if (writer == nullptr)
  694. {
  695. backlog_.finish_wanted = true;
  696. backlog_.status_wanted = std::move(s);
  697. return;
  698. }
  699. }
  700. writer->Finish(std::move(s));
  701. }
  702. /// The following notifications are exactly like ServerBidiReactor.
  703. virtual void OnSendInitialMetadataDone(bool /*ok*/)
  704. {
  705. }
  706. virtual void OnWriteDone(bool /*ok*/)
  707. {
  708. }
  709. void OnDone() override = 0;
  710. void OnCancel() override
  711. {
  712. }
  713. private:
  714. friend class ServerCallbackWriter<Response>;
  715. // May be overridden by internal implementation details. This is not a public
  716. // customization point.
  717. virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer)
  718. ABSL_LOCKS_EXCLUDED(writer_mu_)
  719. {
  720. grpc::internal::MutexLock l(&writer_mu_);
  721. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted))
  722. {
  723. writer->SendInitialMetadata();
  724. }
  725. if (GPR_UNLIKELY(backlog_.write_and_finish_wanted))
  726. {
  727. writer->WriteAndFinish(backlog_.write_wanted, std::move(backlog_.write_options_wanted), std::move(backlog_.status_wanted));
  728. }
  729. else
  730. {
  731. if (GPR_UNLIKELY(backlog_.write_wanted != nullptr))
  732. {
  733. writer->Write(backlog_.write_wanted, std::move(backlog_.write_options_wanted));
  734. }
  735. if (GPR_UNLIKELY(backlog_.finish_wanted))
  736. {
  737. writer->Finish(std::move(backlog_.status_wanted));
  738. }
  739. }
  740. // Set writer_ last so that other functions can use it lock-free
  741. writer_.store(writer, std::memory_order_release);
  742. }
  743. grpc::internal::Mutex writer_mu_;
  744. std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
  745. struct PreBindBacklog
  746. {
  747. bool send_initial_metadata_wanted = false;
  748. bool write_and_finish_wanted = false;
  749. bool finish_wanted = false;
  750. const Response* write_wanted = nullptr;
  751. grpc::WriteOptions write_options_wanted;
  752. grpc::Status status_wanted;
  753. };
  754. PreBindBacklog backlog_ ABSL_GUARDED_BY(writer_mu_);
  755. };
  756. class ServerUnaryReactor : public internal::ServerReactor
  757. {
  758. public:
  759. ServerUnaryReactor() :
  760. call_(nullptr)
  761. {
  762. }
  763. ~ServerUnaryReactor() override = default;
  764. /// StartSendInitialMetadata is exactly like ServerBidiReactor.
  765. void StartSendInitialMetadata() ABSL_LOCKS_EXCLUDED(call_mu_)
  766. {
  767. ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
  768. if (call == nullptr)
  769. {
  770. grpc::internal::MutexLock l(&call_mu_);
  771. call = call_.load(std::memory_order_relaxed);
  772. if (call == nullptr)
  773. {
  774. backlog_.send_initial_metadata_wanted = true;
  775. return;
  776. }
  777. }
  778. call->SendInitialMetadata();
  779. }
  780. /// Finish is similar to ServerBidiReactor except for one detail.
  781. /// If the status is non-OK, any message will not be sent. Instead,
  782. /// the client will only receive the status and any trailing metadata.
  783. void Finish(grpc::Status s) ABSL_LOCKS_EXCLUDED(call_mu_)
  784. {
  785. ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
  786. if (call == nullptr)
  787. {
  788. grpc::internal::MutexLock l(&call_mu_);
  789. call = call_.load(std::memory_order_relaxed);
  790. if (call == nullptr)
  791. {
  792. backlog_.finish_wanted = true;
  793. backlog_.status_wanted = std::move(s);
  794. return;
  795. }
  796. }
  797. call->Finish(std::move(s));
  798. }
  799. /// The following notifications are exactly like ServerBidiReactor.
  800. virtual void OnSendInitialMetadataDone(bool /*ok*/)
  801. {
  802. }
  803. void OnDone() override = 0;
  804. void OnCancel() override
  805. {
  806. }
  807. private:
  808. friend class ServerCallbackUnary;
  809. // May be overridden by internal implementation details. This is not a public
  810. // customization point.
  811. virtual void InternalBindCall(ServerCallbackUnary* call)
  812. ABSL_LOCKS_EXCLUDED(call_mu_)
  813. {
  814. grpc::internal::MutexLock l(&call_mu_);
  815. if (GPR_UNLIKELY(backlog_.send_initial_metadata_wanted))
  816. {
  817. call->SendInitialMetadata();
  818. }
  819. if (GPR_UNLIKELY(backlog_.finish_wanted))
  820. {
  821. call->Finish(std::move(backlog_.status_wanted));
  822. }
  823. // Set call_ last so that other functions can use it lock-free
  824. call_.store(call, std::memory_order_release);
  825. }
  826. grpc::internal::Mutex call_mu_;
  827. std::atomic<ServerCallbackUnary*> call_{nullptr};
  828. struct PreBindBacklog
  829. {
  830. bool send_initial_metadata_wanted = false;
  831. bool finish_wanted = false;
  832. grpc::Status status_wanted;
  833. };
  834. PreBindBacklog backlog_ ABSL_GUARDED_BY(call_mu_);
  835. };
  836. namespace internal
  837. {
  838. template<class Base>
  839. class FinishOnlyReactor : public Base
  840. {
  841. public:
  842. explicit FinishOnlyReactor(grpc::Status s)
  843. {
  844. this->Finish(std::move(s));
  845. }
  846. void OnDone() override
  847. {
  848. this->~FinishOnlyReactor();
  849. }
  850. };
  851. using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
  852. template<class Request>
  853. using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
  854. template<class Response>
  855. using UnimplementedWriteReactor =
  856. FinishOnlyReactor<ServerWriteReactor<Response>>;
  857. template<class Request, class Response>
  858. using UnimplementedBidiReactor =
  859. FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
  860. } // namespace internal
  861. // TODO(vjpai): Remove namespace experimental when last known users are migrated
  862. // off.
  863. namespace experimental
  864. {
  865. template<class Request, class Response>
  866. using ServerBidiReactor = ::grpc::ServerBidiReactor<Request, Response>;
  867. } // namespace experimental
  868. } // namespace grpc
  869. #endif // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_H