gRPC  0.6.0
 All Classes Namespaces Functions Variables Enumerations Properties Pages
stream.h
1 /*
2  *
3  * Copyright 2015, Google Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  * * Redistributions of source code must retain the above copyright
11  * notice, this list of conditions and the following disclaimer.
12  * * Redistributions in binary form must reproduce the above
13  * copyright notice, this list of conditions and the following disclaimer
14  * in the documentation and/or other materials provided with the
15  * distribution.
16  * * Neither the name of Google Inc. nor the names of its
17  * contributors may be used to endorse or promote products derived from
18  * this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  *
32  */
33 
34 #ifndef GRPCXX_STREAM_H
35 #define GRPCXX_STREAM_H
36 
37 #include <grpc++/channel_interface.h>
38 #include <grpc++/client_context.h>
39 #include <grpc++/completion_queue.h>
40 #include <grpc++/server_context.h>
41 #include <grpc++/impl/call.h>
42 #include <grpc++/impl/service_type.h>
43 #include <grpc++/status.h>
44 #include <grpc/support/log.h>
45 
46 namespace grpc {
47 
48 // Common interface for all client side streaming.
// NOTE(review): listing line 49 (the class-head line) is missing from this
// extract; the destructor name below identifies the class as
// ClientStreamingInterface.
50  public:
51  virtual ~ClientStreamingInterface() {}
52 
53  // Waits until the stream finishes and returns the final status. When the
54  // client side declares it has no more messages to send, either implicitly or
55  // by calling WritesDone, it needs to make sure there are no more messages to
56  // be received from the server, either implicitly or by getting a false from
57  // a Read(). Otherwise, this implicitly cancels the stream.
58  virtual Status Finish() = 0;
59 };
60 
61 // An interface that yields a sequence of R messages.
62 template <class R>
// NOTE(review): listing line 63 (the class-head line) is missing from this
// extract; the destructor name below identifies the class as ReaderInterface.
64  public:
65  virtual ~ReaderInterface() {}
66 
67  // Blocks until a message is read and parsed into msg. Returns true on success.
68  // The method returns false when there will be no more incoming messages,
69  // either because the other side has called WritesDone or the stream has
70  // failed (or been cancelled).
71  virtual bool Read(R* msg) = 0;
72 };
73 
74 // An interface that can be fed a sequence of W messages.
75 template <class W>
// NOTE(review): listing line 76 (the class-head line) is missing from this
// extract; the destructor name below identifies the class as WriterInterface.
77  public:
78  virtual ~WriterInterface() {}
79 
80  // Blocks until msg is written to the stream. Returns true on success.
81  // Returns false when the stream has been closed.
82  virtual bool Write(const W& msg) = 0;
83 };
84 
// Interface that adds WaitForInitialMetadata() on top of ReaderInterface<R>.
// NOTE(review): listing line 86, which carries the class name and the first
// base class, is missing from this extract; presumably this is the client-side
// reader interface (ClientReaderInterface) — confirm against the real header.
85 template <class R>
87  public ReaderInterface<R> {
88  public:
    // Implementations in this file block until the server's initial metadata
    // has been received.
89  virtual void WaitForInitialMetadata() = 0;
90 };
91 
// Synchronous client-side reader. Each method fills a local CallOpBuffer,
// submits it with call_.PerformOps() and then waits on cq_.Pluck() for that
// buffer.
// NOTE(review): listing line 93 (the class-head line) is missing from this
// extract; the constructor name below identifies the class as ClientReader.
92 template <class R>
94  public:
95  // Blocking create a stream and write the first request out.
    // Initial metadata, the request message and the client send-close are
    // submitted together in a single CallOpBuffer.
96  ClientReader(ChannelInterface* channel, const RpcMethod& method,
97  ClientContext* context, const grpc::protobuf::Message& request)
98  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
99  CallOpBuffer buf;
100  buf.AddSendInitialMetadata(&context->send_initial_metadata_);
101  buf.AddSendMessage(request);
102  buf.AddClientSendClose();
103  call_.PerformOps(&buf);
    // NOTE(review): the Pluck result is discarded here, whereas
    // WaitForInitialMetadata() and Finish() assert it — confirm this is
    // intentional.
104  cq_.Pluck(&buf);
105  }
106 
107  // Blocking wait for initial metadata from server. The received metadata
108  // can only be accessed after this call returns. Should only be called before
109  // the first read. Calling this method is optional, and if it is not called
110  // the metadata will be available in ClientContext after the first read.
    // Asserts that the context has not already received initial metadata.
111  void WaitForInitialMetadata() {
112  GPR_ASSERT(!context_->initial_metadata_received_);
113 
114  CallOpBuffer buf;
115  buf.AddRecvInitialMetadata(context_);
116  call_.PerformOps(&buf);
117  GPR_ASSERT(cq_.Pluck(&buf));
118  }
119 
    // Reads one message into msg. If initial metadata has not been received
    // yet, receiving it is added to the same batch. Returns true only when
    // Pluck succeeds and the buffer reports that a message was received.
120  bool Read(R* msg) GRPC_OVERRIDE {
121  CallOpBuffer buf;
122  if (!context_->initial_metadata_received_) {
123  buf.AddRecvInitialMetadata(context_);
124  }
125  buf.AddRecvMessage(msg);
126  call_.PerformOps(&buf);
127  return cq_.Pluck(&buf) && buf.got_message;
128  }
129 
    // Receives the final status into a local Status and returns it by value.
    // Asserts that Pluck succeeds.
130  Status Finish() GRPC_OVERRIDE {
131  CallOpBuffer buf;
132  Status status;
133  buf.AddClientRecvStatus(context_, &status);
134  call_.PerformOps(&buf);
135  GPR_ASSERT(cq_.Pluck(&buf));
136  return status;
137  }
138 
139  private:
140  ClientContext* context_;
    // cq_ is declared before call_, so it is constructed first; call_'s
    // initializer takes &cq_.
141  CompletionQueue cq_;
142  Call call_;
143 };
144 
// Interface that adds WritesDone() on top of WriterInterface<W>.
// NOTE(review): listing line 146, which carries the class name and the first
// base class, is missing from this extract; presumably this is the client-side
// writer interface (ClientWriterInterface) — confirm against the real header.
145 template <class W>
147  public WriterInterface<W> {
148  public:
    // Implementations in this file send the client send-close op and return
    // the result of plucking it from the completion queue.
149  virtual bool WritesDone() = 0;
150 };
151 
// Synchronous client-side writer. Each method fills a local CallOpBuffer,
// submits it with call_.PerformOps() and then waits on cq_.Pluck() for that
// buffer.
// NOTE(review): listing line 153 (the class-head line) is missing from this
// extract; the constructor name below identifies the class as ClientWriter.
152 template <class W>
154  public:
155  // Blocking create a stream.
    // Only the initial metadata is sent here. The response pointer is stored
    // and filled in later by Finish().
156  ClientWriter(ChannelInterface* channel, const RpcMethod& method,
157  ClientContext* context, grpc::protobuf::Message* response)
158  : context_(context),
159  response_(response),
160  call_(channel->CreateCall(method, context, &cq_)) {
161  CallOpBuffer buf;
162  buf.AddSendInitialMetadata(&context->send_initial_metadata_);
163  call_.PerformOps(&buf);
    // NOTE(review): the Pluck result is discarded here, whereas Finish()
    // asserts it — confirm this is intentional.
164  cq_.Pluck(&buf);
165  }
166 
    // Sends one message and returns the result of plucking the send op.
167  bool Write(const W& msg) GRPC_OVERRIDE {
168  CallOpBuffer buf;
169  buf.AddSendMessage(msg);
170  call_.PerformOps(&buf);
171  return cq_.Pluck(&buf);
172  }
173 
    // Sends the client send-close op and returns the result of plucking it.
174  bool WritesDone() GRPC_OVERRIDE {
175  CallOpBuffer buf;
176  buf.AddClientSendClose();
177  call_.PerformOps(&buf);
178  return cq_.Pluck(&buf);
179  }
180 
181  // Read the final response and wait for the final status.
    // The response is received into the pointer given to the constructor, in
    // the same batch as the status. Asserts that Pluck succeeds.
182  Status Finish() GRPC_OVERRIDE {
183  CallOpBuffer buf;
184  Status status;
185  buf.AddRecvMessage(response_);
186  buf.AddClientRecvStatus(context_, &status);
187  call_.PerformOps(&buf);
188  GPR_ASSERT(cq_.Pluck(&buf));
189  return status;
190  }
191 
192  private:
193  ClientContext* context_;
194  grpc::protobuf::Message* const response_;
    // cq_ is declared before call_, so it is constructed first; call_'s
    // initializer takes &cq_.
195  CompletionQueue cq_;
196  Call call_;
197 };
198 
199 // Client-side interface for bi-directional streaming.
// Combines WriterInterface<W> and ReaderInterface<R> and adds
// WaitForInitialMetadata() and WritesDone().
// NOTE(review): listing line 201, which carries the class name and the first
// base class, is missing from this extract; presumably the class is
// ClientReaderWriterInterface — confirm against the real header.
200 template <class W, class R>
202  public WriterInterface<W>,
203  public ReaderInterface<R> {
204  public:
205  virtual void WaitForInitialMetadata() = 0;
206  virtual bool WritesDone() = 0;
207 };
208 
// Synchronous client-side bi-directional stream. Each method fills a local
// CallOpBuffer, submits it with call_.PerformOps() and then waits on
// cq_.Pluck() for that buffer.
// NOTE(review): listing line 210 (the class-head line) is missing from this
// extract; the constructor name below identifies the class as
// ClientReaderWriter.
209 template <class W, class R>
211  public:
212  // Blocking create a stream.
    // Sends the initial metadata and asserts that Pluck succeeds.
213  ClientReaderWriter(ChannelInterface* channel, const RpcMethod& method,
214  ClientContext* context)
215  : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
216  CallOpBuffer buf;
217  buf.AddSendInitialMetadata(&context->send_initial_metadata_);
218  call_.PerformOps(&buf);
219  GPR_ASSERT(cq_.Pluck(&buf));
220  }
221 
222  // Blocking wait for initial metadata from server. The received metadata
223  // can only be accessed after this call returns. Should only be called before
224  // the first read. Calling this method is optional, and if it is not called
225  // the metadata will be available in ClientContext after the first read.
    // Asserts that the context has not already received initial metadata.
226  void WaitForInitialMetadata() {
227  GPR_ASSERT(!context_->initial_metadata_received_);
228 
229  CallOpBuffer buf;
230  buf.AddRecvInitialMetadata(context_);
231  call_.PerformOps(&buf);
232  GPR_ASSERT(cq_.Pluck(&buf));
233  }
234 
    // Reads one message into msg. If initial metadata has not been received
    // yet, receiving it is added to the same batch. Returns true only when
    // Pluck succeeds and the buffer reports that a message was received.
235  bool Read(R* msg) GRPC_OVERRIDE {
236  CallOpBuffer buf;
237  if (!context_->initial_metadata_received_) {
238  buf.AddRecvInitialMetadata(context_);
239  }
240  buf.AddRecvMessage(msg);
241  call_.PerformOps(&buf);
242  return cq_.Pluck(&buf) && buf.got_message;
243  }
244 
    // Sends one message and returns the result of plucking the send op.
245  bool Write(const W& msg) GRPC_OVERRIDE {
246  CallOpBuffer buf;
247  buf.AddSendMessage(msg);
248  call_.PerformOps(&buf);
249  return cq_.Pluck(&buf);
250  }
251 
    // Sends the client send-close op and returns the result of plucking it.
252  bool WritesDone() GRPC_OVERRIDE {
253  CallOpBuffer buf;
254  buf.AddClientSendClose();
255  call_.PerformOps(&buf);
256  return cq_.Pluck(&buf);
257  }
258 
    // Receives the final status into a local Status and returns it by value.
    // Asserts that Pluck succeeds.
259  Status Finish() GRPC_OVERRIDE {
260  CallOpBuffer buf;
261  Status status;
262  buf.AddClientRecvStatus(context_, &status);
263  call_.PerformOps(&buf);
264  GPR_ASSERT(cq_.Pluck(&buf));
265  return status;
266  }
267 
268  private:
269  ClientContext* context_;
    // cq_ is declared before call_, so it is constructed first; call_'s
    // initializer takes &cq_.
270  CompletionQueue cq_;
271  Call call_;
272 };
273 
// Synchronous server-side reader of R messages. Wraps the Call* and
// ServerContext* given to the constructor and waits on the call's own
// completion queue (call_->cq()).
274 template <class R>
275 class ServerReader GRPC_FINAL : public ReaderInterface<R> {
276  public:
277  ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
278 
    // Sends the server's initial metadata explicitly. Asserts that it has not
    // been sent before, and marks it as sent before the op is submitted.
    // The Pluck result is not checked.
279  void SendInitialMetadata() {
280  GPR_ASSERT(!ctx_->sent_initial_metadata_);
281 
282  CallOpBuffer buf;
283  buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
284  ctx_->sent_initial_metadata_ = true;
285  call_->PerformOps(&buf);
286  call_->cq()->Pluck(&buf);
287  }
288 
    // Reads one message into msg. Returns true only when Pluck succeeds and
    // the buffer reports that a message was received.
289  bool Read(R* msg) GRPC_OVERRIDE {
290  CallOpBuffer buf;
291  buf.AddRecvMessage(msg);
292  call_->PerformOps(&buf);
293  return call_->cq()->Pluck(&buf) && buf.got_message;
294  }
295 
296  private:
297  Call* const call_;
298  ServerContext* const ctx_;
299 };
300 
// Synchronous server-side writer of W messages. Wraps the Call* and
// ServerContext* given to the constructor and waits on the call's own
// completion queue (call_->cq()).
301 template <class W>
302 class ServerWriter GRPC_FINAL : public WriterInterface<W> {
303  public:
304  ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
305 
    // Sends the server's initial metadata explicitly. Asserts that it has not
    // been sent before, and marks it as sent before the op is submitted.
    // The Pluck result is not checked.
306  void SendInitialMetadata() {
307  GPR_ASSERT(!ctx_->sent_initial_metadata_);
308 
309  CallOpBuffer buf;
310  buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
311  ctx_->sent_initial_metadata_ = true;
312  call_->PerformOps(&buf);
313  call_->cq()->Pluck(&buf);
314  }
315 
    // Sends one message. If initial metadata has not been sent yet, it is
    // added to the same batch and marked as sent. Returns the Pluck result.
316  bool Write(const W& msg) GRPC_OVERRIDE {
317  CallOpBuffer buf;
318  if (!ctx_->sent_initial_metadata_) {
319  buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
320  ctx_->sent_initial_metadata_ = true;
321  }
322  buf.AddSendMessage(msg);
323  call_->PerformOps(&buf);
324  return call_->cq()->Pluck(&buf);
325  }
326 
327  private:
328  Call* const call_;
329  ServerContext* const ctx_;
330 };
331 
332 // Server-side interface for bi-directional streaming.
// Reads R messages and writes W messages over the Call* given to the
// constructor, waiting on the call's own completion queue (call_->cq()).
333 template <class W, class R>
334 class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
335  public ReaderInterface<R> {
336  public:
337  ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
338 
    // Sends the server's initial metadata explicitly. Asserts that it has not
    // been sent before, and marks it as sent before the op is submitted.
    // The Pluck result is not checked.
339  void SendInitialMetadata() {
340  GPR_ASSERT(!ctx_->sent_initial_metadata_);
341 
342  CallOpBuffer buf;
343  buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
344  ctx_->sent_initial_metadata_ = true;
345  call_->PerformOps(&buf);
346  call_->cq()->Pluck(&buf);
347  }
348 
    // Reads one message into msg. Returns true only when Pluck succeeds and
    // the buffer reports that a message was received.
349  bool Read(R* msg) GRPC_OVERRIDE {
350  CallOpBuffer buf;
351  buf.AddRecvMessage(msg);
352  call_->PerformOps(&buf);
353  return call_->cq()->Pluck(&buf) && buf.got_message;
354  }
355 
    // Sends one message. If initial metadata has not been sent yet, it is
    // added to the same batch and marked as sent. Returns the Pluck result.
356  bool Write(const W& msg) GRPC_OVERRIDE {
357  CallOpBuffer buf;
358  if (!ctx_->sent_initial_metadata_) {
359  buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
360  ctx_->sent_initial_metadata_ = true;
361  }
362  buf.AddSendMessage(msg);
363  call_->PerformOps(&buf);
364  return call_->cq()->Pluck(&buf);
365  }
366 
367  private:
368  Call* const call_;
369  ServerContext* const ctx_;
370 };
371 
372 // Async interfaces
373 // Common interface for all client side streaming.
// NOTE(review): listing line 374 (the class-head line) is missing from this
// extract; the destructor name below identifies the class as
// ClientAsyncStreamingInterface.
375  public:
376  virtual ~ClientAsyncStreamingInterface() {}
377 
    // Implementations in this file submit a receive-initial-metadata op
    // tagged with `tag` and return without waiting.
378  virtual void ReadInitialMetadata(void* tag) = 0;
379 
    // Implementations in this file submit a receive-status op that writes
    // into *status, tagged with `tag`, and return without waiting.
380  virtual void Finish(Status* status, void* tag) = 0;
381 };
382 
383 // An interface that yields a sequence of R messages.
384 template <class R>
// NOTE(review): listing line 385 (the class-head line) is missing from this
// extract; the destructor name below identifies the class as
// AsyncReaderInterface.
386  public:
387  virtual ~AsyncReaderInterface() {}
388 
    // Implementations in this file submit a receive-message op targeting msg,
    // tagged with `tag`, and return without waiting.
389  virtual void Read(R* msg, void* tag) = 0;
390 };
391 
392 // An interface that can be fed a sequence of W messages.
393 template <class W>
// NOTE(review): listing line 394 (the class-head line) is missing from this
// extract; the destructor name below identifies the class as
// AsyncWriterInterface.
395  public:
396  virtual ~AsyncWriterInterface() {}
397 
    // Implementations in this file submit a send-message op for msg, tagged
    // with `tag`, and return without waiting.
398  virtual void Write(const W& msg, void* tag) = 0;
399 };
400 
// Interface deriving from AsyncReaderInterface<R>; it declares no members of
// its own.
// NOTE(review): listing line 402, which carries the class name and the first
// base class, is missing from this extract; presumably the class is
// ClientAsyncReaderInterface — confirm against the real header.
401 template <class R>
403  public AsyncReaderInterface<R> {
404 };
405 
// Asynchronous client-side reader. Each operation resets its own member
// CallOpBuffer with the caller's tag and submits it with call_.PerformOps();
// nothing here waits on a completion queue.
// NOTE(review): listing lines 407 (class head) and 410 (first line of the
// constructor signature, which must declare `channel` and `cq`) are missing
// from this extract; presumably the class is ClientAsyncReader — confirm.
406 template <class R>
408  public:
409  // Create a stream and write the first request out.
    // Initial metadata, the request message and the client send-close are
    // submitted together in init_buf_.
411  const RpcMethod& method, ClientContext* context,
412  const grpc::protobuf::Message& request, void* tag)
413  : context_(context), call_(channel->CreateCall(method, context, cq)) {
414  init_buf_.Reset(tag);
415  init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
416  init_buf_.AddSendMessage(request);
417  init_buf_.AddClientSendClose();
418  call_.PerformOps(&init_buf_);
419  }
420 
    // Requests the server's initial metadata. Asserts that the context has
    // not already received it.
421  void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
422  GPR_ASSERT(!context_->initial_metadata_received_);
423 
424  meta_buf_.Reset(tag);
425  meta_buf_.AddRecvInitialMetadata(context_);
426  call_.PerformOps(&meta_buf_);
427  }
428 
    // Requests one message into msg. If initial metadata has not been
    // received yet, receiving it is added to the same batch.
429  void Read(R* msg, void* tag) GRPC_OVERRIDE {
430  read_buf_.Reset(tag);
431  if (!context_->initial_metadata_received_) {
432  read_buf_.AddRecvInitialMetadata(context_);
433  }
434  read_buf_.AddRecvMessage(msg);
435  call_.PerformOps(&read_buf_);
436  }
437 
    // Requests the final status into *status. If initial metadata has not
    // been received yet, receiving it is added to the same batch.
438  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
439  finish_buf_.Reset(tag);
440  if (!context_->initial_metadata_received_) {
441  finish_buf_.AddRecvInitialMetadata(context_);
442  }
443  finish_buf_.AddClientRecvStatus(context_, status);
444  call_.PerformOps(&finish_buf_);
445  }
446 
447  private:
448  ClientContext* context_;
449  Call call_;
    // One buffer per operation kind, reused across calls to that operation.
450  CallOpBuffer init_buf_;
451  CallOpBuffer meta_buf_;
452  CallOpBuffer read_buf_;
453  CallOpBuffer finish_buf_;
454 };
455 
// Interface that adds WritesDone(tag) on top of AsyncWriterInterface<W>.
// NOTE(review): listing line 457, which carries the class name and the first
// base class, is missing from this extract; presumably the class is
// ClientAsyncWriterInterface — confirm against the real header.
456 template <class W>
458  public AsyncWriterInterface<W> {
459  public:
    // Implementations in this file submit the client send-close op tagged
    // with `tag` and return without waiting.
460  virtual void WritesDone(void* tag) = 0;
461 };
462 
// Asynchronous client-side writer. Each operation resets its own member
// CallOpBuffer with the caller's tag and submits it with call_.PerformOps();
// nothing here waits on a completion queue.
// NOTE(review): listing lines 464 (class head) and 466 (first line of the
// constructor signature, which must declare `channel` and `cq`) are missing
// from this extract; presumably the class is ClientAsyncWriter — confirm.
463 template <class W>
465  public:
    // Creates the call and submits the initial metadata in init_buf_. The
    // response pointer is stored and used later by Finish().
467  const RpcMethod& method, ClientContext* context,
468  grpc::protobuf::Message* response, void* tag)
469  : context_(context),
470  response_(response),
471  call_(channel->CreateCall(method, context, cq)) {
472  init_buf_.Reset(tag);
473  init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
474  call_.PerformOps(&init_buf_);
475  }
476 
    // Requests the server's initial metadata. Asserts that the context has
    // not already received it.
477  void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
478  GPR_ASSERT(!context_->initial_metadata_received_);
479 
480  meta_buf_.Reset(tag);
481  meta_buf_.AddRecvInitialMetadata(context_);
482  call_.PerformOps(&meta_buf_);
483  }
484 
    // Submits one message for sending.
485  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
486  write_buf_.Reset(tag);
487  write_buf_.AddSendMessage(msg);
488  call_.PerformOps(&write_buf_);
489  }
490 
    // Submits the client send-close op.
491  void WritesDone(void* tag) GRPC_OVERRIDE {
492  writes_done_buf_.Reset(tag);
493  writes_done_buf_.AddClientSendClose();
494  call_.PerformOps(&writes_done_buf_);
495  }
496 
    // Requests the response (into the pointer given to the constructor) and
    // the final status (into *status) in one batch. If initial metadata has
    // not been received yet, receiving it is added to the same batch.
497  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
498  finish_buf_.Reset(tag);
499  if (!context_->initial_metadata_received_) {
500  finish_buf_.AddRecvInitialMetadata(context_);
501  }
502  finish_buf_.AddRecvMessage(response_);
503  finish_buf_.AddClientRecvStatus(context_, status);
504  call_.PerformOps(&finish_buf_);
505  }
506 
507  private:
508  ClientContext* context_;
509  grpc::protobuf::Message* const response_;
510  Call call_;
    // One buffer per operation kind, reused across calls to that operation.
511  CallOpBuffer init_buf_;
512  CallOpBuffer meta_buf_;
513  CallOpBuffer write_buf_;
514  CallOpBuffer writes_done_buf_;
515  CallOpBuffer finish_buf_;
516 };
517 
518 // Client-side interface for bi-directional streaming.
// Combines AsyncWriterInterface<W> and AsyncReaderInterface<R> and adds
// WritesDone(tag).
// NOTE(review): listing line 520, which carries the class name and the first
// base class, is missing from this extract; presumably the class is
// ClientAsyncReaderWriterInterface — confirm against the real header.
519 template <class W, class R>
521  public AsyncWriterInterface<W>,
522  public AsyncReaderInterface<R> {
523  public:
    // Implementations in this file submit the client send-close op tagged
    // with `tag` and return without waiting.
524  virtual void WritesDone(void* tag) = 0;
525 };
526 
// Asynchronous client-side bi-directional stream. Each operation resets its
// own member CallOpBuffer with the caller's tag and submits it with
// call_.PerformOps(); nothing here waits on a completion queue.
// NOTE(review): listing lines 528 (class head) and 531 (first line of the
// constructor signature, which must declare `channel` and `cq`) are missing
// from this extract; presumably the class is ClientAsyncReaderWriter —
// confirm against the real header.
527 template <class W, class R>
529  : public ClientAsyncReaderWriterInterface<W, R> {
530  public:
    // Creates the call and submits the initial metadata in init_buf_.
532  const RpcMethod& method, ClientContext* context,
533  void* tag)
534  : context_(context), call_(channel->CreateCall(method, context, cq)) {
535  init_buf_.Reset(tag);
536  init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
537  call_.PerformOps(&init_buf_);
538  }
539 
    // Requests the server's initial metadata. Asserts that the context has
    // not already received it.
540  void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
541  GPR_ASSERT(!context_->initial_metadata_received_);
542 
543  meta_buf_.Reset(tag);
544  meta_buf_.AddRecvInitialMetadata(context_);
545  call_.PerformOps(&meta_buf_);
546  }
547 
    // Requests one message into msg. If initial metadata has not been
    // received yet, receiving it is added to the same batch.
548  void Read(R* msg, void* tag) GRPC_OVERRIDE {
549  read_buf_.Reset(tag);
550  if (!context_->initial_metadata_received_) {
551  read_buf_.AddRecvInitialMetadata(context_);
552  }
553  read_buf_.AddRecvMessage(msg);
554  call_.PerformOps(&read_buf_);
555  }
556 
    // Submits one message for sending.
557  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
558  write_buf_.Reset(tag);
559  write_buf_.AddSendMessage(msg);
560  call_.PerformOps(&write_buf_);
561  }
562 
    // Submits the client send-close op.
563  void WritesDone(void* tag) GRPC_OVERRIDE {
564  writes_done_buf_.Reset(tag);
565  writes_done_buf_.AddClientSendClose();
566  call_.PerformOps(&writes_done_buf_);
567  }
568 
    // Requests the final status into *status. If initial metadata has not
    // been received yet, receiving it is added to the same batch.
569  void Finish(Status* status, void* tag) GRPC_OVERRIDE {
570  finish_buf_.Reset(tag);
571  if (!context_->initial_metadata_received_) {
572  finish_buf_.AddRecvInitialMetadata(context_);
573  }
574  finish_buf_.AddClientRecvStatus(context_, status);
575  call_.PerformOps(&finish_buf_);
576  }
577 
578  private:
579  ClientContext* context_;
580  Call call_;
    // One buffer per operation kind, reused across calls to that operation.
581  CallOpBuffer init_buf_;
582  CallOpBuffer meta_buf_;
583  CallOpBuffer read_buf_;
584  CallOpBuffer write_buf_;
585  CallOpBuffer writes_done_buf_;
586  CallOpBuffer finish_buf_;
587 };
588 
// Asynchronous server-side reader: reads R messages and finishes the call
// with either a W response plus status, or an error status alone. Each
// operation resets its own member CallOpBuffer with the caller's tag and
// submits it with call_.PerformOps().
589 template <class W, class R>
590 class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
591  public AsyncReaderInterface<R> {
592  public:
    // call_ starts out constructed from three null pointers; BindCall()
    // later replaces it with a real call.
593  explicit ServerAsyncReader(ServerContext* ctx)
594  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
595 
    // Sends the server's initial metadata explicitly. Asserts that it has not
    // been sent before, and marks it as sent before the op is submitted.
596  void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
597  GPR_ASSERT(!ctx_->sent_initial_metadata_);
598 
599  meta_buf_.Reset(tag);
600  meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
601  ctx_->sent_initial_metadata_ = true;
602  call_.PerformOps(&meta_buf_);
603  }
604 
    // Requests one message into msg.
605  void Read(R* msg, void* tag) GRPC_OVERRIDE {
606  read_buf_.Reset(tag);
607  read_buf_.AddRecvMessage(msg);
608  call_.PerformOps(&read_buf_);
609  }
610 
    // Finishes the call with `status` and the context's trailing metadata.
    // Initial metadata is added to the batch if it has not been sent yet.
611  void Finish(const W& msg, const Status& status, void* tag) {
612  finish_buf_.Reset(tag);
613  if (!ctx_->sent_initial_metadata_) {
614  finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
615  ctx_->sent_initial_metadata_ = true;
616  }
617  // The response is dropped if the status is not OK.
618  if (status.IsOk()) {
619  finish_buf_.AddSendMessage(msg);
620  }
621  finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
622  call_.PerformOps(&finish_buf_);
623  }
624 
    // Finishes the call with a non-OK status and no response message.
    // Asserts that `status` is not OK. Initial metadata is added to the batch
    // if it has not been sent yet.
625  void FinishWithError(const Status& status, void* tag) {
626  GPR_ASSERT(!status.IsOk());
627  finish_buf_.Reset(tag);
628  if (!ctx_->sent_initial_metadata_) {
629  finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
630  ctx_->sent_initial_metadata_ = true;
631  }
632  finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
633  call_.PerformOps(&finish_buf_);
634  }
635 
636  private:
    // Copies *call into call_, replacing the null-constructed placeholder.
637  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
638 
639  Call call_;
640  ServerContext* ctx_;
    // One buffer per operation kind, reused across calls to that operation.
641  CallOpBuffer meta_buf_;
642  CallOpBuffer read_buf_;
643  CallOpBuffer finish_buf_;
644 };
645 
// Asynchronous server-side writer of W messages. Each operation resets its
// own member CallOpBuffer with the caller's tag and submits it with
// call_.PerformOps().
646 template <class W>
647 class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
648  public AsyncWriterInterface<W> {
649  public:
    // call_ starts out constructed from three null pointers; BindCall()
    // later replaces it with a real call.
650  explicit ServerAsyncWriter(ServerContext* ctx)
651  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
652 
    // Sends the server's initial metadata explicitly. Asserts that it has not
    // been sent before, and marks it as sent before the op is submitted.
653  void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
654  GPR_ASSERT(!ctx_->sent_initial_metadata_);
655 
656  meta_buf_.Reset(tag);
657  meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
658  ctx_->sent_initial_metadata_ = true;
659  call_.PerformOps(&meta_buf_);
660  }
661 
    // Submits one message for sending. If initial metadata has not been sent
    // yet, it is added to the same batch and marked as sent.
662  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
663  write_buf_.Reset(tag);
664  if (!ctx_->sent_initial_metadata_) {
665  write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
666  ctx_->sent_initial_metadata_ = true;
667  }
668  write_buf_.AddSendMessage(msg);
669  call_.PerformOps(&write_buf_);
670  }
671 
    // Finishes the call with `status` and the context's trailing metadata.
    // Initial metadata is added to the batch if it has not been sent yet.
672  void Finish(const Status& status, void* tag) {
673  finish_buf_.Reset(tag);
674  if (!ctx_->sent_initial_metadata_) {
675  finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
676  ctx_->sent_initial_metadata_ = true;
677  }
678  finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
679  call_.PerformOps(&finish_buf_);
680  }
681 
682  private:
    // Copies *call into call_, replacing the null-constructed placeholder.
683  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
684 
685  Call call_;
686  ServerContext* ctx_;
    // One buffer per operation kind, reused across calls to that operation.
687  CallOpBuffer meta_buf_;
688  CallOpBuffer write_buf_;
689  CallOpBuffer finish_buf_;
690 };
691 
692 // Server-side interface for bi-directional streaming.
// Asynchronous variant: reads R messages and writes W messages. Each
// operation resets its own member CallOpBuffer with the caller's tag and
// submits it with call_.PerformOps().
693 template <class W, class R>
694 class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
695  public AsyncWriterInterface<W>,
696  public AsyncReaderInterface<R> {
697  public:
    // call_ starts out constructed from three null pointers; BindCall()
    // later replaces it with a real call.
698  explicit ServerAsyncReaderWriter(ServerContext* ctx)
699  : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
700 
    // Sends the server's initial metadata explicitly. Asserts that it has not
    // been sent before, and marks it as sent before the op is submitted.
701  void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
702  GPR_ASSERT(!ctx_->sent_initial_metadata_);
703 
704  meta_buf_.Reset(tag);
705  meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
706  ctx_->sent_initial_metadata_ = true;
707  call_.PerformOps(&meta_buf_);
708  }
709 
    // Requests one message into msg.
710  void Read(R* msg, void* tag) GRPC_OVERRIDE {
711  read_buf_.Reset(tag);
712  read_buf_.AddRecvMessage(msg);
713  call_.PerformOps(&read_buf_);
714  }
715 
    // Submits one message for sending. If initial metadata has not been sent
    // yet, it is added to the same batch and marked as sent.
716  void Write(const W& msg, void* tag) GRPC_OVERRIDE {
717  write_buf_.Reset(tag);
718  if (!ctx_->sent_initial_metadata_) {
719  write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
720  ctx_->sent_initial_metadata_ = true;
721  }
722  write_buf_.AddSendMessage(msg);
723  call_.PerformOps(&write_buf_);
724  }
725 
    // Finishes the call with `status` and the context's trailing metadata.
    // Initial metadata is added to the batch if it has not been sent yet.
726  void Finish(const Status& status, void* tag) {
727  finish_buf_.Reset(tag);
728  if (!ctx_->sent_initial_metadata_) {
729  finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
730  ctx_->sent_initial_metadata_ = true;
731  }
732  finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
733  call_.PerformOps(&finish_buf_);
734  }
735 
736  private:
    // Copies *call into call_, replacing the null-constructed placeholder.
737  void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
738 
739  Call call_;
740  ServerContext* ctx_;
    // One buffer per operation kind, reused across calls to that operation.
741  CallOpBuffer meta_buf_;
742  CallOpBuffer read_buf_;
743  CallOpBuffer write_buf_;
744  CallOpBuffer finish_buf_;
745 };
746 
747 } // namespace grpc
748 
749 #endif // GRPCXX_STREAM_H
Definition: client_context.h:60
Definition: stream.h:86
Definition: completion_queue.h:76
Definition: channel_interface.h:52
Definition: client_context.h:62
Definition: _completion_queue.h:40
Definition: stream.h:63
Definition: status.h:42
Definition: stream.h:49
Definition: stream.h:374
Definition: stream.h:76
Definition: channel.h:53
Definition: client_context.h:72
Definition: proto_utils.cc:45
Definition: client_context.h:58
Definition: client_context.h:64
Definition: client_context.h:68
Definition: stream.h:385
Definition: stream.h:146
Definition: stream.h:402
Definition: _call.h:44
Definition: rpc_method.h:39
Definition: stream.h:394
Definition: call.h:53
Definition: stream.h:457
Definition: channel_create.c:62
Definition: client_context.h:66
Definition: stream.h:201