Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #ifndef GRPCXX_SUPPORT_ASYNC_STREAM_H
35 : #define GRPCXX_SUPPORT_ASYNC_STREAM_H
36 :
37 : #include <grpc/support/log.h>
38 : #include <grpc++/channel.h>
39 : #include <grpc++/client_context.h>
40 : #include <grpc++/completion_queue.h>
41 : #include <grpc++/impl/call.h>
42 : #include <grpc++/impl/service_type.h>
43 : #include <grpc++/server_context.h>
44 : #include <grpc++/support/status.h>
45 :
46 : namespace grpc {
47 :
48 : /// Common interface for all client side asynchronous streaming.
49 20 : class ClientAsyncStreamingInterface {
50 : public:
51 20 : virtual ~ClientAsyncStreamingInterface() {}
52 :
53 : /// Request notification of the reading of the initial metadata. Completion
54 : /// will be notified by \a tag on the associated completion queue.
55 : ///
56 : /// \param[in] tag Tag identifying this request.
57 : virtual void ReadInitialMetadata(void* tag) = 0;
58 :
59 : /// Request notification completion.
60 : ///
61 : /// \param[out] status To be updated with the operation status.
62 : /// \param[in] tag Tag identifying this request.
63 : virtual void Finish(Status* status, void* tag) = 0;
64 : };
65 :
/// Abstract source that produces a sequence of messages of type \a R.
template <typename R>
class AsyncReaderInterface {
 public:
  virtual ~AsyncReaderInterface() {}

  /// Start reading one message of type \a R into \a msg. Completion is
  /// signalled through \a tag on the associated completion queue.
  ///
  /// \param[out] msg Where to eventually store the read message.
  /// \param[in] tag The tag identifying the operation.
  virtual void Read(R* msg, void* tag) = 0;
};
79 :
/// Abstract sink that accepts a sequence of messages of type \a W.
template <typename W>
class AsyncWriterInterface {
 public:
  virtual ~AsyncWriterInterface() {}

  /// Start writing \a msg; the operation is identified by \a tag.
  ///
  /// \param[in] msg The message to be written.
  /// \param[in] tag The tag identifying the operation.
  virtual void Write(const W& msg, void* tag) = 0;
};
92 :
93 : template <class R>
94 4 : class ClientAsyncReaderInterface : public ClientAsyncStreamingInterface,
95 : public AsyncReaderInterface<R> {};
96 :
97 : template <class R>
98 4 : class ClientAsyncReader GRPC_FINAL : public ClientAsyncReaderInterface<R> {
99 : public:
100 : /// Create a stream and write the first request out.
101 : template <class W>
102 2 : ClientAsyncReader(Channel* channel, CompletionQueue* cq,
103 : const RpcMethod& method, ClientContext* context,
104 : const W& request, void* tag)
105 2 : : context_(context), call_(channel->CreateCall(method, context, cq)) {
106 2 : init_ops_.set_output_tag(tag);
107 2 : init_ops_.SendInitialMetadata(context->send_initial_metadata_);
108 : // TODO(ctiller): don't assert
109 2 : GPR_ASSERT(init_ops_.SendMessage(request).ok());
110 2 : init_ops_.ClientSendClose();
111 2 : call_.PerformOps(&init_ops_);
112 2 : }
113 :
114 0 : void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
115 0 : GPR_ASSERT(!context_->initial_metadata_received_);
116 :
117 0 : meta_ops_.set_output_tag(tag);
118 0 : meta_ops_.RecvInitialMetadata(context_);
119 0 : call_.PerformOps(&meta_ops_);
120 0 : }
121 :
122 6 : void Read(R* msg, void* tag) GRPC_OVERRIDE {
123 6 : read_ops_.set_output_tag(tag);
124 6 : if (!context_->initial_metadata_received_) {
125 2 : read_ops_.RecvInitialMetadata(context_);
126 : }
127 6 : read_ops_.RecvMessage(msg);
128 6 : call_.PerformOps(&read_ops_);
129 6 : }
130 :
131 2 : void Finish(Status* status, void* tag) GRPC_OVERRIDE {
132 2 : finish_ops_.set_output_tag(tag);
133 2 : if (!context_->initial_metadata_received_) {
134 0 : finish_ops_.RecvInitialMetadata(context_);
135 : }
136 2 : finish_ops_.ClientRecvStatus(context_, status);
137 2 : call_.PerformOps(&finish_ops_);
138 2 : }
139 :
140 : private:
141 : ClientContext* context_;
142 : Call call_;
143 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage, CallOpClientSendClose>
144 : init_ops_;
145 : CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
146 : CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
147 : CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
148 : };
149 :
150 : /// Common interface for client side asynchronous writing.
151 : template <class W>
152 4 : class ClientAsyncWriterInterface : public ClientAsyncStreamingInterface,
153 : public AsyncWriterInterface<W> {
154 : public:
155 : /// Signal the client is done with the writes.
156 : ///
157 : /// \param[in] tag The tag identifying the operation.
158 : virtual void WritesDone(void* tag) = 0;
159 : };
160 :
161 : template <class W>
162 4 : class ClientAsyncWriter GRPC_FINAL : public ClientAsyncWriterInterface<W> {
163 : public:
164 : template <class R>
165 2 : ClientAsyncWriter(Channel* channel, CompletionQueue* cq,
166 : const RpcMethod& method, ClientContext* context,
167 : R* response, void* tag)
168 2 : : context_(context), call_(channel->CreateCall(method, context, cq)) {
169 2 : finish_ops_.RecvMessage(response);
170 :
171 2 : init_ops_.set_output_tag(tag);
172 2 : init_ops_.SendInitialMetadata(context->send_initial_metadata_);
173 2 : call_.PerformOps(&init_ops_);
174 2 : }
175 :
176 0 : void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
177 0 : GPR_ASSERT(!context_->initial_metadata_received_);
178 :
179 0 : meta_ops_.set_output_tag(tag);
180 0 : meta_ops_.RecvInitialMetadata(context_);
181 0 : call_.PerformOps(&meta_ops_);
182 0 : }
183 :
184 4 : void Write(const W& msg, void* tag) GRPC_OVERRIDE {
185 4 : write_ops_.set_output_tag(tag);
186 : // TODO(ctiller): don't assert
187 4 : GPR_ASSERT(write_ops_.SendMessage(msg).ok());
188 4 : call_.PerformOps(&write_ops_);
189 4 : }
190 :
191 2 : void WritesDone(void* tag) GRPC_OVERRIDE {
192 2 : writes_done_ops_.set_output_tag(tag);
193 2 : writes_done_ops_.ClientSendClose();
194 2 : call_.PerformOps(&writes_done_ops_);
195 2 : }
196 :
197 2 : void Finish(Status* status, void* tag) GRPC_OVERRIDE {
198 2 : finish_ops_.set_output_tag(tag);
199 2 : if (!context_->initial_metadata_received_) {
200 2 : finish_ops_.RecvInitialMetadata(context_);
201 : }
202 2 : finish_ops_.ClientRecvStatus(context_, status);
203 2 : call_.PerformOps(&finish_ops_);
204 2 : }
205 :
206 : private:
207 : ClientContext* context_;
208 : Call call_;
209 : CallOpSet<CallOpSendInitialMetadata> init_ops_;
210 : CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
211 : CallOpSet<CallOpSendMessage> write_ops_;
212 : CallOpSet<CallOpClientSendClose> writes_done_ops_;
213 : CallOpSet<CallOpRecvInitialMetadata, CallOpGenericRecvMessage,
214 : CallOpClientRecvStatus> finish_ops_;
215 : };
216 :
217 : /// Client-side interface for asynchronous bi-directional streaming.
218 : template <class W, class R>
219 32 : class ClientAsyncReaderWriterInterface : public ClientAsyncStreamingInterface,
220 : public AsyncWriterInterface<W>,
221 : public AsyncReaderInterface<R> {
222 : public:
223 : /// Signal the client is done with the writes.
224 : ///
225 : /// \param[in] tag The tag identifying the operation.
226 : virtual void WritesDone(void* tag) = 0;
227 : };
228 :
229 : template <class W, class R>
230 32 : class ClientAsyncReaderWriter GRPC_FINAL
231 : : public ClientAsyncReaderWriterInterface<W, R> {
232 : public:
233 16 : ClientAsyncReaderWriter(Channel* channel, CompletionQueue* cq,
234 : const RpcMethod& method, ClientContext* context,
235 : void* tag)
236 16 : : context_(context), call_(channel->CreateCall(method, context, cq)) {
237 16 : init_ops_.set_output_tag(tag);
238 16 : init_ops_.SendInitialMetadata(context->send_initial_metadata_);
239 16 : call_.PerformOps(&init_ops_);
240 16 : }
241 :
242 0 : void ReadInitialMetadata(void* tag) GRPC_OVERRIDE {
243 0 : GPR_ASSERT(!context_->initial_metadata_received_);
244 :
245 0 : meta_ops_.set_output_tag(tag);
246 0 : meta_ops_.RecvInitialMetadata(context_);
247 0 : call_.PerformOps(&meta_ops_);
248 0 : }
249 :
250 26539 : void Read(R* msg, void* tag) GRPC_OVERRIDE {
251 26539 : read_ops_.set_output_tag(tag);
252 26539 : if (!context_->initial_metadata_received_) {
253 16 : read_ops_.RecvInitialMetadata(context_);
254 : }
255 26539 : read_ops_.RecvMessage(msg);
256 26539 : call_.PerformOps(&read_ops_);
257 26539 : }
258 :
259 26540 : void Write(const W& msg, void* tag) GRPC_OVERRIDE {
260 26540 : write_ops_.set_output_tag(tag);
261 : // TODO(ctiller): don't assert
262 26540 : GPR_ASSERT(write_ops_.SendMessage(msg).ok());
263 26540 : call_.PerformOps(&write_ops_);
264 26540 : }
265 :
266 15 : void WritesDone(void* tag) GRPC_OVERRIDE {
267 15 : writes_done_ops_.set_output_tag(tag);
268 15 : writes_done_ops_.ClientSendClose();
269 15 : call_.PerformOps(&writes_done_ops_);
270 15 : }
271 :
272 15 : void Finish(Status* status, void* tag) GRPC_OVERRIDE {
273 15 : finish_ops_.set_output_tag(tag);
274 15 : if (!context_->initial_metadata_received_) {
275 0 : finish_ops_.RecvInitialMetadata(context_);
276 : }
277 15 : finish_ops_.ClientRecvStatus(context_, status);
278 15 : call_.PerformOps(&finish_ops_);
279 15 : }
280 :
281 : private:
282 : ClientContext* context_;
283 : Call call_;
284 : CallOpSet<CallOpSendInitialMetadata> init_ops_;
285 : CallOpSet<CallOpRecvInitialMetadata> meta_ops_;
286 : CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> read_ops_;
287 : CallOpSet<CallOpSendMessage> write_ops_;
288 : CallOpSet<CallOpClientSendClose> writes_done_ops_;
289 : CallOpSet<CallOpRecvInitialMetadata, CallOpClientRecvStatus> finish_ops_;
290 : };
291 :
292 : template <class W, class R>
293 2 : class ServerAsyncReader GRPC_FINAL : public ServerAsyncStreamingInterface,
294 : public AsyncReaderInterface<R> {
295 : public:
296 2 : explicit ServerAsyncReader(ServerContext* ctx)
297 2 : : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
298 :
299 0 : void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
300 0 : GPR_ASSERT(!ctx_->sent_initial_metadata_);
301 :
302 0 : meta_ops_.set_output_tag(tag);
303 0 : meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
304 0 : ctx_->sent_initial_metadata_ = true;
305 0 : call_.PerformOps(&meta_ops_);
306 0 : }
307 :
308 6 : void Read(R* msg, void* tag) GRPC_OVERRIDE {
309 6 : read_ops_.set_output_tag(tag);
310 6 : read_ops_.RecvMessage(msg);
311 6 : call_.PerformOps(&read_ops_);
312 6 : }
313 :
314 2 : void Finish(const W& msg, const Status& status, void* tag) {
315 2 : finish_ops_.set_output_tag(tag);
316 2 : if (!ctx_->sent_initial_metadata_) {
317 2 : finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
318 2 : ctx_->sent_initial_metadata_ = true;
319 : }
320 : // The response is dropped if the status is not OK.
321 2 : if (status.ok()) {
322 2 : finish_ops_.ServerSendStatus(ctx_->trailing_metadata_,
323 : finish_ops_.SendMessage(msg));
324 : } else {
325 0 : finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
326 : }
327 2 : call_.PerformOps(&finish_ops_);
328 2 : }
329 :
330 : void FinishWithError(const Status& status, void* tag) {
331 : GPR_ASSERT(!status.ok());
332 : finish_ops_.set_output_tag(tag);
333 : if (!ctx_->sent_initial_metadata_) {
334 : finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
335 : ctx_->sent_initial_metadata_ = true;
336 : }
337 : finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
338 : call_.PerformOps(&finish_ops_);
339 : }
340 :
341 : private:
342 2 : void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
343 :
344 : Call call_;
345 : ServerContext* ctx_;
346 : CallOpSet<CallOpSendInitialMetadata> meta_ops_;
347 : CallOpSet<CallOpRecvMessage<R>> read_ops_;
348 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
349 : CallOpServerSendStatus> finish_ops_;
350 : };
351 :
352 : template <class W>
353 2 : class ServerAsyncWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
354 : public AsyncWriterInterface<W> {
355 : public:
356 2 : explicit ServerAsyncWriter(ServerContext* ctx)
357 2 : : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
358 :
359 0 : void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
360 0 : GPR_ASSERT(!ctx_->sent_initial_metadata_);
361 :
362 0 : meta_ops_.set_output_tag(tag);
363 0 : meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
364 0 : ctx_->sent_initial_metadata_ = true;
365 0 : call_.PerformOps(&meta_ops_);
366 0 : }
367 :
368 4 : void Write(const W& msg, void* tag) GRPC_OVERRIDE {
369 4 : write_ops_.set_output_tag(tag);
370 4 : if (!ctx_->sent_initial_metadata_) {
371 2 : write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
372 2 : ctx_->sent_initial_metadata_ = true;
373 : }
374 : // TODO(ctiller): don't assert
375 4 : GPR_ASSERT(write_ops_.SendMessage(msg).ok());
376 4 : call_.PerformOps(&write_ops_);
377 4 : }
378 :
379 2 : void Finish(const Status& status, void* tag) {
380 2 : finish_ops_.set_output_tag(tag);
381 2 : if (!ctx_->sent_initial_metadata_) {
382 0 : finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
383 0 : ctx_->sent_initial_metadata_ = true;
384 : }
385 2 : finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
386 2 : call_.PerformOps(&finish_ops_);
387 2 : }
388 :
389 : private:
390 2 : void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
391 :
392 : Call call_;
393 : ServerContext* ctx_;
394 : CallOpSet<CallOpSendInitialMetadata> meta_ops_;
395 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
396 : CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
397 : };
398 :
399 : /// Server-side interface for asynchronous bi-directional streaming.
400 : template <class W, class R>
401 90276 : class ServerAsyncReaderWriter GRPC_FINAL : public ServerAsyncStreamingInterface,
402 : public AsyncWriterInterface<W>,
403 : public AsyncReaderInterface<R> {
404 : public:
405 65247 : explicit ServerAsyncReaderWriter(ServerContext* ctx)
406 65247 : : call_(nullptr, nullptr, nullptr), ctx_(ctx) {}
407 :
408 0 : void SendInitialMetadata(void* tag) GRPC_OVERRIDE {
409 0 : GPR_ASSERT(!ctx_->sent_initial_metadata_);
410 :
411 0 : meta_ops_.set_output_tag(tag);
412 0 : meta_ops_.SendInitialMetadata(ctx_->initial_metadata_);
413 0 : ctx_->sent_initial_metadata_ = true;
414 0 : call_.PerformOps(&meta_ops_);
415 0 : }
416 :
417 26543 : void Read(R* msg, void* tag) GRPC_OVERRIDE {
418 26543 : read_ops_.set_output_tag(tag);
419 26543 : read_ops_.RecvMessage(msg);
420 26543 : call_.PerformOps(&read_ops_);
421 26543 : }
422 :
423 26539 : void Write(const W& msg, void* tag) GRPC_OVERRIDE {
424 26539 : write_ops_.set_output_tag(tag);
425 26539 : if (!ctx_->sent_initial_metadata_) {
426 15 : write_ops_.SendInitialMetadata(ctx_->initial_metadata_);
427 15 : ctx_->sent_initial_metadata_ = true;
428 : }
429 : // TODO(ctiller): don't assert
430 26539 : GPR_ASSERT(write_ops_.SendMessage(msg).ok());
431 26539 : call_.PerformOps(&write_ops_);
432 26539 : }
433 :
434 14 : void Finish(const Status& status, void* tag) {
435 14 : finish_ops_.set_output_tag(tag);
436 14 : if (!ctx_->sent_initial_metadata_) {
437 0 : finish_ops_.SendInitialMetadata(ctx_->initial_metadata_);
438 0 : ctx_->sent_initial_metadata_ = true;
439 : }
440 14 : finish_ops_.ServerSendStatus(ctx_->trailing_metadata_, status);
441 14 : call_.PerformOps(&finish_ops_);
442 14 : }
443 :
444 : private:
445 : friend class ::grpc::Server;
446 :
447 64636 : void BindCall(Call* call) GRPC_OVERRIDE { call_ = *call; }
448 :
449 : Call call_;
450 : ServerContext* ctx_;
451 : CallOpSet<CallOpSendInitialMetadata> meta_ops_;
452 : CallOpSet<CallOpRecvMessage<R>> read_ops_;
453 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> write_ops_;
454 : CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> finish_ops_;
455 : };
456 :
457 : } // namespace grpc
458 :
459 : #endif // GRPCXX_SUPPORT_ASYNC_STREAM_H
|