Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <stdarg.h>
35 : #include <string.h>
36 :
37 : #include <grpc/grpc.h>
38 : #include <grpc/support/alloc.h>
39 : #include <grpc/support/host_port.h>
40 : #include <grpc/support/log.h>
41 : #include <grpc/support/time.h>
42 : #include <grpc/support/string_util.h>
43 :
44 : #include "src/core/channel/channel_stack.h"
45 : #include "src/core/surface/channel.h"
46 : #include "src/core/channel/client_channel.h"
47 : #include "src/core/support/string.h"
48 : #include "src/core/surface/server.h"
49 : #include "test/core/util/test_config.h"
50 : #include "test/core/util/port.h"
51 : #include "test/core/end2end/cq_verifier.h"
52 :
53 : typedef struct servers_fixture {
54 : size_t num_servers;
55 : grpc_server **servers;
56 : grpc_call **server_calls;
57 : grpc_completion_queue *cq;
58 : char **servers_hostports;
59 : grpc_metadata_array *request_metadata_recv;
60 : } servers_fixture;
61 :
62 : typedef void (*verifier_fn)(const servers_fixture *, grpc_channel *,
63 : const int *, const size_t);
64 :
65 : typedef struct test_spec {
66 : size_t num_iters;
67 : size_t num_servers;
68 :
69 : int **kill_at;
70 : int **revive_at;
71 :
72 : const char *description;
73 :
74 : verifier_fn verifier;
75 :
76 : } test_spec;
77 :
78 5 : static void test_spec_reset(test_spec *spec) {
79 : size_t i, j;
80 :
81 55 : for (i = 0; i < spec->num_iters; i++) {
82 250 : for (j = 0; j < spec->num_servers; j++) {
83 200 : spec->kill_at[i][j] = 0;
84 200 : spec->revive_at[i][j] = 0;
85 : }
86 : }
87 5 : }
88 :
89 1 : static test_spec *test_spec_create(size_t num_iters, size_t num_servers) {
90 : test_spec *spec;
91 : size_t i;
92 :
93 1 : spec = gpr_malloc(sizeof(test_spec));
94 1 : spec->num_iters = num_iters;
95 1 : spec->num_servers = num_servers;
96 1 : spec->kill_at = gpr_malloc(sizeof(int *) * num_iters);
97 1 : spec->revive_at = gpr_malloc(sizeof(int *) * num_iters);
98 11 : for (i = 0; i < num_iters; i++) {
99 10 : spec->kill_at[i] = gpr_malloc(sizeof(int) * num_servers);
100 10 : spec->revive_at[i] = gpr_malloc(sizeof(int) * num_servers);
101 : }
102 :
103 1 : test_spec_reset(spec);
104 1 : return spec;
105 : }
106 :
107 1 : static void test_spec_destroy(test_spec *spec) {
108 : size_t i;
109 11 : for (i = 0; i < spec->num_iters; i++) {
110 10 : gpr_free(spec->kill_at[i]);
111 10 : gpr_free(spec->revive_at[i]);
112 : }
113 :
114 1 : gpr_free(spec->kill_at);
115 1 : gpr_free(spec->revive_at);
116 :
117 1 : gpr_free(spec);
118 1 : }
119 :
120 308 : static void *tag(gpr_intptr t) { return (void *)t; }
121 :
122 74 : static gpr_timespec n_seconds_time(int n) {
123 74 : return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(n);
124 : }
125 :
126 69 : static void drain_cq(grpc_completion_queue *cq) {
127 : grpc_event ev;
128 : do {
129 69 : ev = grpc_completion_queue_next(cq, n_seconds_time(5), NULL);
130 69 : } while (ev.type != GRPC_QUEUE_SHUTDOWN);
131 5 : }
132 :
133 14 : static void kill_server(const servers_fixture *f, size_t i) {
134 14 : gpr_log(GPR_INFO, "KILLING SERVER %d", i);
135 14 : GPR_ASSERT(f->servers[i] != NULL);
136 14 : grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
137 14 : GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
138 : GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
139 : NULL).type == GRPC_OP_COMPLETE);
140 14 : grpc_server_destroy(f->servers[i]);
141 14 : f->servers[i] = NULL;
142 14 : }
143 :
144 4 : static void revive_server(const servers_fixture *f, size_t i) {
145 : int got_port;
146 4 : gpr_log(GPR_INFO, "RAISE AGAIN SERVER %d", i);
147 4 : GPR_ASSERT(f->servers[i] == NULL);
148 4 : f->servers[i] = grpc_server_create(NULL, NULL);
149 4 : grpc_server_register_completion_queue(f->servers[i], f->cq, NULL);
150 4 : GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
151 : f->servers[i], f->servers_hostports[i])) > 0);
152 4 : grpc_server_start(f->servers[i]);
153 4 : }
154 :
155 5 : static servers_fixture *setup_servers(const char *server_host,
156 : const size_t num_servers) {
157 5 : servers_fixture *f = gpr_malloc(sizeof(servers_fixture));
158 : int *ports;
159 : int got_port;
160 : size_t i;
161 :
162 5 : f->num_servers = num_servers;
163 5 : f->server_calls = gpr_malloc(sizeof(grpc_call *) * num_servers);
164 5 : f->request_metadata_recv =
165 5 : gpr_malloc(sizeof(grpc_metadata_array) * num_servers);
166 : /* Create servers. */
167 5 : ports = gpr_malloc(sizeof(int *) * num_servers);
168 5 : f->servers = gpr_malloc(sizeof(grpc_server *) * num_servers);
169 5 : f->servers_hostports = gpr_malloc(sizeof(char *) * num_servers);
170 5 : f->cq = grpc_completion_queue_create(NULL);
171 25 : for (i = 0; i < num_servers; i++) {
172 20 : ports[i] = grpc_pick_unused_port_or_die();
173 :
174 20 : gpr_join_host_port(&f->servers_hostports[i], server_host, ports[i]);
175 :
176 20 : f->servers[i] = grpc_server_create(NULL, NULL);
177 20 : grpc_server_register_completion_queue(f->servers[i], f->cq, NULL);
178 20 : GPR_ASSERT((got_port = grpc_server_add_insecure_http2_port(
179 : f->servers[i], f->servers_hostports[i])) > 0);
180 20 : GPR_ASSERT(ports[i] == got_port);
181 20 : grpc_server_start(f->servers[i]);
182 : }
183 5 : gpr_free(ports);
184 5 : return f;
185 : }
186 :
187 5 : static void teardown_servers(servers_fixture *f) {
188 : size_t i;
189 : /* Destroy server. */
190 25 : for (i = 0; i < f->num_servers; i++) {
191 20 : if (f->servers[i] == NULL) continue;
192 10 : grpc_server_shutdown_and_notify(f->servers[i], f->cq, tag(10000));
193 10 : GPR_ASSERT(grpc_completion_queue_pluck(f->cq, tag(10000),
194 : GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5),
195 : NULL).type == GRPC_OP_COMPLETE);
196 10 : grpc_server_destroy(f->servers[i]);
197 : }
198 5 : grpc_completion_queue_shutdown(f->cq);
199 5 : drain_cq(f->cq);
200 5 : grpc_completion_queue_destroy(f->cq);
201 :
202 5 : gpr_free(f->servers);
203 :
204 25 : for (i = 0; i < f->num_servers; i++) {
205 20 : gpr_free(f->servers_hostports[i]);
206 : }
207 :
208 5 : gpr_free(f->servers_hostports);
209 5 : gpr_free(f->request_metadata_recv);
210 5 : gpr_free(f->server_calls);
211 5 : gpr_free(f);
212 5 : }
213 :
214 : /** Returns connection sequence (server indices), which must be freed */
215 5 : int *perform_request(servers_fixture *f, grpc_channel *client,
216 : const test_spec *spec) {
217 : grpc_call *c;
218 : int s_idx;
219 : int *s_valid;
220 : gpr_timespec deadline;
221 : grpc_op ops[6];
222 : grpc_op *op;
223 : grpc_status_code status;
224 : char *details;
225 : size_t details_capacity;
226 : int was_cancelled;
227 : grpc_call_details *call_details;
228 : size_t i, iter_num;
229 : grpc_event ev;
230 : int read_tag;
231 : int *connection_sequence;
232 : grpc_metadata_array initial_metadata_recv;
233 : grpc_metadata_array trailing_metadata_recv;
234 :
235 5 : s_valid = gpr_malloc(sizeof(int) * f->num_servers);
236 5 : call_details = gpr_malloc(sizeof(grpc_call_details) * f->num_servers);
237 5 : connection_sequence = gpr_malloc(sizeof(int) * spec->num_iters);
238 :
239 : /* Send a trivial request. */
240 5 : deadline = n_seconds_time(60);
241 :
242 55 : for (iter_num = 0; iter_num < spec->num_iters; iter_num++) {
243 50 : cq_verifier *cqv = cq_verifier_create(f->cq);
244 50 : details = NULL;
245 50 : details_capacity = 0;
246 50 : was_cancelled = 2;
247 :
248 250 : for (i = 0; i < f->num_servers; i++) {
249 200 : if (spec->kill_at[iter_num][i] != 0) {
250 14 : kill_server(f, i);
251 186 : } else if (spec->revive_at[iter_num][i] != 0) {
252 : /* killing takes precedence */
253 4 : revive_server(f, i);
254 : }
255 : }
256 :
257 50 : connection_sequence[iter_num] = -1;
258 50 : grpc_metadata_array_init(&initial_metadata_recv);
259 50 : grpc_metadata_array_init(&trailing_metadata_recv);
260 :
261 250 : for (i = 0; i < f->num_servers; i++) {
262 200 : grpc_call_details_init(&call_details[i]);
263 : }
264 50 : memset(s_valid, 0, f->num_servers * sizeof(int));
265 :
266 50 : c = grpc_channel_create_call(client, NULL, GRPC_PROPAGATE_DEFAULTS, f->cq,
267 : "/foo", "foo.test.google.fr", deadline, NULL);
268 50 : GPR_ASSERT(c);
269 :
270 50 : op = ops;
271 50 : op->op = GRPC_OP_SEND_INITIAL_METADATA;
272 50 : op->data.send_initial_metadata.count = 0;
273 50 : op->flags = 0;
274 50 : op->reserved = NULL;
275 50 : op++;
276 50 : op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
277 50 : op->flags = 0;
278 50 : op->reserved = NULL;
279 50 : op++;
280 50 : op->op = GRPC_OP_RECV_INITIAL_METADATA;
281 50 : op->data.recv_initial_metadata = &initial_metadata_recv;
282 50 : op->flags = 0;
283 50 : op->reserved = NULL;
284 50 : op++;
285 50 : op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
286 50 : op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
287 50 : op->data.recv_status_on_client.status = &status;
288 50 : op->data.recv_status_on_client.status_details = &details;
289 50 : op->data.recv_status_on_client.status_details_capacity = &details_capacity;
290 50 : op->flags = 0;
291 50 : op->reserved = NULL;
292 50 : op++;
293 50 : GPR_ASSERT(GRPC_CALL_OK ==
294 : grpc_call_start_batch(c, ops, (size_t)(op - ops), tag(1), NULL));
295 :
296 : /* "listen" on all servers */
297 250 : for (i = 0; i < f->num_servers; i++) {
298 200 : grpc_metadata_array_init(&f->request_metadata_recv[i]);
299 200 : if (f->servers[i] != NULL) {
300 114 : GPR_ASSERT(GRPC_CALL_OK ==
301 : grpc_server_request_call(f->servers[i], &f->server_calls[i],
302 : &call_details[i],
303 : &f->request_metadata_recv[i], f->cq,
304 : f->cq, tag(1000 + (int)i)));
305 : }
306 : }
307 :
308 50 : s_idx = -1;
309 286 : while ((ev = grpc_completion_queue_next(
310 236 : f->cq, GRPC_TIMEOUT_SECONDS_TO_DEADLINE(1), NULL)).type !=
311 : GRPC_QUEUE_TIMEOUT) {
312 68 : read_tag = ((int)(gpr_intptr)ev.tag);
313 68 : gpr_log(GPR_DEBUG, "EVENT: success:%d, type:%d, tag:%d iter:%d",
314 68 : ev.success, ev.type, read_tag, iter_num);
315 68 : if (ev.success && read_tag >= 1000) {
316 32 : GPR_ASSERT(s_idx == -1); /* only one server must reply */
317 : /* only server notifications for non-shutdown events */
318 32 : s_idx = read_tag - 1000;
319 32 : s_valid[s_idx] = 1;
320 32 : connection_sequence[iter_num] = s_idx;
321 : }
322 : }
323 :
324 50 : if (s_idx >= 0) {
325 32 : op = ops;
326 32 : op->op = GRPC_OP_SEND_INITIAL_METADATA;
327 32 : op->data.send_initial_metadata.count = 0;
328 32 : op->flags = 0;
329 32 : op->reserved = NULL;
330 32 : op++;
331 32 : op->op = GRPC_OP_SEND_STATUS_FROM_SERVER;
332 32 : op->data.send_status_from_server.trailing_metadata_count = 0;
333 32 : op->data.send_status_from_server.status = GRPC_STATUS_UNIMPLEMENTED;
334 32 : op->data.send_status_from_server.status_details = "xyz";
335 32 : op->flags = 0;
336 32 : op->reserved = NULL;
337 32 : op++;
338 32 : op->op = GRPC_OP_RECV_CLOSE_ON_SERVER;
339 32 : op->data.recv_close_on_server.cancelled = &was_cancelled;
340 32 : op->flags = 0;
341 32 : op->reserved = NULL;
342 32 : op++;
343 32 : GPR_ASSERT(GRPC_CALL_OK == grpc_call_start_batch(f->server_calls[s_idx],
344 : ops, (size_t)(op - ops),
345 : tag(102), NULL));
346 :
347 32 : cq_expect_completion(cqv, tag(102), 1);
348 32 : cq_expect_completion(cqv, tag(1), 1);
349 32 : cq_verify(cqv);
350 :
351 32 : GPR_ASSERT(status == GRPC_STATUS_UNIMPLEMENTED);
352 32 : GPR_ASSERT(0 == strcmp(details, "xyz"));
353 32 : GPR_ASSERT(0 == strcmp(call_details[s_idx].method, "/foo"));
354 32 : GPR_ASSERT(0 == strcmp(call_details[s_idx].host, "foo.test.google.fr"));
355 32 : GPR_ASSERT(was_cancelled == 1);
356 : }
357 :
358 250 : for (i = 0; i < f->num_servers; i++) {
359 200 : if (s_valid[i] != 0) {
360 32 : grpc_call_destroy(f->server_calls[i]);
361 : }
362 200 : grpc_metadata_array_destroy(&f->request_metadata_recv[i]);
363 : }
364 50 : grpc_metadata_array_destroy(&initial_metadata_recv);
365 50 : grpc_metadata_array_destroy(&trailing_metadata_recv);
366 :
367 50 : cq_verifier_destroy(cqv);
368 :
369 50 : grpc_call_destroy(c);
370 :
371 250 : for (i = 0; i < f->num_servers; i++) {
372 200 : grpc_call_details_destroy(&call_details[i]);
373 : }
374 50 : gpr_free(details);
375 : }
376 :
377 5 : gpr_free(call_details);
378 5 : gpr_free(s_valid);
379 :
380 5 : return connection_sequence;
381 : }
382 :
383 4 : static void assert_channel_connectivity(
384 : grpc_channel *ch, size_t num_accepted_conn_states,
385 : grpc_connectivity_state accepted_conn_state, ...) {
386 : size_t i;
387 : grpc_channel_stack *client_stack;
388 : grpc_channel_element *client_channel_filter;
389 : grpc_connectivity_state actual_conn_state;
390 4 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
391 : va_list ap;
392 :
393 4 : client_stack = grpc_channel_get_channel_stack(ch);
394 4 : client_channel_filter = grpc_channel_stack_last_element(client_stack);
395 :
396 4 : actual_conn_state = grpc_client_channel_check_connectivity_state(
397 : &exec_ctx, client_channel_filter, 0 /* don't try to connect */);
398 4 : grpc_exec_ctx_finish(&exec_ctx);
399 4 : va_start(ap, accepted_conn_state);
400 4 : for (i = 0; i < num_accepted_conn_states; i++) {
401 4 : if (actual_conn_state == accepted_conn_state) {
402 4 : break;
403 : }
404 0 : accepted_conn_state = va_arg(ap, grpc_connectivity_state);
405 : }
406 4 : va_end(ap);
407 4 : if (i == num_accepted_conn_states) {
408 0 : char **accepted_strs =
409 0 : gpr_malloc(sizeof(char *) * num_accepted_conn_states);
410 : char *accepted_str_joined;
411 0 : va_start(ap, accepted_conn_state);
412 0 : for (i = 0; i < num_accepted_conn_states; i++) {
413 0 : GPR_ASSERT(gpr_asprintf(&accepted_strs[i], "%d", accepted_conn_state) >
414 : 0);
415 0 : accepted_conn_state = va_arg(ap, grpc_connectivity_state);
416 : }
417 0 : va_end(ap);
418 0 : accepted_str_joined = gpr_strjoin_sep((const char **)accepted_strs,
419 : num_accepted_conn_states, ", ", NULL);
420 0 : gpr_log(
421 : GPR_ERROR,
422 : "Channel connectivity assertion failed: expected <one of [%s]>, got %d",
423 : accepted_str_joined, actual_conn_state);
424 :
425 0 : for (i = 0; i < num_accepted_conn_states; i++) {
426 0 : gpr_free(accepted_strs[i]);
427 : }
428 0 : gpr_free(accepted_strs);
429 0 : gpr_free(accepted_str_joined);
430 0 : abort();
431 : }
432 4 : }
433 :
434 5 : void run_spec(const test_spec *spec) {
435 : grpc_channel *client;
436 : char *client_hostport;
437 : char *servers_hostports_str;
438 : int *actual_connection_sequence;
439 5 : servers_fixture *f = setup_servers("127.0.0.1", spec->num_servers);
440 :
441 : /* Create client. */
442 5 : servers_hostports_str = gpr_strjoin_sep((const char **)f->servers_hostports,
443 : f->num_servers, ",", NULL);
444 5 : gpr_asprintf(&client_hostport, "ipv4:%s?lb_policy=round_robin",
445 : servers_hostports_str);
446 5 : client = grpc_insecure_channel_create(client_hostport, NULL, NULL);
447 :
448 5 : gpr_log(GPR_INFO, "Testing '%s' with servers=%s client=%s", spec->description,
449 : servers_hostports_str, client_hostport);
450 :
451 5 : actual_connection_sequence = perform_request(f, client, spec);
452 :
453 5 : spec->verifier(f, client, actual_connection_sequence, spec->num_iters);
454 :
455 5 : gpr_free(client_hostport);
456 5 : gpr_free(servers_hostports_str);
457 5 : gpr_free(actual_connection_sequence);
458 :
459 5 : grpc_channel_destroy(client);
460 5 : teardown_servers(f);
461 5 : }
462 :
463 0 : static void print_failed_expectations(const int *expected_connection_sequence,
464 : const int *actual_connection_sequence,
465 : const size_t expected_seq_length,
466 : const size_t num_iters) {
467 : size_t i;
468 0 : for (i = 0; i < num_iters; i++) {
469 0 : gpr_log(GPR_ERROR, "FAILURE: Iter, expected, actual:%d (%d, %d)", i,
470 0 : expected_connection_sequence[i % expected_seq_length],
471 0 : actual_connection_sequence[i]);
472 : }
473 0 : }
474 :
475 1 : static void verify_vanilla_round_robin(const servers_fixture *f,
476 : grpc_channel *client,
477 : const int *actual_connection_sequence,
478 : const size_t num_iters) {
479 : int *expected_connection_sequence;
480 : size_t i;
481 1 : const size_t expected_seq_length = f->num_servers;
482 :
483 : /* verify conn. seq. expectation */
484 : /* get the first sequence of "num_servers" elements */
485 1 : expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length);
486 1 : memcpy(expected_connection_sequence, actual_connection_sequence,
487 : sizeof(int) * expected_seq_length);
488 :
489 11 : for (i = 0; i < num_iters; i++) {
490 10 : const int actual = actual_connection_sequence[i];
491 10 : const int expected = expected_connection_sequence[i % expected_seq_length];
492 10 : if (actual != expected) {
493 0 : gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected,
494 : actual, i);
495 0 : print_failed_expectations(expected_connection_sequence,
496 : actual_connection_sequence, expected_seq_length,
497 : num_iters);
498 0 : abort();
499 : }
500 : }
501 1 : assert_channel_connectivity(client, 1, GRPC_CHANNEL_READY);
502 :
503 1 : gpr_free(expected_connection_sequence);
504 1 : }
505 :
506 : /* At the start of the second iteration, all but the first and last servers (as
507 : * given in "f") are killed */
508 1 : static void verify_vanishing_floor_round_robin(
509 : const servers_fixture *f, grpc_channel *client,
510 : const int *actual_connection_sequence, const size_t num_iters) {
511 : int *expected_connection_sequence;
512 1 : const size_t expected_seq_length = 2;
513 : size_t i;
514 :
515 : /* verify conn. seq. expectation */
516 : /* copy the first full sequence (without -1s) */
517 1 : expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length);
518 1 : memcpy(expected_connection_sequence, actual_connection_sequence + 2,
519 : expected_seq_length * sizeof(int));
520 :
521 : /* first three elements of the sequence should be [<1st>, -1] */
522 1 : if (actual_connection_sequence[0] != expected_connection_sequence[0]) {
523 0 : gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d",
524 : expected_connection_sequence[0], actual_connection_sequence[0], 0);
525 0 : print_failed_expectations(expected_connection_sequence,
526 : actual_connection_sequence, expected_seq_length,
527 : 1u);
528 0 : abort();
529 : }
530 :
531 1 : GPR_ASSERT(actual_connection_sequence[1] == -1);
532 :
533 9 : for (i = 2; i < num_iters; i++) {
534 8 : const int actual = actual_connection_sequence[i];
535 8 : const int expected = expected_connection_sequence[i % expected_seq_length];
536 8 : if (actual != expected) {
537 0 : gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected,
538 : actual, i);
539 0 : print_failed_expectations(expected_connection_sequence,
540 : actual_connection_sequence, expected_seq_length,
541 : num_iters);
542 0 : abort();
543 : }
544 : }
545 1 : gpr_free(expected_connection_sequence);
546 1 : }
547 :
548 1 : static void verify_total_carnage_round_robin(
549 : const servers_fixture *f, grpc_channel *client,
550 : const int *actual_connection_sequence, const size_t num_iters) {
551 : size_t i;
552 :
553 11 : for (i = 0; i < num_iters; i++) {
554 10 : const int actual = actual_connection_sequence[i];
555 10 : const int expected = -1;
556 10 : if (actual != expected) {
557 0 : gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected,
558 : actual, i);
559 0 : abort();
560 : }
561 : }
562 :
563 : /* even though we know all the servers are dead, the client is still trying
564 : * retrying, believing it's in a transient failure situation */
565 1 : assert_channel_connectivity(client, 2, GRPC_CHANNEL_TRANSIENT_FAILURE,
566 : GRPC_CHANNEL_CONNECTING);
567 1 : }
568 :
569 1 : static void verify_partial_carnage_round_robin(
570 : const servers_fixture *f, grpc_channel *client,
571 : const int *actual_connection_sequence, const size_t num_iters) {
572 : int *expected_connection_sequence;
573 : size_t i;
574 1 : const size_t expected_seq_length = f->num_servers;
575 :
576 : /* verify conn. seq. expectation */
577 : /* get the first sequence of "num_servers" elements */
578 1 : expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length);
579 1 : memcpy(expected_connection_sequence, actual_connection_sequence,
580 : sizeof(int) * expected_seq_length);
581 :
582 6 : for (i = 0; i < num_iters / 2; i++) {
583 5 : const int actual = actual_connection_sequence[i];
584 5 : const int expected = expected_connection_sequence[i % expected_seq_length];
585 5 : if (actual != expected) {
586 0 : gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected,
587 : actual, i);
588 0 : print_failed_expectations(expected_connection_sequence,
589 : actual_connection_sequence, expected_seq_length,
590 : num_iters);
591 0 : abort();
592 : }
593 : }
594 :
595 : /* second half of the iterations go without response */
596 6 : for (; i < num_iters; i++) {
597 5 : GPR_ASSERT(actual_connection_sequence[i] == -1);
598 : }
599 :
600 : /* even though we know all the servers are dead, the client is still trying
601 : * retrying, believing it's in a transient failure situation */
602 1 : assert_channel_connectivity(client, 2, GRPC_CHANNEL_TRANSIENT_FAILURE,
603 : GRPC_CHANNEL_CONNECTING);
604 1 : gpr_free(expected_connection_sequence);
605 1 : }
606 :
607 1 : static void verify_rebirth_round_robin(const servers_fixture *f,
608 : grpc_channel *client,
609 : const int *actual_connection_sequence,
610 : const size_t num_iters) {
611 : int *expected_connection_sequence;
612 : size_t i, j, unique_seq_last_idx, unique_seq_first_idx;
613 1 : const size_t expected_seq_length = f->num_servers;
614 : uint8_t *seen_elements;
615 :
616 : /* verify conn. seq. expectation */
617 : /* get the first unique run of length "num_servers". */
618 1 : expected_connection_sequence = gpr_malloc(sizeof(int) * expected_seq_length);
619 1 : seen_elements = gpr_malloc(sizeof(int) * expected_seq_length);
620 :
621 1 : unique_seq_last_idx = ~(size_t)0;
622 :
623 1 : memset(seen_elements, 0, sizeof(uint8_t) * expected_seq_length);
624 7 : for (i = 0; i < num_iters; i++) {
625 12 : if (actual_connection_sequence[i] < 0 ||
626 5 : seen_elements[actual_connection_sequence[i]] != 0) {
627 : /* if anything breaks the uniqueness of the run, back to square zero */
628 2 : memset(seen_elements, 0, sizeof(uint8_t) * expected_seq_length);
629 2 : continue;
630 : }
631 5 : seen_elements[actual_connection_sequence[i]] = 1;
632 15 : for (j = 0; j < expected_seq_length; j++) {
633 14 : if (seen_elements[j] == 0) break;
634 : }
635 5 : if (j == expected_seq_length) { /* seen all the elements */
636 1 : unique_seq_last_idx = i;
637 1 : break;
638 : }
639 : }
640 : /* make sure we found a valid run */
641 5 : for (j = 0; j < expected_seq_length; j++) {
642 4 : GPR_ASSERT(seen_elements[j] != 0);
643 : }
644 :
645 1 : GPR_ASSERT(unique_seq_last_idx != ~(size_t)0);
646 :
647 1 : unique_seq_first_idx = (unique_seq_last_idx - expected_seq_length + 1);
648 2 : memcpy(expected_connection_sequence,
649 1 : actual_connection_sequence + unique_seq_first_idx,
650 : sizeof(int) * expected_seq_length);
651 :
652 : /* first iteration succeeds */
653 1 : GPR_ASSERT(actual_connection_sequence[0] != -1);
654 : /* then we fail for a while... */
655 1 : GPR_ASSERT(actual_connection_sequence[1] == -1);
656 : /* ... but should be up at "unique_seq_first_idx" */
657 1 : GPR_ASSERT(actual_connection_sequence[unique_seq_first_idx] != -1);
658 :
659 8 : for (j = 0, i = unique_seq_first_idx; i < num_iters; i++) {
660 7 : const int actual = actual_connection_sequence[i];
661 7 : const int expected =
662 7 : expected_connection_sequence[j++ % expected_seq_length];
663 7 : if (actual != expected) {
664 0 : gpr_log(GPR_ERROR, "FAILURE: expected %d, actual %d at iter %d", expected,
665 : actual, i);
666 0 : print_failed_expectations(expected_connection_sequence,
667 : actual_connection_sequence, expected_seq_length,
668 : num_iters);
669 0 : abort();
670 : }
671 : }
672 :
673 : /* things are fine once the servers are brought back up */
674 1 : assert_channel_connectivity(client, 1, GRPC_CHANNEL_READY);
675 1 : gpr_free(expected_connection_sequence);
676 1 : gpr_free(seen_elements);
677 1 : }
678 :
679 1 : int main(int argc, char **argv) {
680 : test_spec *spec;
681 : size_t i;
682 1 : const size_t NUM_ITERS = 10;
683 1 : const size_t NUM_SERVERS = 4;
684 :
685 1 : grpc_test_init(argc, argv);
686 1 : grpc_init();
687 :
688 : /* everything is fine, all servers stay up the whole time and life's peachy */
689 1 : spec = test_spec_create(NUM_ITERS, NUM_SERVERS);
690 1 : spec->verifier = verify_vanilla_round_robin;
691 1 : spec->description = "test_all_server_up";
692 1 : run_spec(spec);
693 :
694 : /* Kill all servers first thing in the morning */
695 1 : test_spec_reset(spec);
696 1 : spec->verifier = verify_total_carnage_round_robin;
697 1 : spec->description = "test_kill_all_server";
698 5 : for (i = 0; i < NUM_SERVERS; i++) {
699 4 : spec->kill_at[0][i] = 1;
700 : }
701 1 : run_spec(spec);
702 :
703 : /* at the start of the 2nd iteration, kill all but the first and last servers.
704 : * This should knock down the server bound to be selected next */
705 1 : test_spec_reset(spec);
706 1 : spec->verifier = verify_vanishing_floor_round_robin;
707 1 : spec->description = "test_kill_all_server_at_2nd_iteration";
708 3 : for (i = 1; i < NUM_SERVERS - 1; i++) {
709 2 : spec->kill_at[1][i] = 1;
710 : }
711 1 : run_spec(spec);
712 :
713 : /* Midway, kill all servers. */
714 1 : test_spec_reset(spec);
715 1 : spec->verifier = verify_partial_carnage_round_robin;
716 1 : spec->description = "test_kill_all_server_midway";
717 5 : for (i = 0; i < NUM_SERVERS; i++) {
718 4 : spec->kill_at[spec->num_iters / 2][i] = 1;
719 : }
720 1 : run_spec(spec);
721 :
722 : /* After first iteration, kill all servers. On the third one, bring them all
723 : * back up. */
724 1 : test_spec_reset(spec);
725 1 : spec->verifier = verify_rebirth_round_robin;
726 1 : spec->description = "test_kill_all_server_after_1st_resurrect_at_3rd";
727 5 : for (i = 0; i < NUM_SERVERS; i++) {
728 4 : spec->kill_at[1][i] = 1;
729 4 : spec->revive_at[3][i] = 1;
730 : }
731 1 : run_spec(spec);
732 :
733 1 : test_spec_destroy(spec);
734 :
735 1 : grpc_shutdown();
736 1 : return 0;
737 : }
|