Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #ifndef GRPCXX_SUPPORT_SYNC_STREAM_H
35 : #define GRPCXX_SUPPORT_SYNC_STREAM_H
36 :
37 : #include <grpc/support/log.h>
38 : #include <grpc++/channel.h>
39 : #include <grpc++/client_context.h>
40 : #include <grpc++/completion_queue.h>
41 : #include <grpc++/impl/call.h>
42 : #include <grpc++/impl/service_type.h>
43 : #include <grpc++/server_context.h>
44 : #include <grpc++/support/status.h>
45 :
46 : namespace grpc {
47 :
48 : /// Common interface for all synchronous client side streaming.
49 41 : class ClientStreamingInterface {
50 : public:
51 41 : virtual ~ClientStreamingInterface() {}
52 :
53 : /// Wait until the stream finishes, and return the final status. When the
54 : /// client side declares it has no more message to send, either implicitly or
55 : /// by calling \a WritesDone(), it needs to make sure there is no more message
56 : /// to be received from the server, either implicitly or by getting a false
57 : /// from a \a Read().
58 : ///
59 : /// This function will return either:
60 : /// - when all incoming messages have been read and the server has returned
61 : /// status.
62 : /// - OR when the server has returned a non-OK status.
63 : virtual Status Finish() = 0;
64 : };
65 :
66 : /// An interface that yields a sequence of messages of type \a R.
67 : template <class R>
68 63 : class ReaderInterface {
69 : public:
70 63 : virtual ~ReaderInterface() {}
71 :
72 : /// Blocking read a message and parse to \a msg. Returns \a true on success.
73 : ///
74 : /// \param[out] msg The read message.
75 : ///
76 : /// \return \a false when there will be no more incoming messages, either
77 : /// because the other side has called \a WritesDone() or the stream has failed
78 : /// (or been cancelled).
79 : virtual bool Read(R* msg) = 0;
80 : };
81 :
82 : /// An interface that can be fed a sequence of messages of type \a W.
83 : template <class W>
84 64 : class WriterInterface {
85 : public:
86 64 : virtual ~WriterInterface() {}
87 :
88 : /// Blocking write \a msg to the stream with options.
89 : ///
90 : /// \param msg The message to be written to the stream.
91 : /// \param options Options affecting the write operation.
92 : ///
93 : /// \return \a true on success, \a false when the stream has been closed.
94 : virtual bool Write(const W& msg, const WriteOptions& options) = 0;
95 :
96 : /// Blocking write \a msg to the stream with default options.
97 : ///
98 : /// \param msg The message to be written to the stream.
99 : ///
100 : /// \return \a true on success, \a false when the stream has been closed.
101 144656 : inline bool Write(const W& msg) { return Write(msg, WriteOptions()); }
102 : };
103 :
104 : /// Client-side interface for streaming reads of message of type \a R.
105 : template <class R>
106 8 : class ClientReaderInterface : public ClientStreamingInterface,
107 : public ReaderInterface<R> {
108 : public:
109 : /// Blocking wait for initial metadata from server. The received metadata
110 : /// can only be accessed after this call returns. Should only be called before
111 : /// the first read. Calling this method is optional, and if it is not called
112 : /// the metadata will be available in ClientContext after the first read.
113 : virtual void WaitForInitialMetadata() = 0;
114 : };
115 :
116 : template <class R>
117 8 : class ClientReader GRPC_FINAL : public ClientReaderInterface<R> {
118 : public:
119 : /// Blocking create a stream and write the first request out.
120 : template <class W>
121 4 : ClientReader(Channel* channel, const RpcMethod& method,
122 : ClientContext* context, const W& request)
123 4 : : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
124 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
125 4 : CallOpClientSendClose> ops;
126 4 : ops.SendInitialMetadata(context->send_initial_metadata_);
127 : // TODO(ctiller): don't assert
128 4 : GPR_ASSERT(ops.SendMessage(request).ok());
129 4 : ops.ClientSendClose();
130 4 : call_.PerformOps(&ops);
131 4 : cq_.Pluck(&ops);
132 4 : }
133 :
134 0 : void WaitForInitialMetadata() {
135 0 : GPR_ASSERT(!context_->initial_metadata_received_);
136 :
137 0 : CallOpSet<CallOpRecvInitialMetadata> ops;
138 0 : ops.RecvInitialMetadata(context_);
139 0 : call_.PerformOps(&ops);
140 0 : cq_.Pluck(&ops); /// status ignored
141 0 : }
142 :
143 14 : bool Read(R* msg) GRPC_OVERRIDE {
144 14 : CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
145 14 : if (!context_->initial_metadata_received_) {
146 4 : ops.RecvInitialMetadata(context_);
147 : }
148 14 : ops.RecvMessage(msg);
149 14 : call_.PerformOps(&ops);
150 14 : return cq_.Pluck(&ops) && ops.got_message;
151 : }
152 :
153 4 : Status Finish() GRPC_OVERRIDE {
154 4 : CallOpSet<CallOpClientRecvStatus> ops;
155 4 : Status status;
156 4 : ops.ClientRecvStatus(context_, &status);
157 4 : call_.PerformOps(&ops);
158 4 : GPR_ASSERT(cq_.Pluck(&ops));
159 4 : return status;
160 : }
161 :
162 : private:
163 : ClientContext* context_;
164 : CompletionQueue cq_;
165 : Call call_;
166 : };
167 :
168 : /// Client-side interface for streaming writes of message of type \a W.
169 : template <class W>
170 16 : class ClientWriterInterface : public ClientStreamingInterface,
171 : public WriterInterface<W> {
172 : public:
173 : /// Half close writing from the client.
174 : /// Block until writes are completed.
175 : ///
176 : /// \return Whether the writes were successful.
177 : virtual bool WritesDone() = 0;
178 : };
179 :
180 : template <class W>
181 16 : class ClientWriter : public ClientWriterInterface<W> {
182 : public:
183 : /// Blocking create a stream.
184 : template <class R>
185 8 : ClientWriter(Channel* channel, const RpcMethod& method,
186 : ClientContext* context, R* response)
187 8 : : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
188 8 : finish_ops_.RecvMessage(response);
189 :
190 8 : CallOpSet<CallOpSendInitialMetadata> ops;
191 8 : ops.SendInitialMetadata(context->send_initial_metadata_);
192 8 : call_.PerformOps(&ops);
193 8 : cq_.Pluck(&ops);
194 8 : }
195 :
196 : using WriterInterface<W>::Write;
197 50 : bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
198 50 : CallOpSet<CallOpSendMessage> ops;
199 50 : if (!ops.SendMessage(msg, options).ok()) {
200 0 : return false;
201 : }
202 50 : call_.PerformOps(&ops);
203 50 : return cq_.Pluck(&ops);
204 : }
205 :
206 6 : bool WritesDone() GRPC_OVERRIDE {
207 6 : CallOpSet<CallOpClientSendClose> ops;
208 6 : ops.ClientSendClose();
209 6 : call_.PerformOps(&ops);
210 6 : return cq_.Pluck(&ops);
211 : }
212 :
213 : /// Read the final response and wait for the final status.
214 8 : Status Finish() GRPC_OVERRIDE {
215 8 : Status status;
216 8 : finish_ops_.ClientRecvStatus(context_, &status);
217 8 : call_.PerformOps(&finish_ops_);
218 8 : GPR_ASSERT(cq_.Pluck(&finish_ops_));
219 8 : return status;
220 : }
221 :
222 : private:
223 : ClientContext* context_;
224 : CallOpSet<CallOpGenericRecvMessage, CallOpClientRecvStatus> finish_ops_;
225 : CompletionQueue cq_;
226 : Call call_;
227 : };
228 :
229 : /// Client-side interface for bi-directional streaming.
230 : template <class W, class R>
231 58 : class ClientReaderWriterInterface : public ClientStreamingInterface,
232 : public WriterInterface<W>,
233 : public ReaderInterface<R> {
234 : public:
235 : /// Blocking wait for initial metadata from server. The received metadata
236 : /// can only be accessed after this call returns. Should only be called before
237 : /// the first read. Calling this method is optional, and if it is not called
238 : /// the metadata will be available in ClientContext after the first read.
239 : virtual void WaitForInitialMetadata() = 0;
240 :
241 : /// Block until writes are completed.
242 : ///
243 : /// \return Whether the writes were successful.
244 : virtual bool WritesDone() = 0;
245 : };
246 :
247 : template <class W, class R>
248 56 : class ClientReaderWriter GRPC_FINAL : public ClientReaderWriterInterface<W, R> {
249 : public:
250 : /// Blocking create a stream.
251 28 : ClientReaderWriter(Channel* channel, const RpcMethod& method,
252 : ClientContext* context)
253 28 : : context_(context), call_(channel->CreateCall(method, context, &cq_)) {
254 28 : CallOpSet<CallOpSendInitialMetadata> ops;
255 28 : ops.SendInitialMetadata(context->send_initial_metadata_);
256 28 : call_.PerformOps(&ops);
257 28 : cq_.Pluck(&ops);
258 28 : }
259 :
260 0 : void WaitForInitialMetadata() {
261 0 : GPR_ASSERT(!context_->initial_metadata_received_);
262 :
263 0 : CallOpSet<CallOpRecvInitialMetadata> ops;
264 0 : ops.RecvInitialMetadata(context_);
265 0 : call_.PerformOps(&ops);
266 0 : cq_.Pluck(&ops); // status ignored
267 0 : }
268 :
269 81389 : bool Read(R* msg) GRPC_OVERRIDE {
270 81389 : CallOpSet<CallOpRecvInitialMetadata, CallOpRecvMessage<R>> ops;
271 81389 : if (!context_->initial_metadata_received_) {
272 27 : ops.RecvInitialMetadata(context_);
273 : }
274 81389 : ops.RecvMessage(msg);
275 81389 : call_.PerformOps(&ops);
276 81389 : return cq_.Pluck(&ops) && ops.got_message;
277 : }
278 :
279 : using WriterInterface<W>::Write;
280 63209 : bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
281 63209 : CallOpSet<CallOpSendMessage> ops;
282 63209 : if (!ops.SendMessage(msg, options).ok()) return false;
283 63209 : call_.PerformOps(&ops);
284 63209 : return cq_.Pluck(&ops);
285 : }
286 :
287 19 : bool WritesDone() GRPC_OVERRIDE {
288 19 : CallOpSet<CallOpClientSendClose> ops;
289 19 : ops.ClientSendClose();
290 19 : call_.PerformOps(&ops);
291 19 : return cq_.Pluck(&ops);
292 : }
293 :
294 27 : Status Finish() GRPC_OVERRIDE {
295 27 : CallOpSet<CallOpClientRecvStatus> ops;
296 27 : Status status;
297 27 : ops.ClientRecvStatus(context_, &status);
298 27 : call_.PerformOps(&ops);
299 27 : GPR_ASSERT(cq_.Pluck(&ops));
300 27 : return status;
301 : }
302 :
303 : private:
304 : ClientContext* context_;
305 : CompletionQueue cq_;
306 : Call call_;
307 : };
308 :
309 : template <class R>
310 8 : class ServerReader GRPC_FINAL : public ReaderInterface<R> {
311 : public:
312 8 : ServerReader(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
313 :
314 : void SendInitialMetadata() {
315 : GPR_ASSERT(!ctx_->sent_initial_metadata_);
316 :
317 : CallOpSet<CallOpSendInitialMetadata> ops;
318 : ops.SendInitialMetadata(ctx_->initial_metadata_);
319 : ctx_->sent_initial_metadata_ = true;
320 : call_->PerformOps(&ops);
321 : call_->cq()->Pluck(&ops);
322 : }
323 :
324 32 : bool Read(R* msg) GRPC_OVERRIDE {
325 32 : CallOpSet<CallOpRecvMessage<R>> ops;
326 32 : ops.RecvMessage(msg);
327 32 : call_->PerformOps(&ops);
328 32 : return call_->cq()->Pluck(&ops) && ops.got_message;
329 : }
330 :
331 : private:
332 : Call* const call_;
333 : ServerContext* const ctx_;
334 : };
335 :
336 : template <class W>
337 5 : class ServerWriter GRPC_FINAL : public WriterInterface<W> {
338 : public:
339 5 : ServerWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
340 :
341 : void SendInitialMetadata() {
342 : GPR_ASSERT(!ctx_->sent_initial_metadata_);
343 :
344 : CallOpSet<CallOpSendInitialMetadata> ops;
345 : ops.SendInitialMetadata(ctx_->initial_metadata_);
346 : ctx_->sent_initial_metadata_ = true;
347 : call_->PerformOps(&ops);
348 : call_->cq()->Pluck(&ops);
349 : }
350 :
351 : using WriterInterface<W>::Write;
352 18 : bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
353 18 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
354 18 : if (!ops.SendMessage(msg, options).ok()) {
355 0 : return false;
356 : }
357 18 : if (!ctx_->sent_initial_metadata_) {
358 5 : ops.SendInitialMetadata(ctx_->initial_metadata_);
359 5 : ctx_->sent_initial_metadata_ = true;
360 : }
361 18 : call_->PerformOps(&ops);
362 18 : return call_->cq()->Pluck(&ops);
363 : }
364 :
365 : private:
366 : Call* const call_;
367 : ServerContext* const ctx_;
368 : };
369 :
370 : /// Server-side interface for bi-directional streaming.
371 : template <class W, class R>
372 22 : class ServerReaderWriter GRPC_FINAL : public WriterInterface<W>,
373 : public ReaderInterface<R> {
374 : public:
375 22 : ServerReaderWriter(Call* call, ServerContext* ctx) : call_(call), ctx_(ctx) {}
376 :
377 : void SendInitialMetadata() {
378 : GPR_ASSERT(!ctx_->sent_initial_metadata_);
379 :
380 : CallOpSet<CallOpSendInitialMetadata> ops;
381 : ops.SendInitialMetadata(ctx_->initial_metadata_);
382 : ctx_->sent_initial_metadata_ = true;
383 : call_->PerformOps(&ops);
384 : call_->cq()->Pluck(&ops);
385 : }
386 :
387 63224 : bool Read(R* msg) GRPC_OVERRIDE {
388 63224 : CallOpSet<CallOpRecvMessage<R>> ops;
389 63224 : ops.RecvMessage(msg);
390 63224 : call_->PerformOps(&ops);
391 63224 : return call_->cq()->Pluck(&ops) && ops.got_message;
392 : }
393 :
394 : using WriterInterface<W>::Write;
395 81376 : bool Write(const W& msg, const WriteOptions& options) GRPC_OVERRIDE {
396 81376 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage> ops;
397 81376 : if (!ops.SendMessage(msg, options).ok()) {
398 0 : return false;
399 : }
400 81376 : if (!ctx_->sent_initial_metadata_) {
401 20 : ops.SendInitialMetadata(ctx_->initial_metadata_);
402 20 : ctx_->sent_initial_metadata_ = true;
403 : }
404 81376 : call_->PerformOps(&ops);
405 81376 : return call_->cq()->Pluck(&ops);
406 : }
407 :
408 : private:
409 : Call* const call_;
410 : ServerContext* const ctx_;
411 : };
412 :
413 : } // namespace grpc
414 :
415 : #endif // GRPCXX_SUPPORT_SYNC_STREAM_H
|