Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <memory>
35 :
36 : #include <grpc/grpc.h>
37 : #include <grpc/support/thd.h>
38 : #include <grpc/support/time.h>
39 : #include <grpc++/channel.h>
40 : #include <grpc++/client_context.h>
41 : #include <grpc++/create_channel.h>
42 : #include <grpc++/server.h>
43 : #include <grpc++/server_builder.h>
44 : #include <grpc++/server_context.h>
45 : #include <gtest/gtest.h>
46 :
47 : #include "test/core/util/port.h"
48 : #include "test/core/util/test_config.h"
49 : #include "test/cpp/util/echo_duplicate.grpc.pb.h"
50 : #include "test/cpp/util/echo.grpc.pb.h"
51 : #include "test/cpp/util/string_ref_helper.h"
52 :
53 : #ifdef GPR_POSIX_SOCKET
54 : #include "src/core/iomgr/pollset_posix.h"
55 : #endif
56 :
57 : using grpc::cpp::test::util::EchoRequest;
58 : using grpc::cpp::test::util::EchoResponse;
59 : using std::chrono::system_clock;
60 :
61 : namespace grpc {
62 : namespace testing {
63 :
64 : namespace {
65 :
66 360 : void* tag(int i) { return (void*)(gpr_intptr)i; }
67 :
68 : #ifdef GPR_POSIX_SOCKET
69 696 : static int assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
70 : int timeout) {
71 696 : GPR_ASSERT(timeout == 0);
72 696 : return poll(pfds, nfds, timeout);
73 : }
74 :
75 : class PollOverride {
76 : public:
77 178 : PollOverride(grpc_poll_function_type f) {
78 178 : prev_ = grpc_poll_function;
79 178 : grpc_poll_function = f;
80 178 : }
81 :
82 178 : ~PollOverride() { grpc_poll_function = prev_; }
83 :
84 : private:
85 : grpc_poll_function_type prev_;
86 : };
87 :
88 178 : class PollingCheckRegion : public PollOverride {
89 : public:
90 178 : explicit PollingCheckRegion(bool allow_blocking)
91 178 : : PollOverride(allow_blocking ? poll : assert_non_blocking_poll) {}
92 : };
93 : #else
94 : class PollingCheckRegion {
95 : public:
96 : explicit PollingCheckRegion(bool allow_blocking) {}
97 : };
98 : #endif
99 :
100 178 : class Verifier : public PollingCheckRegion {
101 : public:
102 178 : explicit Verifier(bool spin) : PollingCheckRegion(!spin), spin_(spin) {}
103 180 : Verifier& Expect(int i, bool expect_ok) {
104 180 : expectations_[tag(i)] = expect_ok;
105 180 : return *this;
106 : }
107 168 : void Verify(CompletionQueue* cq) {
108 168 : GPR_ASSERT(!expectations_.empty());
109 510 : while (!expectations_.empty()) {
110 : bool ok;
111 : void* got_tag;
112 174 : if (spin_) {
113 : for (;;) {
114 732 : auto r = cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
115 732 : if (r == CompletionQueue::TIMEOUT) continue;
116 87 : if (r == CompletionQueue::GOT_EVENT) break;
117 0 : gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
118 0 : abort();
119 645 : }
120 : } else {
121 87 : EXPECT_TRUE(cq->Next(&got_tag, &ok));
122 : }
123 174 : auto it = expectations_.find(got_tag);
124 174 : EXPECT_TRUE(it != expectations_.end());
125 174 : EXPECT_EQ(it->second, ok);
126 174 : expectations_.erase(it);
127 : }
128 168 : }
129 10 : void Verify(CompletionQueue* cq,
130 : std::chrono::system_clock::time_point deadline) {
131 10 : if (expectations_.empty()) {
132 : bool ok;
133 : void* got_tag;
134 4 : if (spin_) {
135 4 : while (std::chrono::system_clock::now() < deadline) {
136 0 : EXPECT_EQ(
137 : cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME)),
138 0 : CompletionQueue::TIMEOUT);
139 : }
140 : } else {
141 2 : EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
142 2 : CompletionQueue::TIMEOUT);
143 : }
144 : } else {
145 18 : while (!expectations_.empty()) {
146 : bool ok;
147 : void* got_tag;
148 6 : if (spin_) {
149 : for (;;) {
150 52 : GPR_ASSERT(std::chrono::system_clock::now() < deadline);
151 : auto r =
152 52 : cq->AsyncNext(&got_tag, &ok, gpr_time_0(GPR_CLOCK_REALTIME));
153 52 : if (r == CompletionQueue::TIMEOUT) continue;
154 3 : if (r == CompletionQueue::GOT_EVENT) break;
155 0 : gpr_log(GPR_ERROR, "unexpected result from AsyncNext");
156 0 : abort();
157 49 : }
158 : } else {
159 3 : EXPECT_EQ(cq->AsyncNext(&got_tag, &ok, deadline),
160 3 : CompletionQueue::GOT_EVENT);
161 : }
162 6 : auto it = expectations_.find(got_tag);
163 6 : EXPECT_TRUE(it != expectations_.end());
164 6 : EXPECT_EQ(it->second, ok);
165 6 : expectations_.erase(it);
166 : }
167 : }
168 10 : }
169 :
170 : private:
171 : std::map<void*, bool> expectations_;
172 : bool spin_;
173 : };
174 :
175 26 : class AsyncEnd2endTest : public ::testing::TestWithParam<bool> {
176 : protected:
177 26 : AsyncEnd2endTest() {}
178 :
179 26 : void SetUp() GRPC_OVERRIDE {
180 26 : int port = grpc_pick_unused_port_or_die();
181 26 : server_address_ << "localhost:" << port;
182 : // Setup server
183 26 : ServerBuilder builder;
184 : builder.AddListeningPort(server_address_.str(),
185 26 : grpc::InsecureServerCredentials());
186 26 : builder.RegisterAsyncService(&service_);
187 26 : cq_ = builder.AddCompletionQueue();
188 26 : server_ = builder.BuildAndStart();
189 26 : }
190 :
191 26 : void TearDown() GRPC_OVERRIDE {
192 26 : server_->Shutdown();
193 : void* ignored_tag;
194 : bool ignored_ok;
195 26 : cq_->Shutdown();
196 26 : while (cq_->Next(&ignored_tag, &ignored_ok))
197 : ;
198 26 : }
199 :
200 24 : void ResetStub() {
201 : std::shared_ptr<Channel> channel =
202 24 : CreateChannel(server_address_.str(), InsecureCredentials());
203 24 : stub_ = grpc::cpp::test::util::TestService::NewStub(channel);
204 24 : }
205 :
206 4 : void SendRpc(int num_rpcs) {
207 26 : for (int i = 0; i < num_rpcs; i++) {
208 22 : EchoRequest send_request;
209 44 : EchoRequest recv_request;
210 44 : EchoResponse send_response;
211 44 : EchoResponse recv_response;
212 44 : Status recv_status;
213 :
214 44 : ClientContext cli_ctx;
215 44 : ServerContext srv_ctx;
216 44 : grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
217 :
218 22 : send_request.set_message("Hello");
219 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
220 44 : stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
221 :
222 22 : service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
223 44 : cq_.get(), tag(2));
224 :
225 22 : Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
226 22 : EXPECT_EQ(send_request.message(), recv_request.message());
227 :
228 22 : send_response.set_message(recv_request.message());
229 22 : response_writer.Finish(send_response, Status::OK, tag(3));
230 22 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
231 :
232 22 : response_reader->Finish(&recv_response, &recv_status, tag(4));
233 22 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
234 :
235 22 : EXPECT_EQ(send_response.message(), recv_response.message());
236 22 : EXPECT_TRUE(recv_status.ok());
237 22 : }
238 4 : }
239 :
240 : std::unique_ptr<ServerCompletionQueue> cq_;
241 : std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
242 : std::unique_ptr<Server> server_;
243 : grpc::cpp::test::util::TestService::AsyncService service_;
244 : std::ostringstream server_address_;
245 : };
246 :
247 10 : TEST_P(AsyncEnd2endTest, SimpleRpc) {
248 2 : ResetStub();
249 2 : SendRpc(1);
250 2 : }
251 :
252 10 : TEST_P(AsyncEnd2endTest, SequentialRpcs) {
253 2 : ResetStub();
254 2 : SendRpc(10);
255 2 : }
256 :
257 : // Test a simple RPC using the async version of Next
258 10 : TEST_P(AsyncEnd2endTest, AsyncNextRpc) {
259 2 : ResetStub();
260 :
261 2 : EchoRequest send_request;
262 4 : EchoRequest recv_request;
263 4 : EchoResponse send_response;
264 4 : EchoResponse recv_response;
265 4 : Status recv_status;
266 :
267 4 : ClientContext cli_ctx;
268 4 : ServerContext srv_ctx;
269 4 : grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
270 :
271 2 : send_request.set_message("Hello");
272 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
273 4 : stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
274 :
275 : std::chrono::system_clock::time_point time_now(
276 2 : std::chrono::system_clock::now());
277 : std::chrono::system_clock::time_point time_limit(
278 2 : std::chrono::system_clock::now() + std::chrono::seconds(10));
279 2 : Verifier(GetParam()).Verify(cq_.get(), time_now);
280 2 : Verifier(GetParam()).Verify(cq_.get(), time_now);
281 :
282 2 : service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
283 4 : cq_.get(), tag(2));
284 :
285 2 : Verifier(GetParam()).Expect(2, true).Verify(cq_.get(), time_limit);
286 2 : EXPECT_EQ(send_request.message(), recv_request.message());
287 :
288 2 : send_response.set_message(recv_request.message());
289 2 : response_writer.Finish(send_response, Status::OK, tag(3));
290 2 : Verifier(GetParam())
291 4 : .Expect(3, true)
292 4 : .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
293 :
294 2 : response_reader->Finish(&recv_response, &recv_status, tag(4));
295 2 : Verifier(GetParam())
296 4 : .Expect(4, true)
297 4 : .Verify(cq_.get(), std::chrono::system_clock::time_point::max());
298 :
299 2 : EXPECT_EQ(send_response.message(), recv_response.message());
300 4 : EXPECT_TRUE(recv_status.ok());
301 2 : }
302 :
303 : // Two pings and a final pong.
304 10 : TEST_P(AsyncEnd2endTest, SimpleClientStreaming) {
305 2 : ResetStub();
306 :
307 2 : EchoRequest send_request;
308 4 : EchoRequest recv_request;
309 4 : EchoResponse send_response;
310 4 : EchoResponse recv_response;
311 4 : Status recv_status;
312 4 : ClientContext cli_ctx;
313 4 : ServerContext srv_ctx;
314 4 : ServerAsyncReader<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
315 :
316 2 : send_request.set_message("Hello");
317 : std::unique_ptr<ClientAsyncWriter<EchoRequest> > cli_stream(
318 4 : stub_->AsyncRequestStream(&cli_ctx, &recv_response, cq_.get(), tag(1)));
319 :
320 2 : service_.RequestRequestStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
321 4 : tag(2));
322 :
323 2 : Verifier(GetParam()).Expect(2, true).Expect(1, true).Verify(cq_.get());
324 :
325 2 : cli_stream->Write(send_request, tag(3));
326 2 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
327 :
328 2 : srv_stream.Read(&recv_request, tag(4));
329 2 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
330 2 : EXPECT_EQ(send_request.message(), recv_request.message());
331 :
332 2 : cli_stream->Write(send_request, tag(5));
333 2 : Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
334 :
335 2 : srv_stream.Read(&recv_request, tag(6));
336 2 : Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
337 :
338 2 : EXPECT_EQ(send_request.message(), recv_request.message());
339 2 : cli_stream->WritesDone(tag(7));
340 2 : Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
341 :
342 2 : srv_stream.Read(&recv_request, tag(8));
343 2 : Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
344 :
345 2 : send_response.set_message(recv_request.message());
346 2 : srv_stream.Finish(send_response, Status::OK, tag(9));
347 2 : Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
348 :
349 2 : cli_stream->Finish(&recv_status, tag(10));
350 2 : Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
351 :
352 2 : EXPECT_EQ(send_response.message(), recv_response.message());
353 4 : EXPECT_TRUE(recv_status.ok());
354 2 : }
355 :
356 : // One ping, two pongs.
357 10 : TEST_P(AsyncEnd2endTest, SimpleServerStreaming) {
358 2 : ResetStub();
359 :
360 2 : EchoRequest send_request;
361 4 : EchoRequest recv_request;
362 4 : EchoResponse send_response;
363 4 : EchoResponse recv_response;
364 4 : Status recv_status;
365 4 : ClientContext cli_ctx;
366 4 : ServerContext srv_ctx;
367 4 : ServerAsyncWriter<EchoResponse> srv_stream(&srv_ctx);
368 :
369 2 : send_request.set_message("Hello");
370 : std::unique_ptr<ClientAsyncReader<EchoResponse> > cli_stream(
371 4 : stub_->AsyncResponseStream(&cli_ctx, send_request, cq_.get(), tag(1)));
372 :
373 : service_.RequestResponseStream(&srv_ctx, &recv_request, &srv_stream,
374 2 : cq_.get(), cq_.get(), tag(2));
375 :
376 2 : Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
377 2 : EXPECT_EQ(send_request.message(), recv_request.message());
378 :
379 2 : send_response.set_message(recv_request.message());
380 2 : srv_stream.Write(send_response, tag(3));
381 2 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
382 :
383 2 : cli_stream->Read(&recv_response, tag(4));
384 2 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
385 2 : EXPECT_EQ(send_response.message(), recv_response.message());
386 :
387 2 : srv_stream.Write(send_response, tag(5));
388 2 : Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
389 :
390 2 : cli_stream->Read(&recv_response, tag(6));
391 2 : Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
392 2 : EXPECT_EQ(send_response.message(), recv_response.message());
393 :
394 2 : srv_stream.Finish(Status::OK, tag(7));
395 2 : Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
396 :
397 2 : cli_stream->Read(&recv_response, tag(8));
398 2 : Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
399 :
400 2 : cli_stream->Finish(&recv_status, tag(9));
401 2 : Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
402 :
403 4 : EXPECT_TRUE(recv_status.ok());
404 2 : }
405 :
406 : // One ping, one pong.
407 10 : TEST_P(AsyncEnd2endTest, SimpleBidiStreaming) {
408 2 : ResetStub();
409 :
410 2 : EchoRequest send_request;
411 4 : EchoRequest recv_request;
412 4 : EchoResponse send_response;
413 4 : EchoResponse recv_response;
414 4 : Status recv_status;
415 4 : ClientContext cli_ctx;
416 4 : ServerContext srv_ctx;
417 4 : ServerAsyncReaderWriter<EchoResponse, EchoRequest> srv_stream(&srv_ctx);
418 :
419 2 : send_request.set_message("Hello");
420 : std::unique_ptr<ClientAsyncReaderWriter<EchoRequest, EchoResponse> >
421 4 : cli_stream(stub_->AsyncBidiStream(&cli_ctx, cq_.get(), tag(1)));
422 :
423 2 : service_.RequestBidiStream(&srv_ctx, &srv_stream, cq_.get(), cq_.get(),
424 4 : tag(2));
425 :
426 2 : Verifier(GetParam()).Expect(1, true).Expect(2, true).Verify(cq_.get());
427 :
428 2 : cli_stream->Write(send_request, tag(3));
429 2 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
430 :
431 2 : srv_stream.Read(&recv_request, tag(4));
432 2 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
433 2 : EXPECT_EQ(send_request.message(), recv_request.message());
434 :
435 2 : send_response.set_message(recv_request.message());
436 2 : srv_stream.Write(send_response, tag(5));
437 2 : Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
438 :
439 2 : cli_stream->Read(&recv_response, tag(6));
440 2 : Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
441 2 : EXPECT_EQ(send_response.message(), recv_response.message());
442 :
443 2 : cli_stream->WritesDone(tag(7));
444 2 : Verifier(GetParam()).Expect(7, true).Verify(cq_.get());
445 :
446 2 : srv_stream.Read(&recv_request, tag(8));
447 2 : Verifier(GetParam()).Expect(8, false).Verify(cq_.get());
448 :
449 2 : srv_stream.Finish(Status::OK, tag(9));
450 2 : Verifier(GetParam()).Expect(9, true).Verify(cq_.get());
451 :
452 2 : cli_stream->Finish(&recv_status, tag(10));
453 2 : Verifier(GetParam()).Expect(10, true).Verify(cq_.get());
454 :
455 4 : EXPECT_TRUE(recv_status.ok());
456 2 : }
457 :
458 : // Metadata tests
459 10 : TEST_P(AsyncEnd2endTest, ClientInitialMetadataRpc) {
460 2 : ResetStub();
461 :
462 2 : EchoRequest send_request;
463 4 : EchoRequest recv_request;
464 4 : EchoResponse send_response;
465 4 : EchoResponse recv_response;
466 4 : Status recv_status;
467 :
468 4 : ClientContext cli_ctx;
469 4 : ServerContext srv_ctx;
470 4 : grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
471 :
472 2 : send_request.set_message("Hello");
473 4 : std::pair<grpc::string, grpc::string> meta1("key1", "val1");
474 4 : std::pair<grpc::string, grpc::string> meta2("key2", "val2");
475 2 : cli_ctx.AddMetadata(meta1.first, meta1.second);
476 2 : cli_ctx.AddMetadata(meta2.first, meta2.second);
477 :
478 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
479 4 : stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
480 :
481 2 : service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
482 4 : cq_.get(), tag(2));
483 2 : Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
484 2 : EXPECT_EQ(send_request.message(), recv_request.message());
485 4 : auto client_initial_metadata = srv_ctx.client_metadata();
486 2 : EXPECT_EQ(meta1.second,
487 2 : ToString(client_initial_metadata.find(meta1.first)->second));
488 2 : EXPECT_EQ(meta2.second,
489 2 : ToString(client_initial_metadata.find(meta2.first)->second));
490 2 : EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
491 :
492 2 : send_response.set_message(recv_request.message());
493 2 : response_writer.Finish(send_response, Status::OK, tag(3));
494 :
495 2 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
496 :
497 2 : response_reader->Finish(&recv_response, &recv_status, tag(4));
498 2 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
499 :
500 2 : EXPECT_EQ(send_response.message(), recv_response.message());
501 4 : EXPECT_TRUE(recv_status.ok());
502 2 : }
503 :
504 10 : TEST_P(AsyncEnd2endTest, ServerInitialMetadataRpc) {
505 2 : ResetStub();
506 :
507 2 : EchoRequest send_request;
508 4 : EchoRequest recv_request;
509 4 : EchoResponse send_response;
510 4 : EchoResponse recv_response;
511 4 : Status recv_status;
512 :
513 4 : ClientContext cli_ctx;
514 4 : ServerContext srv_ctx;
515 4 : grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
516 :
517 2 : send_request.set_message("Hello");
518 4 : std::pair<grpc::string, grpc::string> meta1("key1", "val1");
519 4 : std::pair<grpc::string, grpc::string> meta2("key2", "val2");
520 :
521 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
522 4 : stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
523 :
524 2 : service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
525 4 : cq_.get(), tag(2));
526 2 : Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
527 2 : EXPECT_EQ(send_request.message(), recv_request.message());
528 2 : srv_ctx.AddInitialMetadata(meta1.first, meta1.second);
529 2 : srv_ctx.AddInitialMetadata(meta2.first, meta2.second);
530 2 : response_writer.SendInitialMetadata(tag(3));
531 2 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
532 :
533 2 : response_reader->ReadInitialMetadata(tag(4));
534 2 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
535 4 : auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
536 2 : EXPECT_EQ(meta1.second,
537 2 : ToString(server_initial_metadata.find(meta1.first)->second));
538 2 : EXPECT_EQ(meta2.second,
539 2 : ToString(server_initial_metadata.find(meta2.first)->second));
540 2 : EXPECT_EQ(static_cast<size_t>(2), server_initial_metadata.size());
541 :
542 2 : send_response.set_message(recv_request.message());
543 2 : response_writer.Finish(send_response, Status::OK, tag(5));
544 2 : Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
545 :
546 2 : response_reader->Finish(&recv_response, &recv_status, tag(6));
547 2 : Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
548 :
549 2 : EXPECT_EQ(send_response.message(), recv_response.message());
550 4 : EXPECT_TRUE(recv_status.ok());
551 2 : }
552 :
553 10 : TEST_P(AsyncEnd2endTest, ServerTrailingMetadataRpc) {
554 2 : ResetStub();
555 :
556 2 : EchoRequest send_request;
557 4 : EchoRequest recv_request;
558 4 : EchoResponse send_response;
559 4 : EchoResponse recv_response;
560 4 : Status recv_status;
561 :
562 4 : ClientContext cli_ctx;
563 4 : ServerContext srv_ctx;
564 4 : grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
565 :
566 2 : send_request.set_message("Hello");
567 4 : std::pair<grpc::string, grpc::string> meta1("key1", "val1");
568 4 : std::pair<grpc::string, grpc::string> meta2("key2", "val2");
569 :
570 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
571 4 : stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
572 :
573 2 : service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
574 4 : cq_.get(), tag(2));
575 2 : Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
576 2 : EXPECT_EQ(send_request.message(), recv_request.message());
577 2 : response_writer.SendInitialMetadata(tag(3));
578 2 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
579 :
580 2 : send_response.set_message(recv_request.message());
581 2 : srv_ctx.AddTrailingMetadata(meta1.first, meta1.second);
582 2 : srv_ctx.AddTrailingMetadata(meta2.first, meta2.second);
583 2 : response_writer.Finish(send_response, Status::OK, tag(4));
584 :
585 2 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
586 :
587 2 : response_reader->Finish(&recv_response, &recv_status, tag(5));
588 2 : Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
589 2 : EXPECT_EQ(send_response.message(), recv_response.message());
590 2 : EXPECT_TRUE(recv_status.ok());
591 4 : auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
592 2 : EXPECT_EQ(meta1.second,
593 2 : ToString(server_trailing_metadata.find(meta1.first)->second));
594 2 : EXPECT_EQ(meta2.second,
595 2 : ToString(server_trailing_metadata.find(meta2.first)->second));
596 4 : EXPECT_EQ(static_cast<size_t>(2), server_trailing_metadata.size());
597 2 : }
598 :
599 10 : TEST_P(AsyncEnd2endTest, MetadataRpc) {
600 2 : ResetStub();
601 :
602 2 : EchoRequest send_request;
603 4 : EchoRequest recv_request;
604 4 : EchoResponse send_response;
605 4 : EchoResponse recv_response;
606 4 : Status recv_status;
607 :
608 4 : ClientContext cli_ctx;
609 4 : ServerContext srv_ctx;
610 4 : grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
611 :
612 2 : send_request.set_message("Hello");
613 4 : std::pair<grpc::string, grpc::string> meta1("key1", "val1");
614 : std::pair<grpc::string, grpc::string> meta2(
615 : "key2-bin",
616 4 : grpc::string("\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xcb\xcc", 13));
617 4 : std::pair<grpc::string, grpc::string> meta3("key3", "val3");
618 : std::pair<grpc::string, grpc::string> meta6(
619 : "key4-bin",
620 : grpc::string("\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d",
621 4 : 14));
622 4 : std::pair<grpc::string, grpc::string> meta5("key5", "val5");
623 : std::pair<grpc::string, grpc::string> meta4(
624 : "key6-bin",
625 : grpc::string(
626 4 : "\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee", 15));
627 :
628 2 : cli_ctx.AddMetadata(meta1.first, meta1.second);
629 2 : cli_ctx.AddMetadata(meta2.first, meta2.second);
630 :
631 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
632 4 : stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
633 :
634 2 : service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
635 4 : cq_.get(), tag(2));
636 2 : Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
637 2 : EXPECT_EQ(send_request.message(), recv_request.message());
638 4 : auto client_initial_metadata = srv_ctx.client_metadata();
639 2 : EXPECT_EQ(meta1.second,
640 2 : ToString(client_initial_metadata.find(meta1.first)->second));
641 2 : EXPECT_EQ(meta2.second,
642 2 : ToString(client_initial_metadata.find(meta2.first)->second));
643 2 : EXPECT_GE(client_initial_metadata.size(), static_cast<size_t>(2));
644 :
645 2 : srv_ctx.AddInitialMetadata(meta3.first, meta3.second);
646 2 : srv_ctx.AddInitialMetadata(meta4.first, meta4.second);
647 2 : response_writer.SendInitialMetadata(tag(3));
648 2 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
649 2 : response_reader->ReadInitialMetadata(tag(4));
650 2 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
651 4 : auto server_initial_metadata = cli_ctx.GetServerInitialMetadata();
652 2 : EXPECT_EQ(meta3.second,
653 2 : ToString(server_initial_metadata.find(meta3.first)->second));
654 2 : EXPECT_EQ(meta4.second,
655 2 : ToString(server_initial_metadata.find(meta4.first)->second));
656 2 : EXPECT_GE(server_initial_metadata.size(), static_cast<size_t>(2));
657 :
658 2 : send_response.set_message(recv_request.message());
659 2 : srv_ctx.AddTrailingMetadata(meta5.first, meta5.second);
660 2 : srv_ctx.AddTrailingMetadata(meta6.first, meta6.second);
661 2 : response_writer.Finish(send_response, Status::OK, tag(5));
662 :
663 2 : Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
664 :
665 2 : response_reader->Finish(&recv_response, &recv_status, tag(6));
666 2 : Verifier(GetParam()).Expect(6, true).Verify(cq_.get());
667 2 : EXPECT_EQ(send_response.message(), recv_response.message());
668 2 : EXPECT_TRUE(recv_status.ok());
669 4 : auto server_trailing_metadata = cli_ctx.GetServerTrailingMetadata();
670 2 : EXPECT_EQ(meta5.second,
671 2 : ToString(server_trailing_metadata.find(meta5.first)->second));
672 2 : EXPECT_EQ(meta6.second,
673 2 : ToString(server_trailing_metadata.find(meta6.first)->second));
674 4 : EXPECT_GE(server_trailing_metadata.size(), static_cast<size_t>(2));
675 2 : }
676 :
677 : // Server uses AsyncNotifyWhenDone API to check for cancellation
678 10 : TEST_P(AsyncEnd2endTest, ServerCheckCancellation) {
679 2 : ResetStub();
680 :
681 2 : EchoRequest send_request;
682 4 : EchoRequest recv_request;
683 4 : EchoResponse send_response;
684 4 : EchoResponse recv_response;
685 4 : Status recv_status;
686 :
687 4 : ClientContext cli_ctx;
688 4 : ServerContext srv_ctx;
689 4 : grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
690 :
691 2 : send_request.set_message("Hello");
692 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
693 4 : stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
694 :
695 2 : srv_ctx.AsyncNotifyWhenDone(tag(5));
696 2 : service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
697 4 : cq_.get(), tag(2));
698 :
699 2 : Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
700 2 : EXPECT_EQ(send_request.message(), recv_request.message());
701 :
702 2 : cli_ctx.TryCancel();
703 2 : Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
704 2 : EXPECT_TRUE(srv_ctx.IsCancelled());
705 :
706 2 : response_reader->Finish(&recv_response, &recv_status, tag(4));
707 2 : Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
708 :
709 4 : EXPECT_EQ(StatusCode::CANCELLED, recv_status.error_code());
710 2 : }
711 :
712 : // Server uses AsyncNotifyWhenDone API to check for normal finish
713 10 : TEST_P(AsyncEnd2endTest, ServerCheckDone) {
714 2 : ResetStub();
715 :
716 2 : EchoRequest send_request;
717 4 : EchoRequest recv_request;
718 4 : EchoResponse send_response;
719 4 : EchoResponse recv_response;
720 4 : Status recv_status;
721 :
722 4 : ClientContext cli_ctx;
723 4 : ServerContext srv_ctx;
724 4 : grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
725 :
726 2 : send_request.set_message("Hello");
727 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
728 4 : stub_->AsyncEcho(&cli_ctx, send_request, cq_.get()));
729 :
730 2 : srv_ctx.AsyncNotifyWhenDone(tag(5));
731 2 : service_.RequestEcho(&srv_ctx, &recv_request, &response_writer, cq_.get(),
732 4 : cq_.get(), tag(2));
733 :
734 2 : Verifier(GetParam()).Expect(2, true).Verify(cq_.get());
735 2 : EXPECT_EQ(send_request.message(), recv_request.message());
736 :
737 2 : send_response.set_message(recv_request.message());
738 2 : response_writer.Finish(send_response, Status::OK, tag(3));
739 2 : Verifier(GetParam()).Expect(3, true).Verify(cq_.get());
740 2 : Verifier(GetParam()).Expect(5, true).Verify(cq_.get());
741 2 : EXPECT_FALSE(srv_ctx.IsCancelled());
742 :
743 2 : response_reader->Finish(&recv_response, &recv_status, tag(4));
744 2 : Verifier(GetParam()).Expect(4, true).Verify(cq_.get());
745 :
746 2 : EXPECT_EQ(send_response.message(), recv_response.message());
747 4 : EXPECT_TRUE(recv_status.ok());
748 2 : }
749 :
750 10 : TEST_P(AsyncEnd2endTest, UnimplementedRpc) {
751 : std::shared_ptr<Channel> channel =
752 2 : CreateChannel(server_address_.str(), InsecureCredentials());
753 4 : std::unique_ptr<grpc::cpp::test::util::UnimplementedService::Stub> stub;
754 2 : stub = grpc::cpp::test::util::UnimplementedService::NewStub(channel);
755 4 : EchoRequest send_request;
756 4 : EchoResponse recv_response;
757 4 : Status recv_status;
758 :
759 4 : ClientContext cli_ctx;
760 2 : send_request.set_message("Hello");
761 : std::unique_ptr<ClientAsyncResponseReader<EchoResponse> > response_reader(
762 4 : stub->AsyncUnimplemented(&cli_ctx, send_request, cq_.get()));
763 :
764 2 : response_reader->Finish(&recv_response, &recv_status, tag(4));
765 2 : Verifier(GetParam()).Expect(4, false).Verify(cq_.get());
766 :
767 2 : EXPECT_EQ(StatusCode::UNIMPLEMENTED, recv_status.error_code());
768 4 : EXPECT_EQ("", recv_status.error_message());
769 2 : }
770 :
771 14 : INSTANTIATE_TEST_CASE_P(AsyncEnd2end, AsyncEnd2endTest,
772 : ::testing::Values(false, true));
773 :
774 : } // namespace
775 : } // namespace testing
776 : } // namespace grpc
777 :
778 1 : int main(int argc, char** argv) {
779 1 : grpc_test_init(argc, argv);
780 1 : ::testing::InitGoogleTest(&argc, argv);
781 1 : return RUN_ALL_TESTS();
782 3 : }
|