LCOV - code coverage report
Current view: top level - include/grpc++/support - async_stream.h (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 145 191 75.9 %
Date: 2015-10-10 Functions: 92 199 46.2 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H
      35             : #define GRPCXX_SUPPORT_ASYNC_STREAM_H
      36             : 
      37             : #include <grpc/support/log.h>
      38             : #include <grpc++/channel.h>
      39             : #include <grpc++/client_context.h>
      40             : #include <grpc++/completion_queue.h>
      41             : #include <grpc++/impl/call.h>
      42             : #include <grpc++/impl/service_type.h>
      43             : #include <grpc++/server_context.h>
      44             : #include <grpc++/support/status.h>
      45             : 
      46             : namespace grpc {
      47             : 
      48             : /// Common interface for all client side asynchronous streaming.
      49          20 : class ClientAsyncStreamingInterface {
      50             :  public:
      51          20 :   virtual ~ClientAsyncStreamingInterface() {}
      52             : 
      53             :   /// Request notification of the reading of the initial metadata. Completion
      54             :   /// will be notified by \a tag on the associated completion queue.
      55             :   ///
      56             :   /// \param[in] tag Tag identifying this request.
      57             :   virtual void ReadInitialMetadata(void* tag) = 0;
      58             : 
      59             :   /// Request notification completion.
      60             :   ///
      61             :   /// \param[out] status To be updated with the operation status.
      62             :   /// \param[in] tag Tag identifying this request.
      63             :   virtual void Finish(Status* status, void* tag) = 0;
      64             : };
      65             : 
      66             : /// An interface that yields a sequence of messages of type \a R.
      67             : template <class R>
      68       90437 : class AsyncReaderInterface {
      69             :  public:
      70       64716 :   virtual ~AsyncReaderInterface() {}
      71             : 
      72             :   /// Read a message of type \a R into \a msg. Completion will be notified by \a
      73             :   /// tag on the associated completion queue.
      74             :   ///
      75             :   /// \param[out] msg Where to eventually store the read message.
      76             :   /// \param[in] tag The tag identifying the operation.
      77             :   virtual void Read(R* msg, void* tag) = 0;
      78             : };
      79             : 
      80             : /// An interface that can be fed a sequence of messages of type \a W.
      81             : template <class W>
      82       90435 : class AsyncWriterInterface {
      83             :  public:
      84       64795 :   virtual ~AsyncWriterInterface() {}
      85             : 
      86             :   /// Request the writing of \a msg with identifying tag \a tag.
      87             :   ///
      88             :   /// \param[in] msg The message to be written.
      89             :   /// \param[in] tag The tag identifying the operation.
      90             :   virtual void Write(const W& msg, void* tag) = 0;
      91             : };
      92             : 
      93             : template <class R>
      94           4 : class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
      95             :                                    public AsyncReaderInterface<R> {};
      96             : 
      97             : template <class R>
      98           4 : class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
      99             :  public:
     100             :   /// Create a stream and write the first request out.
     101             :   template <class W>
     102           2 :   ClientAsyncReader(Channel* channel, CompletionQueue* cq,
     103             :                     const RpcMethod& method, ClientContext* context,
     104             :                     const W& request, void* tag)
     105           2 :       : context_(context), call_(channel->CreateCall(method, context, cq)) {
     106           2 :     init_ops_.set_output_tag(tag);
     107           2 :     init_ops_.SendInitialMetadata(context->send_initial_metadata_);
     108             :     // TODO(ctiller): don't assert
     109           2 :     GPR_ASSERT(init_ops_.SendMessage(request).ok());
     110           2 :     init_ops_.ClientSendClose();
     111           2 :     call_.PerformOps(&init_ops_);
     112           2 :   }
     113             : 
     114           0 :   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     115           0 :     GPR_ASSERT(!context_->initial_metadata_received_);
     116             : 
     117           0 :     meta_ops_.set_output_tag(tag);
     118           0 :     meta_ops_.RecvInitialMetadata(context_);
     119           0 :     call_.PerformOps(&meta_ops_);
     120           0 :   }
     121             : 
     122           6 :   void Read(R* msg, void* tag) GRPC_OVERRIDE {
     123           6 :     read_ops_.set_output_tag(tag);
     124           6 :     if (!context_->initial_metadata_received_) {
     125           2 :       read_ops_.RecvInitialMetadata(context_);
     126             :     }
     127           6 :     read_ops_.RecvMessage(msg);
     128           6 :     call_.PerformOps(&read_ops_);
     129           6 :   }
     130             : 
     131           2 :   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
     132           2 :     finish_ops_.set_output_tag(tag);
     133           2 :     if (!context_->initial_metadata_received_) {
     134           0 :       finish_ops_.RecvInitialMetadata(context_);
     135             :     }
     136           2 :     finish_ops_.ClientRecvStatus(context_, status);
     137           2 :     call_.PerformOps(&finish_ops_);
     138           2 :   }
     139             : 
     140             :  private:
     141             :   ClientContext* context_;
     142             :   Call call_;
     143             :   CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
     144             :       init_ops_;
     145             :   CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
     146             :   CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
     147             :   CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
     148             : };
     149             : 
     150             : /// Common interface for client side asynchronous writing.
     151             : template <class W>
     152           4 : class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
     153             :                                    public AsyncWriterInterface<W> {
     154             :  public:
     155             :   /// Signal the client is done with the writes.
     156             :   ///
     157             :   /// \param[in] tag The tag identifying the operation.
     158             :   virtual void WritesDone(void* tag) = 0;
     159             : };
     160             : 
     161             : template <class W>
     162           4 : class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
     163             :  public:
     164             :   template <class R>
     165           2 :   ClientAsyncWriter(Channel* channel, CompletionQueue* cq,
     166             :                     const RpcMethod& method, ClientContext* context,
     167             :                     R* response, void* tag)
     168           2 :       : context_(context), call_(channel->CreateCall(method, context, cq)) {
     169           2 :     finish_ops_.RecvMessage(response);
     170             : 
     171           2 :     init_ops_.set_output_tag(tag);
     172           2 :     init_ops_.SendInitialMetadata(context->send_initial_metadata_);
     173           2 :     call_.PerformOps(&init_ops_);
     174           2 :   }
     175             : 
     176           0 :   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     177           0 :     GPR_ASSERT(!context_->initial_metadata_received_);
     178             : 
     179           0 :     meta_ops_.set_output_tag(tag);
     180           0 :     meta_ops_.RecvInitialMetadata(context_);
     181           0 :     call_.PerformOps(&meta_ops_);
     182           0 :   }
     183             : 
     184           4 :   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
     185           4 :     write_ops_.set_output_tag(tag);
     186             :     // TODO(ctiller): don't assert
     187           4 :     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
     188           4 :     call_.PerformOps(&write_ops_);
     189           4 :   }
     190             : 
     191           2 :   void WritesDone(void* tag) GRPC_OVERRIDE {
     192           2 :     writes_done_ops_.set_output_tag(tag);
     193           2 :     writes_done_ops_.ClientSendClose();
     194           2 :     call_.PerformOps(&writes_done_ops_);
     195           2 :   }
     196             : 
     197           2 :   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
     198           2 :     finish_ops_.set_output_tag(tag);
     199           2 :     if (!context_->initial_metadata_received_) {
     200           2 :       finish_ops_.RecvInitialMetadata(context_);
     201             :     }
     202           2 :     finish_ops_.ClientRecvStatus(context_, status);
     203           2 :     call_.PerformOps(&finish_ops_);
     204           2 :   }
     205             : 
     206             :  private:
     207             :   ClientContext* context_;
     208             :   Call call_;
     209             :   CallOpSet<CallOpSendInitialMetadata> init_ops_;
     210             :   CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
     211             :   CallOpSet<CallOpSendMessage> write_ops_;
     212             :   CallOpSet<CallOpClientSendClose> writes_done_ops_;
     213             :   CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
     214             :             CallOpClientRecvStatus> finish_ops_;
     215             : };
     216             : 
     217             : /// Client-side interface for asynchronous bi-directional streaming.
     218             : template <class W, class R>
     219          32 : class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface,
     220             :                                          public AsyncWriterInterface<W>,
     221             :                                          public AsyncReaderInterface<R> {
     222             :  public:
     223             :   /// Signal the client is done with the writes.
     224             :   ///
     225             :   /// \param[in] tag The tag identifying the operation.
     226             :   virtual void WritesDone(void* tag) = 0;
     227             : };
     228             : 
     229             : template <class W, class R>
     230          32 : class ClientAsyncReaderWriter GRPC_FINAL
     231             :     : public ClientAsyncReaderWriterInterface<W, R> {
     232             :  public:
     233          16 :   ClientAsyncReaderWriter(Channel* channel, CompletionQueue* cq,
     234             :                           const RpcMethod& method, ClientContext* context,
     235             :                           void* tag)
     236          16 :       : context_(context), call_(channel->CreateCall(method, context, cq)) {
     237          16 :     init_ops_.set_output_tag(tag);
     238          16 :     init_ops_.SendInitialMetadata(context->send_initial_metadata_);
     239          16 :     call_.PerformOps(&init_ops_);
     240          16 :   }
     241             : 
     242           0 :   void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
     243           0 :     GPR_ASSERT(!context_->initial_metadata_received_);
     244             : 
     245           0 :     meta_ops_.set_output_tag(tag);
     246           0 :     meta_ops_.RecvInitialMetadata(context_);
     247           0 :     call_.PerformOps(&meta_ops_);
     248           0 :   }
     249             : 
     250       26539 :   void Read(R* msg, void* tag) GRPC_OVERRIDE {
     251       26539 :     read_ops_.set_output_tag(tag);
     252       26539 :     if (!context_->initial_metadata_received_) {
     253          16 :       read_ops_.RecvInitialMetadata(context_);
     254             :     }
     255       26539 :     read_ops_.RecvMessage(msg);
     256       26539 :     call_.PerformOps(&read_ops_);
     257       26539 :   }
     258             : 
     259       26540 :   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
     260       26540 :     write_ops_.set_output_tag(tag);
     261             :     // TODO(ctiller): don't assert
     262       26540 :     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
     263       26540 :     call_.PerformOps(&write_ops_);
     264       26540 :   }
     265             : 
     266          15 :   void WritesDone(void* tag) GRPC_OVERRIDE {
     267          15 :     writes_done_ops_.set_output_tag(tag);
     268          15 :     writes_done_ops_.ClientSendClose();
     269          15 :     call_.PerformOps(&writes_done_ops_);
     270          15 :   }
     271             : 
     272          15 :   void Finish(Status* status, void* tag) GRPC_OVERRIDE {
     273          15 :     finish_ops_.set_output_tag(tag);
     274          15 :     if (!context_->initial_metadata_received_) {
     275           0 :       finish_ops_.RecvInitialMetadata(context_);
     276             :     }
     277          15 :     finish_ops_.ClientRecvStatus(context_, status);
     278          15 :     call_.PerformOps(&finish_ops_);
     279          15 :   }
     280             : 
     281             :  private:
     282             :   ClientContext* context_;
     283             :   Call call_;
     284             :   CallOpSet<CallOpSendInitialMetadata> init_ops_;
     285             :   CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
     286             :   CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
     287             :   CallOpSet<CallOpSendMessage> write_ops_;
     288             :   CallOpSet<CallOpClientSendClose> writes_done_ops_;
     289             :   CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
     290             : };
     291             : 
     292             : template <class W, class R>
     293           2 : class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
     294             :                                      public AsyncReaderInterface<R> {
     295             :  public:
     296           2 :   explicit ServerAsyncReader(ServerContext* ctx)
     297           2 :       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
     298             : 
     299           0 :   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     300           0 :     GPR_ASSERT(!ctx_->sent_initial_metadata_);
     301             : 
     302           0 :     meta_ops_.set_output_tag(tag);
     303           0 :     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     304           0 :     ctx_->sent_initial_metadata_ = true;
     305           0 :     call_.PerformOps(&meta_ops_);
     306           0 :   }
     307             : 
     308           6 :   void Read(R* msg, void* tag) GRPC_OVERRIDE {
     309           6 :     read_ops_.set_output_tag(tag);
     310           6 :     read_ops_.RecvMessage(msg);
     311           6 :     call_.PerformOps(&read_ops_);
     312           6 :   }
     313             : 
     314           2 :   void Finish(const W& msg, const Status& status, void* tag) {
     315           2 :     finish_ops_.set_output_tag(tag);
     316           2 :     if (!ctx_->sent_initial_metadata_) {
     317           2 :       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     318           2 :       ctx_->sent_initial_metadata_ = true;
     319             :     }
     320             :     // The response is dropped if the status is not OK.
     321           2 :     if (status.ok()) {
     322           2 :       finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
     323             :                                    finish_ops_.SendMessage(msg));
     324             :     } else {
     325           0 :       finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
     326             :     }
     327           2 :     call_.PerformOps(&finish_ops_);
     328           2 :   }
     329             : 
     330             :   void FinishWithError(const Status& status, void* tag) {
     331             :     GPR_ASSERT(!status.ok());
     332             :     finish_ops_.set_output_tag(tag);
     333             :     if (!ctx_->sent_initial_metadata_) {
     334             :       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     335             :       ctx_->sent_initial_metadata_ = true;
     336             :     }
     337             :     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
     338             :     call_.PerformOps(&finish_ops_);
     339             :   }
     340             : 
     341             :  private:
     342           2 :   void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
     343             : 
     344             :   Call call_;
     345             :   ServerContext* ctx_;
     346             :   CallOpSet<CallOpSendInitialMetadata> meta_ops_;
     347             :   CallOpSet<CallOpRecvMessage<R>> read_ops_;
     348             :   CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
     349             :             CallOpServerSendStatus> finish_ops_;
     350             : };
     351             : 
     352             : template <class W>
     353           2 : class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
     354             :                                      public AsyncWriterInterface<W> {
     355             :  public:
     356           2 :   explicit ServerAsyncWriter(ServerContext* ctx)
     357           2 :       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
     358             : 
     359           0 :   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     360           0 :     GPR_ASSERT(!ctx_->sent_initial_metadata_);
     361             : 
     362           0 :     meta_ops_.set_output_tag(tag);
     363           0 :     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     364           0 :     ctx_->sent_initial_metadata_ = true;
     365           0 :     call_.PerformOps(&meta_ops_);
     366           0 :   }
     367             : 
     368           4 :   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
     369           4 :     write_ops_.set_output_tag(tag);
     370           4 :     if (!ctx_->sent_initial_metadata_) {
     371           2 :       write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     372           2 :       ctx_->sent_initial_metadata_ = true;
     373             :     }
     374             :     // TODO(ctiller): don't assert
     375           4 :     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
     376           4 :     call_.PerformOps(&write_ops_);
     377           4 :   }
     378             : 
     379           2 :   void Finish(const Status& status, void* tag) {
     380           2 :     finish_ops_.set_output_tag(tag);
     381           2 :     if (!ctx_->sent_initial_metadata_) {
     382           0 :       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     383           0 :       ctx_->sent_initial_metadata_ = true;
     384             :     }
     385           2 :     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
     386           2 :     call_.PerformOps(&finish_ops_);
     387           2 :   }
     388             : 
     389             :  private:
     390           2 :   void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
     391             : 
     392             :   Call call_;
     393             :   ServerContext* ctx_;
     394             :   CallOpSet<CallOpSendInitialMetadata> meta_ops_;
     395             :   CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
     396             :   CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
     397             : };
     398             : 
     399             : /// Server-side interface for asynchronous bi-directional streaming.
     400             : template <class W, class R>
     401       90276 : class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
     402             :                                            public AsyncWriterInterface<W>,
     403             :                                            public AsyncReaderInterface<R> {
     404             :  public:
     405       65247 :   explicit ServerAsyncReaderWriter(ServerContext* ctx)
     406       65247 :       : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
     407             : 
     408           0 :   void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
     409           0 :     GPR_ASSERT(!ctx_->sent_initial_metadata_);
     410             : 
     411           0 :     meta_ops_.set_output_tag(tag);
     412           0 :     meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     413           0 :     ctx_->sent_initial_metadata_ = true;
     414           0 :     call_.PerformOps(&meta_ops_);
     415           0 :   }
     416             : 
     417       26543 :   void Read(R* msg, void* tag) GRPC_OVERRIDE {
     418       26543 :     read_ops_.set_output_tag(tag);
     419       26543 :     read_ops_.RecvMessage(msg);
     420       26543 :     call_.PerformOps(&read_ops_);
     421       26543 :   }
     422             : 
     423       26539 :   void Write(const W& msg, void* tag) GRPC_OVERRIDE {
     424       26539 :     write_ops_.set_output_tag(tag);
     425       26539 :     if (!ctx_->sent_initial_metadata_) {
     426          15 :       write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     427          15 :       ctx_->sent_initial_metadata_ = true;
     428             :     }
     429             :     // TODO(ctiller): don't assert
     430       26539 :     GPR_ASSERT(write_ops_.SendMessage(msg).ok());
     431       26539 :     call_.PerformOps(&write_ops_);
     432       26539 :   }
     433             : 
     434          14 :   void Finish(const Status& status, void* tag) {
     435          14 :     finish_ops_.set_output_tag(tag);
     436          14 :     if (!ctx_->sent_initial_metadata_) {
     437           0 :       finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
     438           0 :       ctx_->sent_initial_metadata_ = true;
     439             :     }
     440          14 :     finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
     441          14 :     call_.PerformOps(&finish_ops_);
     442          14 :   }
     443             : 
     444             :  private:
     445             :   friend class ::grpc::Server;
     446             : 
     447       64636 :   void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
     448             : 
     449             :   Call call_;
     450             :   ServerContext* ctx_;
     451             :   CallOpSet<CallOpSendInitialMetadata> meta_ops_;
     452             :   CallOpSet<CallOpRecvMessage<R>> read_ops_;
     453             :   CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
     454             :   CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
     455             : };
     456             : 
     457             : }  // namespace grpc
     458             : 
     459             : #endif  // GRPCXX_SUPPORT_ASYNC_STREAM_H

Generated by: LCOV version 1.10