LCOV - code coverage report
Current view: top level - include/grpc++/support - sync_stream.h (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 123 140 87.9 %
Date: 2015-10-10 Functions: 115 212 54.2 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #ifndef GRPCXX_SUPPORT_SYNC_STREAM_H
      35             : #define GRPCXX_SUPPORT_SYNC_STREAM_H
      36             : 
      37             : #include <grpc/support/log.h>
      38             : #include <grpc++/channel.h>
      39             : #include <grpc++/client_context.h>
      40             : #include <grpc++/completion_queue.h>
      41             : #include <grpc++/impl/call.h>
      42             : #include <grpc++/impl/service_type.h>
      43             : #include <grpc++/server_context.h>
      44             : #include <grpc++/support/status.h>
      45             : 
      46             : namespace grpc {
      47             : 
      48             : /// Common interface for all synchronous client side streaming.
      49          41 : class ClientStreamingInterface {
      50             :  public:
      51          41 :   virtual ~ClientStreamingInterface() {}
      52             : 
      53             :   /// Wait until the stream finishes, and return the final status. When the
      54             :   /// client side declares it has no more message to send, either implicitly or
      55             :   /// by calling \a WritesDone(), it needs to make sure there is no more message
      56             :   /// to be received from the server, either implicitly or by getting a false
      57             :   /// from a \a Read().
      58             :   ///
      59             :   /// This function will return either:
      60             :   /// - when all incoming messages have been read and the server has returned
      61             :   ///   status.
      62             :   /// - OR when the server has returned a non-OK status.
      63             :   virtual Status Finish() = 0;
      64             : };
      65             : 
      66             : /// An interface that yields a sequence of messages of type \a R.
      67             : template <class R>
      68          63 : class ReaderInterface {
      69             :  public:
      70          63 :   virtual ~ReaderInterface() {}
      71             : 
      72             :   /// Blocking read a message and parse to \a msg. Returns \a true on success.
      73             :   ///
      74             :   /// \param[out] msg The read message.
      75             :   ///
      76             :   /// \return \a false when there will be no more incoming messages, either
      77             :   /// because the other side has called \a WritesDone() or the stream has failed
      78             :   /// (or been cancelled).
      79             :   virtual bool Read(R* msg) = 0;
      80             : };
      81             : 
      82             : /// An interface that can be fed a sequence of messages of type \a W.
      83             : template <class W>
      84          64 : class WriterInterface {
      85             :  public:
      86          64 :   virtual ~WriterInterface() {}
      87             : 
      88             :   /// Blocking write \a msg to the stream with options.
      89             :   ///
      90             :   /// \param msg The message to be written to the stream.
      91             :   /// \param options Options affecting the write operation.
      92             :   ///
      93             :   /// \return \a true on success, \a false when the stream has been closed.
      94             :   virtual bool Write(const W& msg, const WriteOptions& options) = 0;
      95             : 
      96             :   /// Blocking write \a msg to the stream with default options.
      97             :   ///
      98             :   /// \param msg The message to be written to the stream.
      99             :   ///
     100             :   /// \return \a true on success, \a false when the stream has been closed.
     101      144656 :   inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
     102             : };
     103             : 
     104             : /// Client-side interface for streaming reads of message of type \a R.
     105             : template <class R>
     106           8 : class ClientReaderInterface : public ClientStreamingInterface,
     107             :                               public ReaderInterface<R> {
     108             :  public:
     109             :   /// Blocking wait for initial metadata from server. The received metadata
     110             :   /// can only be accessed after this call returns. Should only be called before
     111             :   /// the first read. Calling this method is optional, and if it is not called
     112             :   /// the metadata will be available in ClientContext after the first read.
     113             :   virtual void WaitForInitialMetadata() = 0;
     114             : };
     115             : 
     116             : template <class R>
     117           8 : class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
     118             :  public:
     119             :   /// Blocking create a stream and write the first request out.
     120             :   template <class W>
     121           4 :   ClientReader(Channel* channel, const RpcMethod& method,
     122             :                ClientContext* context, const W& request)
     123           4 :       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
     124             :     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
     125           4 :               CallOpClientSendClose> ops;
     126           4 :     ops.SendInitialMetadata(context->send_initial_metadata_);
     127             :     // TODO(ctiller): don't assert
     128           4 :     GPR_ASSERT(ops.SendMessage(request).ok());
     129           4 :     ops.ClientSendClose();
     130           4 :     call_.PerformOps(&ops);
     131           4 :     cq_.Pluck(&ops);
     132           4 :   }
     133             : 
     134           0 :   void WaitForInitialMetadata() {
     135           0 :     GPR_ASSERT(!context_->initial_metadata_received_);
     136             : 
     137           0 :     CallOpSet<CallOpRecvInitialMetadata> ops;
     138           0 :     ops.RecvInitialMetadata(context_);
     139           0 :     call_.PerformOps(&ops);
     140           0 :     cq_.Pluck(&ops);  /// status ignored
     141           0 :   }
     142             : 
     143          14 :   bool Read(R* msg) GRPC_OVERRIDE {
     144          14 :     CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
     145          14 :     if (!context_->initial_metadata_received_) {
     146           4 :       ops.RecvInitialMetadata(context_);
     147             :     }
     148          14 :     ops.RecvMessage(msg);
     149          14 :     call_.PerformOps(&ops);
     150          14 :     return cq_.Pluck(&ops) && ops.got_message;
     151             :   }
     152             : 
     153           4 :   Status Finish() GRPC_OVERRIDE {
     154           4 :     CallOpSet<CallOpClientRecvStatus> ops;
     155           4 :     Status status;
     156           4 :     ops.ClientRecvStatus(context_, &status);
     157           4 :     call_.PerformOps(&ops);
     158           4 :     GPR_ASSERT(cq_.Pluck(&ops));
     159           4 :     return status;
     160             :   }
     161             : 
     162             :  private:
     163             :   ClientContext* context_;
     164             :   CompletionQueue cq_;
     165             :   Call call_;
     166             : };
     167             : 
     168             : /// Client-side interface for streaming writes of message of type \a W.
     169             : template <class W>
     170          16 : class ClientWriterInterface : public ClientStreamingInterface,
     171             :                               public WriterInterface<W> {
     172             :  public:
     173             :   /// Half close writing from the client.
     174             :   /// Block until writes are completed.
     175             :   ///
     176             :   /// \return Whether the writes were successful.
     177             :   virtual bool WritesDone() = 0;
     178             : };
     179             : 
     180             : template <class W>
     181          16 : class ClientWriter : public ClientWriterInterface<W> {
     182             :  public:
     183             :   /// Blocking create a stream.
     184             :   template <class R>
     185           8 :   ClientWriter(Channel* channel, const RpcMethod& method,
     186             :                ClientContext* context, R* response)
     187           8 :       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
     188           8 :     finish_ops_.RecvMessage(response);
     189             : 
     190           8 :     CallOpSet<CallOpSendInitialMetadata> ops;
     191           8 :     ops.SendInitialMetadata(context->send_initial_metadata_);
     192           8 :     call_.PerformOps(&ops);
     193           8 :     cq_.Pluck(&ops);
     194           8 :   }
     195             : 
     196             :   using WriterInterface<W>::Write;
     197          50 :   bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
     198          50 :     CallOpSet<CallOpSendMessage> ops;
     199          50 :     if (!ops.SendMessage(msg, options).ok()) {
     200           0 :       return false;
     201             :     }
     202          50 :     call_.PerformOps(&ops);
     203          50 :     return cq_.Pluck(&ops);
     204             :   }
     205             : 
     206           6 :   bool WritesDone() GRPC_OVERRIDE {
     207           6 :     CallOpSet<CallOpClientSendClose> ops;
     208           6 :     ops.ClientSendClose();
     209           6 :     call_.PerformOps(&ops);
     210           6 :     return cq_.Pluck(&ops);
     211             :   }
     212             : 
     213             :   /// Read the final response and wait for the final status.
     214           8 :   Status Finish() GRPC_OVERRIDE {
     215           8 :     Status status;
     216           8 :     finish_ops_.ClientRecvStatus(context_, &status);
     217           8 :     call_.PerformOps(&finish_ops_);
     218           8 :     GPR_ASSERT(cq_.Pluck(&finish_ops_));
     219           8 :     return status;
     220             :   }
     221             : 
     222             :  private:
     223             :   ClientContext* context_;
     224             :   CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
     225             :   CompletionQueue cq_;
     226             :   Call call_;
     227             : };
     228             : 
     229             : /// Client-side interface for bi-directional streaming.
     230             : template <class W, class R>
     231          58 : class ClientReaderWriterInterface : public ClientStreamingInterface,
     232             :                                     public WriterInterface<W>,
     233             :                                     public ReaderInterface<R> {
     234             :  public:
     235             :   /// Blocking wait for initial metadata from server. The received metadata
     236             :   /// can only be accessed after this call returns. Should only be called before
     237             :   /// the first read. Calling this method is optional, and if it is not called
     238             :   /// the metadata will be available in ClientContext after the first read.
     239             :   virtual void WaitForInitialMetadata() = 0;
     240             : 
     241             :   /// Block until writes are completed.
     242             :   ///
     243             :   /// \return Whether the writes were successful.
     244             :   virtual bool WritesDone() = 0;
     245             : };
     246             : 
     247             : template <class W, class R>
     248          56 : class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
     249             :  public:
     250             :   /// Blocking create a stream.
     251          28 :   ClientReaderWriter(Channel* channel, const RpcMethod& method,
     252             :                      ClientContext* context)
     253          28 :       : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
     254          28 :     CallOpSet<CallOpSendInitialMetadata> ops;
     255          28 :     ops.SendInitialMetadata(context->send_initial_metadata_);
     256          28 :     call_.PerformOps(&ops);
     257          28 :     cq_.Pluck(&ops);
     258          28 :   }
     259             : 
     260           0 :   void WaitForInitialMetadata() {
     261           0 :     GPR_ASSERT(!context_->initial_metadata_received_);
     262             : 
     263           0 :     CallOpSet<CallOpRecvInitialMetadata> ops;
     264           0 :     ops.RecvInitialMetadata(context_);
     265           0 :     call_.PerformOps(&ops);
     266           0 :     cq_.Pluck(&ops);  // status ignored
     267           0 :   }
     268             : 
     269       81389 :   bool Read(R* msg) GRPC_OVERRIDE {
     270       81389 :     CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
     271       81389 :     if (!context_->initial_metadata_received_) {
     272          27 :       ops.RecvInitialMetadata(context_);
     273             :     }
     274       81389 :     ops.RecvMessage(msg);
     275       81389 :     call_.PerformOps(&ops);
     276       81389 :     return cq_.Pluck(&ops) && ops.got_message;
     277             :   }
     278             : 
     279             :   using WriterInterface<W>::Write;
     280       63209 :   bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
     281       63209 :     CallOpSet<CallOpSendMessage> ops;
     282       63209 :     if (!ops.SendMessage(msg, options).ok()) return false;
     283       63209 :     call_.PerformOps(&ops);
     284       63209 :     return cq_.Pluck(&ops);
     285             :   }
     286             : 
     287          19 :   bool WritesDone() GRPC_OVERRIDE {
     288          19 :     CallOpSet<CallOpClientSendClose> ops;
     289          19 :     ops.ClientSendClose();
     290          19 :     call_.PerformOps(&ops);
     291          19 :     return cq_.Pluck(&ops);
     292             :   }
     293             : 
     294          27 :   Status Finish() GRPC_OVERRIDE {
     295          27 :     CallOpSet<CallOpClientRecvStatus> ops;
     296          27 :     Status status;
     297          27 :     ops.ClientRecvStatus(context_, &status);
     298          27 :     call_.PerformOps(&ops);
     299          27 :     GPR_ASSERT(cq_.Pluck(&ops));
     300          27 :     return status;
     301             :   }
     302             : 
     303             :  private:
     304             :   ClientContext* context_;
     305             :   CompletionQueue cq_;
     306             :   Call call_;
     307             : };
     308             : 
     309             : template <class R>
     310           8 : class ServerReader GRPC_FINAL : public ReaderInterface<R> {
     311             :  public:
     312           8 :   ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
     313             : 
     314             :   void SendInitialMetadata() {
     315             :     GPR_ASSERT(!ctx_->sent_initial_metadata_);
     316             : 
     317             :     CallOpSet<CallOpSendInitialMetadata> ops;
     318             :     ops.SendInitialMetadata(ctx_->initial_metadata_);
     319             :     ctx_->sent_initial_metadata_ = true;
     320             :     call_->PerformOps(&ops);
     321             :     call_->cq()->Pluck(&ops);
     322             :   }
     323             : 
     324          32 :   bool Read(R* msg) GRPC_OVERRIDE {
     325          32 :     CallOpSet<CallOpRecvMessage<R>> ops;
     326          32 :     ops.RecvMessage(msg);
     327          32 :     call_->PerformOps(&ops);
     328          32 :     return call_->cq()->Pluck(&ops) && ops.got_message;
     329             :   }
     330             : 
     331             :  private:
     332             :   Call* const call_;
     333             :   ServerContext* const ctx_;
     334             : };
     335             : 
     336             : template <class W>
     337           5 : class ServerWriter GRPC_FINAL : public WriterInterface<W> {
     338             :  public:
     339           5 :   ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
     340             : 
     341             :   void SendInitialMetadata() {
     342             :     GPR_ASSERT(!ctx_->sent_initial_metadata_);
     343             : 
     344             :     CallOpSet<CallOpSendInitialMetadata> ops;
     345             :     ops.SendInitialMetadata(ctx_->initial_metadata_);
     346             :     ctx_->sent_initial_metadata_ = true;
     347             :     call_->PerformOps(&ops);
     348             :     call_->cq()->Pluck(&ops);
     349             :   }
     350             : 
     351             :   using WriterInterface<W>::Write;
     352          18 :   bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
     353          18 :     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
     354          18 :     if (!ops.SendMessage(msg, options).ok()) {
     355           0 :       return false;
     356             :     }
     357          18 :     if (!ctx_->sent_initial_metadata_) {
     358           5 :       ops.SendInitialMetadata(ctx_->initial_metadata_);
     359           5 :       ctx_->sent_initial_metadata_ = true;
     360             :     }
     361          18 :     call_->PerformOps(&ops);
     362          18 :     return call_->cq()->Pluck(&ops);
     363             :   }
     364             : 
     365             :  private:
     366             :   Call* const call_;
     367             :   ServerContext* const ctx_;
     368             : };
     369             : 
     370             : /// Server-side interface for bi-directional streaming.
     371             : template <class W, class R>
     372          22 : class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
     373             :                                       public ReaderInterface<R> {
     374             :  public:
     375          22 :   ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
     376             : 
     377             :   void SendInitialMetadata() {
     378             :     GPR_ASSERT(!ctx_->sent_initial_metadata_);
     379             : 
     380             :     CallOpSet<CallOpSendInitialMetadata> ops;
     381             :     ops.SendInitialMetadata(ctx_->initial_metadata_);
     382             :     ctx_->sent_initial_metadata_ = true;
     383             :     call_->PerformOps(&ops);
     384             :     call_->cq()->Pluck(&ops);
     385             :   }
     386             : 
     387       63224 :   bool Read(R* msg) GRPC_OVERRIDE {
     388       63224 :     CallOpSet<CallOpRecvMessage<R>> ops;
     389       63224 :     ops.RecvMessage(msg);
     390       63224 :     call_->PerformOps(&ops);
     391       63224 :     return call_->cq()->Pluck(&ops) && ops.got_message;
     392             :   }
     393             : 
     394             :   using WriterInterface<W>::Write;
     395       81376 :   bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
     396       81376 :     CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
     397       81376 :     if (!ops.SendMessage(msg, options).ok()) {
     398           0 :       return false;
     399             :     }
     400       81376 :     if (!ctx_->sent_initial_metadata_) {
     401          20 :       ops.SendInitialMetadata(ctx_->initial_metadata_);
     402          20 :       ctx_->sent_initial_metadata_ = true;
     403             :     }
     404       81376 :     call_->PerformOps(&ops);
     405       81376 :     return call_->cq()->Pluck(&ops);
     406             :   }
     407             : 
     408             :  private:
     409             :   Call* const call_;
     410             :   ServerContext* const ctx_;
     411             : };
     412             : 
     413             : }  // namespace grpc
     414             : 
     415             : #endif  // GRPCXX_SUPPORT_SYNC_STREAM_H

Generated by: LCOV version 1.10