34 #ifndef GRPCXX_STREAM_H
35 #define GRPCXX_STREAM_H
37 #include <grpc++/channel_interface.h>
38 #include <grpc++/client_context.h>
39 #include <grpc++/completion_queue.h>
40 #include <grpc++/server_context.h>
41 #include <grpc++/impl/call.h>
42 #include <grpc++/impl/service_type.h>
43 #include <grpc++/status.h>
44 #include <grpc/support/log.h>
58 virtual Status Finish() = 0;
71 virtual bool Read(R* msg) = 0;
82 virtual bool Write(
const W& msg) = 0;
89 virtual void WaitForInitialMetadata() = 0;
98 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
100 buf.AddSendInitialMetadata(&context->send_initial_metadata_);
101 buf.AddSendMessage(request);
102 buf.AddClientSendClose();
103 call_.PerformOps(&buf);
111 void WaitForInitialMetadata() {
112 GPR_ASSERT(!context_->initial_metadata_received_);
115 buf.AddRecvInitialMetadata(context_);
116 call_.PerformOps(&buf);
117 GPR_ASSERT(cq_.Pluck(&buf));
120 bool Read(R* msg) GRPC_OVERRIDE {
122 if (!context_->initial_metadata_received_) {
123 buf.AddRecvInitialMetadata(context_);
125 buf.AddRecvMessage(msg);
126 call_.PerformOps(&buf);
127 return cq_.Pluck(&buf) && buf.got_message;
130 Status Finish() GRPC_OVERRIDE {
133 buf.AddClientRecvStatus(context_, &status);
134 call_.PerformOps(&buf);
135 GPR_ASSERT(cq_.Pluck(&buf));
140 ClientContext* context_;
149 virtual bool WritesDone() = 0;
160 call_(channel->CreateCall(method, context, &cq_)) {
162 buf.AddSendInitialMetadata(&context->send_initial_metadata_);
163 call_.PerformOps(&buf);
167 bool Write(
const W& msg) GRPC_OVERRIDE {
169 buf.AddSendMessage(msg);
170 call_.PerformOps(&buf);
171 return cq_.Pluck(&buf);
174 bool WritesDone() GRPC_OVERRIDE {
176 buf.AddClientSendClose();
177 call_.PerformOps(&buf);
178 return cq_.Pluck(&buf);
182 Status Finish() GRPC_OVERRIDE {
185 buf.AddRecvMessage(response_);
186 buf.AddClientRecvStatus(context_, &status);
187 call_.PerformOps(&buf);
188 GPR_ASSERT(cq_.Pluck(&buf));
193 ClientContext* context_;
194 grpc::protobuf::Message*
const response_;
200 template <
class W,
class R>
205 virtual void WaitForInitialMetadata() = 0;
206 virtual bool WritesDone() = 0;
209 template <
class W,
class R>
215 : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
217 buf.AddSendInitialMetadata(&context->send_initial_metadata_);
218 call_.PerformOps(&buf);
219 GPR_ASSERT(cq_.Pluck(&buf));
226 void WaitForInitialMetadata() {
227 GPR_ASSERT(!context_->initial_metadata_received_);
230 buf.AddRecvInitialMetadata(context_);
231 call_.PerformOps(&buf);
232 GPR_ASSERT(cq_.Pluck(&buf));
235 bool Read(R* msg) GRPC_OVERRIDE {
237 if (!context_->initial_metadata_received_) {
238 buf.AddRecvInitialMetadata(context_);
240 buf.AddRecvMessage(msg);
241 call_.PerformOps(&buf);
242 return cq_.Pluck(&buf) && buf.got_message;
245 bool Write(
const W& msg) GRPC_OVERRIDE {
247 buf.AddSendMessage(msg);
248 call_.PerformOps(&buf);
249 return cq_.Pluck(&buf);
252 bool WritesDone() GRPC_OVERRIDE {
254 buf.AddClientSendClose();
255 call_.PerformOps(&buf);
256 return cq_.Pluck(&buf);
259 Status Finish() GRPC_OVERRIDE {
262 buf.AddClientRecvStatus(context_, &status);
263 call_.PerformOps(&buf);
264 GPR_ASSERT(cq_.Pluck(&buf));
269 ClientContext* context_;
275 class ServerReader
GRPC_FINAL :
public ReaderInterface<R> {
277 ServerReader(
Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
279 void SendInitialMetadata() {
280 GPR_ASSERT(!ctx_->sent_initial_metadata_);
283 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
284 ctx_->sent_initial_metadata_ =
true;
285 call_->PerformOps(&buf);
286 call_->cq()->Pluck(&buf);
289 bool Read(R* msg) GRPC_OVERRIDE {
291 buf.AddRecvMessage(msg);
292 call_->PerformOps(&buf);
293 return call_->cq()->Pluck(&buf) && buf.got_message;
298 ServerContext*
const ctx_;
302 class ServerWriter
GRPC_FINAL :
public WriterInterface<W> {
304 ServerWriter(
Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
306 void SendInitialMetadata() {
307 GPR_ASSERT(!ctx_->sent_initial_metadata_);
310 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
311 ctx_->sent_initial_metadata_ =
true;
312 call_->PerformOps(&buf);
313 call_->cq()->Pluck(&buf);
316 bool Write(
const W& msg) GRPC_OVERRIDE {
318 if (!ctx_->sent_initial_metadata_) {
319 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
320 ctx_->sent_initial_metadata_ =
true;
322 buf.AddSendMessage(msg);
323 call_->PerformOps(&buf);
324 return call_->cq()->Pluck(&buf);
329 ServerContext*
const ctx_;
333 template <
class W,
class R>
334 class ServerReaderWriter
GRPC_FINAL :
public WriterInterface<W>,
335 public ReaderInterface<R> {
337 ServerReaderWriter(
Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
339 void SendInitialMetadata() {
340 GPR_ASSERT(!ctx_->sent_initial_metadata_);
343 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
344 ctx_->sent_initial_metadata_ =
true;
345 call_->PerformOps(&buf);
346 call_->cq()->Pluck(&buf);
349 bool Read(R* msg) GRPC_OVERRIDE {
351 buf.AddRecvMessage(msg);
352 call_->PerformOps(&buf);
353 return call_->cq()->Pluck(&buf) && buf.got_message;
356 bool Write(
const W& msg) GRPC_OVERRIDE {
358 if (!ctx_->sent_initial_metadata_) {
359 buf.AddSendInitialMetadata(&ctx_->initial_metadata_);
360 ctx_->sent_initial_metadata_ =
true;
362 buf.AddSendMessage(msg);
363 call_->PerformOps(&buf);
364 return call_->cq()->Pluck(&buf);
369 ServerContext*
const ctx_;
378 virtual void ReadInitialMetadata(
void* tag) = 0;
380 virtual void Finish(
Status* status,
void* tag) = 0;
389 virtual void Read(R* msg,
void* tag) = 0;
398 virtual void Write(
const W& msg,
void* tag) = 0;
412 const grpc::protobuf::Message& request,
void* tag)
413 : context_(context), call_(channel->CreateCall(method, context, cq)) {
414 init_buf_.Reset(tag);
415 init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
416 init_buf_.AddSendMessage(request);
417 init_buf_.AddClientSendClose();
418 call_.PerformOps(&init_buf_);
421 void ReadInitialMetadata(
void* tag) GRPC_OVERRIDE {
422 GPR_ASSERT(!context_->initial_metadata_received_);
424 meta_buf_.Reset(tag);
425 meta_buf_.AddRecvInitialMetadata(context_);
426 call_.PerformOps(&meta_buf_);
429 void Read(R* msg,
void* tag) GRPC_OVERRIDE {
430 read_buf_.Reset(tag);
431 if (!context_->initial_metadata_received_) {
432 read_buf_.AddRecvInitialMetadata(context_);
434 read_buf_.AddRecvMessage(msg);
435 call_.PerformOps(&read_buf_);
438 void Finish(Status* status,
void* tag) GRPC_OVERRIDE {
439 finish_buf_.Reset(tag);
440 if (!context_->initial_metadata_received_) {
441 finish_buf_.AddRecvInitialMetadata(context_);
443 finish_buf_.AddClientRecvStatus(context_, status);
444 call_.PerformOps(&finish_buf_);
448 ClientContext* context_;
450 CallOpBuffer init_buf_;
451 CallOpBuffer meta_buf_;
452 CallOpBuffer read_buf_;
453 CallOpBuffer finish_buf_;
460 virtual void WritesDone(
void* tag) = 0;
468 grpc::protobuf::Message* response,
void* tag)
471 call_(channel->CreateCall(method, context, cq)) {
472 init_buf_.Reset(tag);
473 init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
474 call_.PerformOps(&init_buf_);
477 void ReadInitialMetadata(
void* tag) GRPC_OVERRIDE {
478 GPR_ASSERT(!context_->initial_metadata_received_);
480 meta_buf_.Reset(tag);
481 meta_buf_.AddRecvInitialMetadata(context_);
482 call_.PerformOps(&meta_buf_);
485 void Write(
const W& msg,
void* tag) GRPC_OVERRIDE {
486 write_buf_.Reset(tag);
487 write_buf_.AddSendMessage(msg);
488 call_.PerformOps(&write_buf_);
491 void WritesDone(
void* tag) GRPC_OVERRIDE {
492 writes_done_buf_.Reset(tag);
493 writes_done_buf_.AddClientSendClose();
494 call_.PerformOps(&writes_done_buf_);
497 void Finish(Status* status,
void* tag) GRPC_OVERRIDE {
498 finish_buf_.Reset(tag);
499 if (!context_->initial_metadata_received_) {
500 finish_buf_.AddRecvInitialMetadata(context_);
502 finish_buf_.AddRecvMessage(response_);
503 finish_buf_.AddClientRecvStatus(context_, status);
504 call_.PerformOps(&finish_buf_);
508 ClientContext* context_;
509 grpc::protobuf::Message*
const response_;
511 CallOpBuffer init_buf_;
512 CallOpBuffer meta_buf_;
513 CallOpBuffer write_buf_;
514 CallOpBuffer writes_done_buf_;
515 CallOpBuffer finish_buf_;
519 template <
class W,
class R>
524 virtual void WritesDone(
void* tag) = 0;
527 template <
class W,
class R>
534 : context_(context), call_(channel->CreateCall(method, context, cq)) {
535 init_buf_.Reset(tag);
536 init_buf_.AddSendInitialMetadata(&context->send_initial_metadata_);
537 call_.PerformOps(&init_buf_);
540 void ReadInitialMetadata(
void* tag) GRPC_OVERRIDE {
541 GPR_ASSERT(!context_->initial_metadata_received_);
543 meta_buf_.Reset(tag);
544 meta_buf_.AddRecvInitialMetadata(context_);
545 call_.PerformOps(&meta_buf_);
548 void Read(R* msg,
void* tag) GRPC_OVERRIDE {
549 read_buf_.Reset(tag);
550 if (!context_->initial_metadata_received_) {
551 read_buf_.AddRecvInitialMetadata(context_);
553 read_buf_.AddRecvMessage(msg);
554 call_.PerformOps(&read_buf_);
557 void Write(
const W& msg,
void* tag) GRPC_OVERRIDE {
558 write_buf_.Reset(tag);
559 write_buf_.AddSendMessage(msg);
560 call_.PerformOps(&write_buf_);
563 void WritesDone(
void* tag) GRPC_OVERRIDE {
564 writes_done_buf_.Reset(tag);
565 writes_done_buf_.AddClientSendClose();
566 call_.PerformOps(&writes_done_buf_);
569 void Finish(Status* status,
void* tag) GRPC_OVERRIDE {
570 finish_buf_.Reset(tag);
571 if (!context_->initial_metadata_received_) {
572 finish_buf_.AddRecvInitialMetadata(context_);
574 finish_buf_.AddClientRecvStatus(context_, status);
575 call_.PerformOps(&finish_buf_);
579 ClientContext* context_;
581 CallOpBuffer init_buf_;
582 CallOpBuffer meta_buf_;
583 CallOpBuffer read_buf_;
584 CallOpBuffer write_buf_;
585 CallOpBuffer writes_done_buf_;
586 CallOpBuffer finish_buf_;
589 template <
class W,
class R>
590 class ServerAsyncReader
GRPC_FINAL :
public ServerAsyncStreamingInterface,
591 public AsyncReaderInterface<R> {
593 explicit ServerAsyncReader(ServerContext* ctx)
594 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
596 void SendInitialMetadata(
void* tag) GRPC_OVERRIDE {
597 GPR_ASSERT(!ctx_->sent_initial_metadata_);
599 meta_buf_.Reset(tag);
600 meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
601 ctx_->sent_initial_metadata_ =
true;
602 call_.PerformOps(&meta_buf_);
605 void Read(R* msg,
void* tag) GRPC_OVERRIDE {
606 read_buf_.Reset(tag);
607 read_buf_.AddRecvMessage(msg);
608 call_.PerformOps(&read_buf_);
611 void Finish(
const W& msg,
const Status& status,
void* tag) {
612 finish_buf_.Reset(tag);
613 if (!ctx_->sent_initial_metadata_) {
614 finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
615 ctx_->sent_initial_metadata_ =
true;
619 finish_buf_.AddSendMessage(msg);
621 finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
622 call_.PerformOps(&finish_buf_);
625 void FinishWithError(
const Status& status,
void* tag) {
626 GPR_ASSERT(!status.IsOk());
627 finish_buf_.Reset(tag);
628 if (!ctx_->sent_initial_metadata_) {
629 finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
630 ctx_->sent_initial_metadata_ =
true;
632 finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
633 call_.PerformOps(&finish_buf_);
637 void BindCall(
Call* call) GRPC_OVERRIDE { call_ = *call; }
641 CallOpBuffer meta_buf_;
642 CallOpBuffer read_buf_;
643 CallOpBuffer finish_buf_;
647 class ServerAsyncWriter
GRPC_FINAL :
public ServerAsyncStreamingInterface,
648 public AsyncWriterInterface<W> {
650 explicit ServerAsyncWriter(ServerContext* ctx)
651 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
653 void SendInitialMetadata(
void* tag) GRPC_OVERRIDE {
654 GPR_ASSERT(!ctx_->sent_initial_metadata_);
656 meta_buf_.Reset(tag);
657 meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
658 ctx_->sent_initial_metadata_ =
true;
659 call_.PerformOps(&meta_buf_);
662 void Write(
const W& msg,
void* tag) GRPC_OVERRIDE {
663 write_buf_.Reset(tag);
664 if (!ctx_->sent_initial_metadata_) {
665 write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
666 ctx_->sent_initial_metadata_ =
true;
668 write_buf_.AddSendMessage(msg);
669 call_.PerformOps(&write_buf_);
672 void Finish(
const Status& status,
void* tag) {
673 finish_buf_.Reset(tag);
674 if (!ctx_->sent_initial_metadata_) {
675 finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
676 ctx_->sent_initial_metadata_ =
true;
678 finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
679 call_.PerformOps(&finish_buf_);
683 void BindCall(
Call* call) GRPC_OVERRIDE { call_ = *call; }
687 CallOpBuffer meta_buf_;
688 CallOpBuffer write_buf_;
689 CallOpBuffer finish_buf_;
693 template <
class W,
class R>
694 class ServerAsyncReaderWriter
GRPC_FINAL :
public ServerAsyncStreamingInterface,
695 public AsyncWriterInterface<W>,
696 public AsyncReaderInterface<R> {
698 explicit ServerAsyncReaderWriter(ServerContext* ctx)
699 : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
701 void SendInitialMetadata(
void* tag) GRPC_OVERRIDE {
702 GPR_ASSERT(!ctx_->sent_initial_metadata_);
704 meta_buf_.Reset(tag);
705 meta_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
706 ctx_->sent_initial_metadata_ =
true;
707 call_.PerformOps(&meta_buf_);
710 void Read(R* msg,
void* tag) GRPC_OVERRIDE {
711 read_buf_.Reset(tag);
712 read_buf_.AddRecvMessage(msg);
713 call_.PerformOps(&read_buf_);
716 void Write(
const W& msg,
void* tag) GRPC_OVERRIDE {
717 write_buf_.Reset(tag);
718 if (!ctx_->sent_initial_metadata_) {
719 write_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
720 ctx_->sent_initial_metadata_ =
true;
722 write_buf_.AddSendMessage(msg);
723 call_.PerformOps(&write_buf_);
726 void Finish(
const Status& status,
void* tag) {
727 finish_buf_.Reset(tag);
728 if (!ctx_->sent_initial_metadata_) {
729 finish_buf_.AddSendInitialMetadata(&ctx_->initial_metadata_);
730 ctx_->sent_initial_metadata_ =
true;
732 finish_buf_.AddServerSendStatus(&ctx_->trailing_metadata_, status);
733 call_.PerformOps(&finish_buf_);
737 void BindCall(
Call* call) GRPC_OVERRIDE { call_ = *call; }
741 CallOpBuffer meta_buf_;
742 CallOpBuffer read_buf_;
743 CallOpBuffer write_buf_;
744 CallOpBuffer finish_buf_;
749 #endif // GRPCXX_STREAM_H
Definition: client_context.h:60
Definition: completion_queue.h:76
Definition: channel_interface.h:52
Definition: client_context.h:62
Definition: _completion_queue.h:40
Definition: client_context.h:72
Definition: proto_utils.cc:45
Definition: client_context.h:58
Definition: client_context.h:64
Definition: client_context.h:68
Definition: rpc_method.h:39
Definition: channel_create.c:62
Definition: client_context.h:66