Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "test/cpp/interop/interop_client.h"
35 :
36 : #include <unistd.h>
37 :
38 : #include <fstream>
39 : #include <memory>
40 :
41 : #include <grpc/grpc.h>
42 : #include <grpc/support/log.h>
43 : #include <grpc/support/string_util.h>
44 : #include <grpc/support/useful.h>
45 : #include <grpc++/channel.h>
46 : #include <grpc++/client_context.h>
47 : #include <grpc++/security/credentials.h>
48 :
49 : #include "src/core/transport/stream_op.h"
50 : #include "test/cpp/interop/client_helper.h"
51 : #include "test/proto/test.grpc.pb.h"
52 : #include "test/proto/empty.grpc.pb.h"
53 : #include "test/proto/messages.grpc.pb.h"
54 :
55 : namespace grpc {
56 : namespace testing {
57 :
// Path of the reference data file that UNCOMPRESSABLE response payloads are
// compared against. Both the characters and the pointer itself are const so
// the path cannot be re-pointed at run time.
static const char* const kRandomFile = "test/cpp/interop/rnd.dat";
59 :
60 : namespace {
61 : // The same value is defined by the Java client.
62 4 : const std::vector<int> request_stream_sizes = {27182, 8, 1828, 45904};
63 4 : const std::vector<int> response_stream_sizes = {31415, 9, 2653, 58979};
64 : const int kNumResponseMessages = 2000;
65 : const int kResponseMessageSize = 1030;
66 : const int kReceiveDelayMilliSeconds = 20;
67 : const int kLargeRequestSize = 271828;
68 : const int kLargeResponseSize = 314159;
69 :
70 4 : CompressionType GetInteropCompressionTypeFromCompressionAlgorithm(
71 : grpc_compression_algorithm algorithm) {
72 4 : switch (algorithm) {
73 : case GRPC_COMPRESS_NONE:
74 4 : return CompressionType::NONE;
75 : case GRPC_COMPRESS_GZIP:
76 0 : return CompressionType::GZIP;
77 : case GRPC_COMPRESS_DEFLATE:
78 0 : return CompressionType::DEFLATE;
79 : default:
80 0 : GPR_ASSERT(false);
81 : }
82 : }
83 : } // namespace
84 :
85 4 : InteropClient::InteropClient(std::shared_ptr<Channel> channel)
86 4 : : channel_(channel) {}
87 :
88 4 : void InteropClient::AssertOkOrPrintErrorStatus(const Status& s) {
89 4 : if (s.ok()) {
90 8 : return;
91 : }
92 0 : gpr_log(GPR_INFO, "Error status code: %d, message: %s", s.error_code(),
93 0 : s.error_message().c_str());
94 0 : GPR_ASSERT(0);
95 : }
96 :
97 0 : void InteropClient::DoEmpty() {
98 0 : gpr_log(GPR_INFO, "Sending an empty rpc...");
99 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
100 :
101 0 : Empty request = Empty::default_instance();
102 0 : Empty response = Empty::default_instance();
103 0 : ClientContext context;
104 :
105 0 : Status s = stub->EmptyCall(&context, request, &response);
106 0 : AssertOkOrPrintErrorStatus(s);
107 :
108 0 : gpr_log(GPR_INFO, "Empty rpc done.");
109 0 : }
110 :
111 : // Shared code to set large payload, make rpc and check response payload.
112 4 : void InteropClient::PerformLargeUnary(SimpleRequest* request,
113 : SimpleResponse* response) {
114 4 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
115 :
116 8 : ClientContext context;
117 4 : InteropClientContextInspector inspector(context);
118 : // If the request doesn't already specify the response type, default to
119 : // COMPRESSABLE.
120 4 : request->set_response_size(kLargeResponseSize);
121 8 : grpc::string payload(kLargeRequestSize, '\0');
122 4 : request->mutable_payload()->set_body(payload.c_str(), kLargeRequestSize);
123 :
124 8 : Status s = stub->UnaryCall(&context, *request, response);
125 :
126 : // Compression related checks.
127 4 : GPR_ASSERT(request->response_compression() ==
128 : GetInteropCompressionTypeFromCompressionAlgorithm(
129 : inspector.GetCallCompressionAlgorithm()));
130 4 : if (request->response_compression() == NONE) {
131 4 : GPR_ASSERT(!(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
132 0 : } else if (request->response_type() == PayloadType::COMPRESSABLE) {
133 : // requested compression and compressable response => results should always
134 : // be compressed.
135 0 : GPR_ASSERT(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS);
136 : }
137 :
138 4 : AssertOkOrPrintErrorStatus(s);
139 :
140 : // Payload related checks.
141 4 : if (request->response_type() != PayloadType::RANDOM) {
142 4 : GPR_ASSERT(response->payload().type() == request->response_type());
143 : }
144 4 : switch (response->payload().type()) {
145 : case PayloadType::COMPRESSABLE:
146 4 : GPR_ASSERT(response->payload().body() ==
147 : grpc::string(kLargeResponseSize, '\0'));
148 4 : break;
149 : case PayloadType::UNCOMPRESSABLE: {
150 0 : std::ifstream rnd_file(kRandomFile);
151 0 : GPR_ASSERT(rnd_file.good());
152 0 : for (int i = 0; i < kLargeResponseSize; i++) {
153 0 : GPR_ASSERT(response->payload().body()[i] == (char)rnd_file.get());
154 0 : }
155 0 : } break;
156 : default:
157 0 : GPR_ASSERT(false);
158 4 : }
159 4 : }
160 :
161 0 : void InteropClient::DoComputeEngineCreds(
162 : const grpc::string& default_service_account,
163 : const grpc::string& oauth_scope) {
164 : gpr_log(GPR_INFO,
165 0 : "Sending a large unary rpc with compute engine credentials ...");
166 0 : SimpleRequest request;
167 0 : SimpleResponse response;
168 0 : request.set_fill_username(true);
169 0 : request.set_fill_oauth_scope(true);
170 0 : request.set_response_type(PayloadType::COMPRESSABLE);
171 0 : PerformLargeUnary(&request, &response);
172 0 : gpr_log(GPR_INFO, "Got username %s", response.username().c_str());
173 0 : gpr_log(GPR_INFO, "Got oauth_scope %s", response.oauth_scope().c_str());
174 0 : GPR_ASSERT(!response.username().empty());
175 0 : GPR_ASSERT(response.username().c_str() == default_service_account);
176 0 : GPR_ASSERT(!response.oauth_scope().empty());
177 0 : const char* oauth_scope_str = response.oauth_scope().c_str();
178 0 : GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
179 0 : gpr_log(GPR_INFO, "Large unary with compute engine creds done.");
180 0 : }
181 :
182 0 : void InteropClient::DoOauth2AuthToken(const grpc::string& username,
183 : const grpc::string& oauth_scope) {
184 : gpr_log(GPR_INFO,
185 0 : "Sending a unary rpc with raw oauth2 access token credentials ...");
186 0 : SimpleRequest request;
187 0 : SimpleResponse response;
188 0 : request.set_fill_username(true);
189 0 : request.set_fill_oauth_scope(true);
190 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
191 :
192 0 : ClientContext context;
193 :
194 0 : Status s = stub->UnaryCall(&context, request, &response);
195 :
196 0 : AssertOkOrPrintErrorStatus(s);
197 0 : GPR_ASSERT(!response.username().empty());
198 0 : GPR_ASSERT(!response.oauth_scope().empty());
199 0 : GPR_ASSERT(username == response.username());
200 0 : const char* oauth_scope_str = response.oauth_scope().c_str();
201 0 : GPR_ASSERT(oauth_scope.find(oauth_scope_str) != grpc::string::npos);
202 0 : gpr_log(GPR_INFO, "Unary with oauth2 access token credentials done.");
203 0 : }
204 :
205 0 : void InteropClient::DoPerRpcCreds(const grpc::string& json_key) {
206 0 : gpr_log(GPR_INFO, "Sending a unary rpc with per-rpc JWT access token ...");
207 0 : SimpleRequest request;
208 0 : SimpleResponse response;
209 0 : request.set_fill_username(true);
210 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
211 :
212 0 : ClientContext context;
213 0 : std::chrono::seconds token_lifetime = std::chrono::hours(1);
214 : std::shared_ptr<Credentials> creds =
215 0 : ServiceAccountJWTAccessCredentials(json_key, token_lifetime.count());
216 :
217 0 : context.set_credentials(creds);
218 :
219 0 : Status s = stub->UnaryCall(&context, request, &response);
220 :
221 0 : AssertOkOrPrintErrorStatus(s);
222 0 : GPR_ASSERT(!response.username().empty());
223 0 : GPR_ASSERT(json_key.find(response.username()) != grpc::string::npos);
224 0 : gpr_log(GPR_INFO, "Unary with per-rpc JWT access token done.");
225 0 : }
226 :
227 0 : void InteropClient::DoJwtTokenCreds(const grpc::string& username) {
228 0 : gpr_log(GPR_INFO, "Sending a large unary rpc with JWT token credentials ...");
229 0 : SimpleRequest request;
230 0 : SimpleResponse response;
231 0 : request.set_fill_username(true);
232 0 : request.set_response_type(PayloadType::COMPRESSABLE);
233 0 : PerformLargeUnary(&request, &response);
234 0 : GPR_ASSERT(!response.username().empty());
235 0 : GPR_ASSERT(username.find(response.username()) != grpc::string::npos);
236 0 : gpr_log(GPR_INFO, "Large unary with JWT token creds done.");
237 0 : }
238 :
239 4 : void InteropClient::DoLargeUnary() {
240 4 : gpr_log(GPR_INFO, "Sending a large unary rpc...");
241 4 : SimpleRequest request;
242 8 : SimpleResponse response;
243 4 : request.set_response_type(PayloadType::COMPRESSABLE);
244 4 : PerformLargeUnary(&request, &response);
245 8 : gpr_log(GPR_INFO, "Large unary done.");
246 4 : }
247 :
248 0 : void InteropClient::DoLargeCompressedUnary() {
249 0 : const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
250 0 : const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
251 0 : for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
252 0 : for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
253 : char* log_suffix;
254 : gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
255 0 : CompressionType_Name(compression_types[j]).c_str(),
256 0 : PayloadType_Name(payload_types[i]).c_str());
257 :
258 0 : gpr_log(GPR_INFO, "Sending a large compressed unary rpc %s.", log_suffix);
259 0 : SimpleRequest request;
260 0 : SimpleResponse response;
261 0 : request.set_response_type(payload_types[i]);
262 0 : request.set_response_compression(compression_types[j]);
263 0 : PerformLargeUnary(&request, &response);
264 0 : gpr_log(GPR_INFO, "Large compressed unary done %s.", log_suffix);
265 0 : gpr_free(log_suffix);
266 0 : }
267 : }
268 0 : }
269 :
270 0 : void InteropClient::DoRequestStreaming() {
271 0 : gpr_log(GPR_INFO, "Sending request steaming rpc ...");
272 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
273 :
274 0 : ClientContext context;
275 0 : StreamingInputCallRequest request;
276 0 : StreamingInputCallResponse response;
277 :
278 : std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
279 0 : stub->StreamingInputCall(&context, &response));
280 :
281 0 : int aggregated_payload_size = 0;
282 0 : for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
283 0 : Payload* payload = request.mutable_payload();
284 0 : payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
285 0 : GPR_ASSERT(stream->Write(request));
286 0 : aggregated_payload_size += request_stream_sizes[i];
287 : }
288 0 : stream->WritesDone();
289 0 : Status s = stream->Finish();
290 :
291 0 : GPR_ASSERT(response.aggregated_payload_size() == aggregated_payload_size);
292 0 : AssertOkOrPrintErrorStatus(s);
293 0 : gpr_log(GPR_INFO, "Request streaming done.");
294 0 : }
295 :
296 0 : void InteropClient::DoResponseStreaming() {
297 0 : gpr_log(GPR_INFO, "Receiving response steaming rpc ...");
298 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
299 :
300 0 : ClientContext context;
301 0 : StreamingOutputCallRequest request;
302 0 : for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
303 0 : ResponseParameters* response_parameter = request.add_response_parameters();
304 0 : response_parameter->set_size(response_stream_sizes[i]);
305 : }
306 0 : StreamingOutputCallResponse response;
307 : std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
308 0 : stub->StreamingOutputCall(&context, request));
309 :
310 0 : unsigned int i = 0;
311 0 : while (stream->Read(&response)) {
312 0 : GPR_ASSERT(response.payload().body() ==
313 : grpc::string(response_stream_sizes[i], '\0'));
314 0 : ++i;
315 : }
316 0 : GPR_ASSERT(response_stream_sizes.size() == i);
317 0 : Status s = stream->Finish();
318 0 : AssertOkOrPrintErrorStatus(s);
319 0 : gpr_log(GPR_INFO, "Response streaming done.");
320 0 : }
321 :
322 0 : void InteropClient::DoResponseCompressedStreaming() {
323 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
324 :
325 0 : const CompressionType compression_types[] = {NONE, GZIP, DEFLATE};
326 0 : const PayloadType payload_types[] = {COMPRESSABLE, UNCOMPRESSABLE, RANDOM};
327 0 : for (size_t i = 0; i < GPR_ARRAY_SIZE(payload_types); i++) {
328 0 : for (size_t j = 0; j < GPR_ARRAY_SIZE(compression_types); j++) {
329 0 : ClientContext context;
330 0 : InteropClientContextInspector inspector(context);
331 0 : StreamingOutputCallRequest request;
332 :
333 : char* log_suffix;
334 : gpr_asprintf(&log_suffix, "(compression=%s; payload=%s)",
335 0 : CompressionType_Name(compression_types[j]).c_str(),
336 0 : PayloadType_Name(payload_types[i]).c_str());
337 :
338 0 : gpr_log(GPR_INFO, "Receiving response steaming rpc %s.", log_suffix);
339 :
340 0 : request.set_response_type(payload_types[i]);
341 0 : request.set_response_compression(compression_types[j]);
342 :
343 0 : for (size_t k = 0; k < response_stream_sizes.size(); ++k) {
344 : ResponseParameters* response_parameter =
345 0 : request.add_response_parameters();
346 0 : response_parameter->set_size(response_stream_sizes[k]);
347 : }
348 0 : StreamingOutputCallResponse response;
349 :
350 : std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
351 0 : stub->StreamingOutputCall(&context, request));
352 :
353 0 : size_t k = 0;
354 0 : while (stream->Read(&response)) {
355 : // Payload related checks.
356 0 : if (request.response_type() != PayloadType::RANDOM) {
357 0 : GPR_ASSERT(response.payload().type() == request.response_type());
358 : }
359 0 : switch (response.payload().type()) {
360 : case PayloadType::COMPRESSABLE:
361 0 : GPR_ASSERT(response.payload().body() ==
362 : grpc::string(response_stream_sizes[k], '\0'));
363 0 : break;
364 : case PayloadType::UNCOMPRESSABLE: {
365 0 : std::ifstream rnd_file(kRandomFile);
366 0 : GPR_ASSERT(rnd_file.good());
367 0 : for (int n = 0; n < response_stream_sizes[k]; n++) {
368 0 : GPR_ASSERT(response.payload().body()[n] == (char)rnd_file.get());
369 0 : }
370 0 : } break;
371 : default:
372 0 : GPR_ASSERT(false);
373 : }
374 :
375 : // Compression related checks.
376 0 : GPR_ASSERT(request.response_compression() ==
377 : GetInteropCompressionTypeFromCompressionAlgorithm(
378 : inspector.GetCallCompressionAlgorithm()));
379 0 : if (request.response_compression() == NONE) {
380 0 : GPR_ASSERT(
381 : !(inspector.GetMessageFlags() & GRPC_WRITE_INTERNAL_COMPRESS));
382 0 : } else if (request.response_type() == PayloadType::COMPRESSABLE) {
383 : // requested compression and compressable response => results should
384 : // always be compressed.
385 0 : GPR_ASSERT(inspector.GetMessageFlags() &
386 : GRPC_WRITE_INTERNAL_COMPRESS);
387 : }
388 :
389 0 : ++k;
390 : }
391 :
392 0 : GPR_ASSERT(response_stream_sizes.size() == k);
393 0 : Status s = stream->Finish();
394 :
395 0 : AssertOkOrPrintErrorStatus(s);
396 0 : gpr_log(GPR_INFO, "Response streaming done %s.", log_suffix);
397 0 : gpr_free(log_suffix);
398 0 : }
399 0 : }
400 0 : }
401 :
402 0 : void InteropClient::DoResponseStreamingWithSlowConsumer() {
403 0 : gpr_log(GPR_INFO, "Receiving response steaming rpc with slow consumer ...");
404 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
405 :
406 0 : ClientContext context;
407 0 : StreamingOutputCallRequest request;
408 :
409 0 : for (int i = 0; i < kNumResponseMessages; ++i) {
410 0 : ResponseParameters* response_parameter = request.add_response_parameters();
411 0 : response_parameter->set_size(kResponseMessageSize);
412 : }
413 0 : StreamingOutputCallResponse response;
414 : std::unique_ptr<ClientReader<StreamingOutputCallResponse>> stream(
415 0 : stub->StreamingOutputCall(&context, request));
416 :
417 0 : int i = 0;
418 0 : while (stream->Read(&response)) {
419 0 : GPR_ASSERT(response.payload().body() ==
420 : grpc::string(kResponseMessageSize, '\0'));
421 0 : gpr_log(GPR_INFO, "received message %d", i);
422 0 : usleep(kReceiveDelayMilliSeconds * 1000);
423 0 : ++i;
424 : }
425 0 : GPR_ASSERT(kNumResponseMessages == i);
426 0 : Status s = stream->Finish();
427 :
428 0 : AssertOkOrPrintErrorStatus(s);
429 0 : gpr_log(GPR_INFO, "Response streaming done.");
430 0 : }
431 :
432 0 : void InteropClient::DoHalfDuplex() {
433 0 : gpr_log(GPR_INFO, "Sending half-duplex streaming rpc ...");
434 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
435 :
436 0 : ClientContext context;
437 : std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
438 : StreamingOutputCallResponse>>
439 0 : stream(stub->HalfDuplexCall(&context));
440 :
441 0 : StreamingOutputCallRequest request;
442 0 : ResponseParameters* response_parameter = request.add_response_parameters();
443 0 : for (unsigned int i = 0; i < response_stream_sizes.size(); ++i) {
444 0 : response_parameter->set_size(response_stream_sizes[i]);
445 0 : GPR_ASSERT(stream->Write(request));
446 : }
447 0 : stream->WritesDone();
448 :
449 0 : unsigned int i = 0;
450 0 : StreamingOutputCallResponse response;
451 0 : while (stream->Read(&response)) {
452 0 : GPR_ASSERT(response.payload().body() ==
453 : grpc::string(response_stream_sizes[i], '\0'));
454 0 : ++i;
455 : }
456 0 : GPR_ASSERT(response_stream_sizes.size() == i);
457 0 : Status s = stream->Finish();
458 0 : AssertOkOrPrintErrorStatus(s);
459 0 : gpr_log(GPR_INFO, "Half-duplex streaming rpc done.");
460 0 : }
461 :
462 0 : void InteropClient::DoPingPong() {
463 0 : gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
464 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
465 :
466 0 : ClientContext context;
467 : std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
468 : StreamingOutputCallResponse>>
469 0 : stream(stub->FullDuplexCall(&context));
470 :
471 0 : StreamingOutputCallRequest request;
472 0 : request.set_response_type(PayloadType::COMPRESSABLE);
473 0 : ResponseParameters* response_parameter = request.add_response_parameters();
474 0 : Payload* payload = request.mutable_payload();
475 0 : StreamingOutputCallResponse response;
476 0 : for (unsigned int i = 0; i < request_stream_sizes.size(); ++i) {
477 0 : response_parameter->set_size(response_stream_sizes[i]);
478 0 : payload->set_body(grpc::string(request_stream_sizes[i], '\0'));
479 0 : GPR_ASSERT(stream->Write(request));
480 0 : GPR_ASSERT(stream->Read(&response));
481 0 : GPR_ASSERT(response.payload().body() ==
482 : grpc::string(response_stream_sizes[i], '\0'));
483 : }
484 :
485 0 : stream->WritesDone();
486 0 : GPR_ASSERT(!stream->Read(&response));
487 0 : Status s = stream->Finish();
488 0 : AssertOkOrPrintErrorStatus(s);
489 0 : gpr_log(GPR_INFO, "Ping pong streaming done.");
490 0 : }
491 :
492 0 : void InteropClient::DoCancelAfterBegin() {
493 0 : gpr_log(GPR_INFO, "Sending request steaming rpc ...");
494 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
495 :
496 0 : ClientContext context;
497 0 : StreamingInputCallRequest request;
498 0 : StreamingInputCallResponse response;
499 :
500 : std::unique_ptr<ClientWriter<StreamingInputCallRequest>> stream(
501 0 : stub->StreamingInputCall(&context, &response));
502 :
503 0 : gpr_log(GPR_INFO, "Trying to cancel...");
504 0 : context.TryCancel();
505 0 : Status s = stream->Finish();
506 0 : GPR_ASSERT(s.error_code() == StatusCode::CANCELLED);
507 0 : gpr_log(GPR_INFO, "Canceling streaming done.");
508 0 : }
509 :
510 0 : void InteropClient::DoCancelAfterFirstResponse() {
511 0 : gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc ...");
512 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
513 :
514 0 : ClientContext context;
515 : std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
516 : StreamingOutputCallResponse>>
517 0 : stream(stub->FullDuplexCall(&context));
518 :
519 0 : StreamingOutputCallRequest request;
520 0 : request.set_response_type(PayloadType::COMPRESSABLE);
521 0 : ResponseParameters* response_parameter = request.add_response_parameters();
522 0 : response_parameter->set_size(31415);
523 0 : request.mutable_payload()->set_body(grpc::string(27182, '\0'));
524 0 : StreamingOutputCallResponse response;
525 0 : GPR_ASSERT(stream->Write(request));
526 0 : GPR_ASSERT(stream->Read(&response));
527 0 : GPR_ASSERT(response.payload().body() == grpc::string(31415, '\0'));
528 0 : gpr_log(GPR_INFO, "Trying to cancel...");
529 0 : context.TryCancel();
530 :
531 0 : Status s = stream->Finish();
532 0 : gpr_log(GPR_INFO, "Canceling pingpong streaming done.");
533 0 : }
534 :
535 0 : void InteropClient::DoTimeoutOnSleepingServer() {
536 0 : gpr_log(GPR_INFO, "Sending Ping Pong streaming rpc with a short deadline...");
537 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
538 :
539 0 : ClientContext context;
540 : std::chrono::system_clock::time_point deadline =
541 0 : std::chrono::system_clock::now() + std::chrono::milliseconds(1);
542 0 : context.set_deadline(deadline);
543 : std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
544 : StreamingOutputCallResponse>>
545 0 : stream(stub->FullDuplexCall(&context));
546 :
547 0 : StreamingOutputCallRequest request;
548 0 : request.mutable_payload()->set_body(grpc::string(27182, '\0'));
549 0 : stream->Write(request);
550 :
551 0 : Status s = stream->Finish();
552 0 : GPR_ASSERT(s.error_code() == StatusCode::DEADLINE_EXCEEDED);
553 0 : gpr_log(GPR_INFO, "Pingpong streaming timeout done.");
554 0 : }
555 :
556 0 : void InteropClient::DoEmptyStream() {
557 0 : gpr_log(GPR_INFO, "Starting empty_stream.");
558 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
559 :
560 0 : ClientContext context;
561 : std::unique_ptr<ClientReaderWriter<StreamingOutputCallRequest,
562 : StreamingOutputCallResponse>>
563 0 : stream(stub->FullDuplexCall(&context));
564 0 : stream->WritesDone();
565 0 : StreamingOutputCallResponse response;
566 0 : GPR_ASSERT(stream->Read(&response) == false);
567 0 : Status s = stream->Finish();
568 0 : AssertOkOrPrintErrorStatus(s);
569 0 : gpr_log(GPR_INFO, "empty_stream done.");
570 0 : }
571 :
572 0 : void InteropClient::DoStatusWithMessage() {
573 0 : gpr_log(GPR_INFO, "Sending RPC with a request for status code 2 and message");
574 0 : std::unique_ptr<TestService::Stub> stub(TestService::NewStub(channel_));
575 :
576 0 : ClientContext context;
577 0 : SimpleRequest request;
578 0 : SimpleResponse response;
579 0 : EchoStatus* requested_status = request.mutable_response_status();
580 0 : requested_status->set_code(grpc::StatusCode::UNKNOWN);
581 0 : grpc::string test_msg = "This is a test message";
582 0 : requested_status->set_message(test_msg);
583 :
584 0 : Status s = stub->UnaryCall(&context, request, &response);
585 :
586 0 : GPR_ASSERT(s.error_code() == grpc::StatusCode::UNKNOWN);
587 0 : GPR_ASSERT(s.error_message() == test_msg);
588 0 : gpr_log(GPR_INFO, "Done testing Status and Message");
589 0 : }
590 :
591 : } // namespace testing
592 12 : } // namespace grpc
|