Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/iomgr/tcp_posix.h"
35 :
36 : #include <errno.h>
37 : #include <fcntl.h>
38 : #include <string.h>
39 : #include <sys/types.h>
40 : #include <sys/socket.h>
41 : #include <unistd.h>
42 :
43 : #include <grpc/grpc.h>
44 : #include <grpc/support/alloc.h>
45 : #include <grpc/support/log.h>
46 : #include <grpc/support/time.h>
47 : #include <grpc/support/useful.h>
48 : #include "test/core/util/test_config.h"
49 : #include "test/core/iomgr/endpoint_tests.h"
50 :
51 : static grpc_pollset g_pollset;
52 :
53 : /*
54 : General test notes:
55 :
56 : All tests which write data into a socket write i%256 into byte i, which is
57 : verified by readers.
58 :
59 : In general there are a few interesting things to vary which may lead to
60 : exercising different codepaths in an implementation:
61 : 1. Total amount of data written to the socket
62 : 2. Size of slice allocations
63 : 3. Amount of data we read from or write to the socket at once
64 :
65 : The tests here tend to parameterize these where applicable.
66 :
67 : */
68 :
/* Creates a connected AF_UNIX stream socket pair in sv and switches both
   descriptors to non-blocking mode. Aborts through GPR_ASSERT if socketpair()
   or the F_SETFL call fails. */
static void create_sockets(int sv[2]) {
  int end;
  GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
  for (end = 0; end < 2; ++end) {
    int fd_flags = fcntl(sv[end], F_GETFL, 0);
    GPR_ASSERT(fcntl(sv[end], F_SETFL, fd_flags | O_NONBLOCK) == 0);
  }
}
77 :
78 2 : static ssize_t fill_socket(int fd) {
79 : ssize_t write_bytes;
80 2 : ssize_t total_bytes = 0;
81 : int i;
82 : unsigned char buf[256];
83 514 : for (i = 0; i < 256; ++i) {
84 512 : buf[i] = (gpr_uint8)i;
85 : }
86 : do {
87 336 : write_bytes = write(fd, buf, 256);
88 336 : if (write_bytes > 0) {
89 334 : total_bytes += write_bytes;
90 : }
91 336 : } while (write_bytes >= 0 || errno == EINTR);
92 2 : GPR_ASSERT(errno == EAGAIN);
93 2 : return total_bytes;
94 : }
95 :
96 4 : static size_t fill_socket_partial(int fd, size_t bytes) {
97 : ssize_t write_bytes;
98 4 : size_t total_bytes = 0;
99 4 : unsigned char *buf = malloc(bytes);
100 : unsigned i;
101 30104 : for (i = 0; i < bytes; ++i) {
102 30100 : buf[i] = (gpr_uint8)(i % 256);
103 : }
104 :
105 : do {
106 4 : write_bytes = write(fd, buf, bytes - total_bytes);
107 4 : if (write_bytes > 0) {
108 4 : total_bytes += (size_t)write_bytes;
109 : }
110 4 : } while ((write_bytes >= 0 || errno == EINTR) && bytes > total_bytes);
111 :
112 4 : gpr_free(buf);
113 :
114 4 : return total_bytes;
115 : }
116 :
/* State shared between a read test and its read_cb callback. */
117 : struct read_socket_state {
/* Endpoint that read_cb re-arms with grpc_endpoint_read. */
118 : grpc_endpoint *ep;
/* Running total of bytes counted by read_cb. */
119 : size_t read_bytes;
/* Byte count at which the test stops issuing reads. */
120 : size_t target_read_bytes;
/* Slice buffer passed to grpc_endpoint_read and inspected in read_cb. */
121 : gpr_slice_buffer incoming;
/* Closure passed to grpc_endpoint_read; initialized with read_cb. */
122 : grpc_closure read_cb;
123 : };
124 :
125 13218 : static size_t count_slices(gpr_slice *slices, size_t nslices,
126 : int *current_data) {
127 13218 : size_t num_bytes = 0;
128 : unsigned i, j;
129 : unsigned char *buf;
130 66052 : for (i = 0; i < nslices; ++i) {
131 52834 : buf = GPR_SLICE_START_PTR(slices[i]);
132 168438 : for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) {
133 115604 : GPR_ASSERT(buf[j] == *current_data);
134 115604 : *current_data = (*current_data + 1) % 256;
135 : }
136 52834 : num_bytes += GPR_SLICE_LENGTH(slices[i]);
137 : }
138 13218 : return num_bytes;
139 : }
140 :
141 13218 : static void read_cb(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
142 13218 : struct read_socket_state *state = (struct read_socket_state *)user_data;
143 : size_t read_bytes;
144 : int current_data;
145 :
146 13218 : GPR_ASSERT(success);
147 :
148 13218 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
149 13218 : current_data = state->read_bytes % 256;
150 13218 : read_bytes = count_slices(state->incoming.slices, state->incoming.count,
151 : ¤t_data);
152 13218 : state->read_bytes += read_bytes;
153 13218 : gpr_log(GPR_INFO, "Read %d bytes of %d", read_bytes,
154 : state->target_read_bytes);
155 13218 : if (state->read_bytes >= state->target_read_bytes) {
156 6 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
157 : } else {
158 13212 : grpc_endpoint_read(exec_ctx, state->ep, &state->incoming, &state->read_cb);
159 13212 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
160 : }
161 13218 : }
162 :
163 : /* Write to a socket, then read from it using the grpc_tcp API. */
164 4 : static void read_test(size_t num_bytes, size_t slice_size) {
165 : int sv[2];
166 : grpc_endpoint *ep;
167 : struct read_socket_state state;
168 : size_t written_bytes;
169 4 : gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
170 4 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
171 :
172 4 : gpr_log(GPR_INFO, "Read test of size %d, slice size %d", num_bytes,
173 : slice_size);
174 :
175 4 : create_sockets(sv);
176 :
177 4 : ep = grpc_tcp_create(grpc_fd_create(sv[1], "read_test"), slice_size, "test");
178 4 : grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
179 :
180 4 : written_bytes = fill_socket_partial(sv[0], num_bytes);
181 4 : gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
182 :
183 4 : state.ep = ep;
184 4 : state.read_bytes = 0;
185 4 : state.target_read_bytes = written_bytes;
186 4 : gpr_slice_buffer_init(&state.incoming);
187 4 : grpc_closure_init(&state.read_cb, read_cb, &state);
188 :
189 4 : grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
190 :
191 4 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
192 12 : while (state.read_bytes < state.target_read_bytes) {
193 : grpc_pollset_worker worker;
194 4 : grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
195 : gpr_now(GPR_CLOCK_MONOTONIC), deadline);
196 4 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
197 4 : grpc_exec_ctx_finish(&exec_ctx);
198 4 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
199 : }
200 4 : GPR_ASSERT(state.read_bytes == state.target_read_bytes);
201 4 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
202 :
203 4 : gpr_slice_buffer_destroy(&state.incoming);
204 4 : grpc_endpoint_destroy(&exec_ctx, ep);
205 4 : grpc_exec_ctx_finish(&exec_ctx);
206 4 : }
207 :
208 : /* Write to a socket until it fills up, then read from it using the grpc_tcp
209 : API. */
210 2 : static void large_read_test(size_t slice_size) {
211 : int sv[2];
212 : grpc_endpoint *ep;
213 : struct read_socket_state state;
214 : ssize_t written_bytes;
215 2 : gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
216 2 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
217 :
218 2 : gpr_log(GPR_INFO, "Start large read test, slice size %d", slice_size);
219 :
220 2 : create_sockets(sv);
221 :
222 2 : ep = grpc_tcp_create(grpc_fd_create(sv[1], "large_read_test"), slice_size,
223 : "test");
224 2 : grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
225 :
226 2 : written_bytes = fill_socket(sv[0]);
227 2 : gpr_log(GPR_INFO, "Wrote %d bytes", written_bytes);
228 :
229 2 : state.ep = ep;
230 2 : state.read_bytes = 0;
231 2 : state.target_read_bytes = (size_t)written_bytes;
232 2 : gpr_slice_buffer_init(&state.incoming);
233 2 : grpc_closure_init(&state.read_cb, read_cb, &state);
234 :
235 2 : grpc_endpoint_read(&exec_ctx, ep, &state.incoming, &state.read_cb);
236 :
237 2 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
238 6 : while (state.read_bytes < state.target_read_bytes) {
239 : grpc_pollset_worker worker;
240 2 : grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
241 : gpr_now(GPR_CLOCK_MONOTONIC), deadline);
242 2 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
243 2 : grpc_exec_ctx_finish(&exec_ctx);
244 2 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
245 : }
246 2 : GPR_ASSERT(state.read_bytes == state.target_read_bytes);
247 2 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
248 :
249 2 : gpr_slice_buffer_destroy(&state.incoming);
250 2 : grpc_endpoint_destroy(&exec_ctx, ep);
251 2 : grpc_exec_ctx_finish(&exec_ctx);
252 2 : }
253 :
/* State shared between write_test and its write_done callback. */
254 : struct write_socket_state {
/* Endpoint under test; set by write_test. */
255 : grpc_endpoint *ep;
/* Set to 1 by write_done; write_test polls until it is non-zero. */
256 : int write_done;
257 : };
258 :
259 35 : static gpr_slice *allocate_blocks(size_t num_bytes, size_t slice_size,
260 : size_t *num_blocks, gpr_uint8 *current_data) {
261 35 : size_t nslices = num_bytes / slice_size + (num_bytes % slice_size ? 1u : 0u);
262 35 : gpr_slice *slices = gpr_malloc(sizeof(gpr_slice) * nslices);
263 35 : size_t num_bytes_left = num_bytes;
264 : unsigned i, j;
265 : unsigned char *buf;
266 35 : *num_blocks = nslices;
267 :
268 232181 : for (i = 0; i < nslices; ++i) {
269 232146 : slices[i] = gpr_slice_malloc(slice_size > num_bytes_left ? num_bytes_left
270 : : slice_size);
271 232146 : num_bytes_left -= GPR_SLICE_LENGTH(slices[i]);
272 232146 : buf = GPR_SLICE_START_PTR(slices[i]);
273 1741946 : for (j = 0; j < GPR_SLICE_LENGTH(slices[i]); ++j) {
274 1509800 : buf[j] = *current_data;
275 1509800 : (*current_data)++;
276 : }
277 : }
278 35 : GPR_ASSERT(num_bytes_left == 0);
279 35 : return slices;
280 : }
281 :
282 35 : static void write_done(grpc_exec_ctx *exec_ctx,
283 : void *user_data /* write_socket_state */, int success) {
284 35 : struct write_socket_state *state = (struct write_socket_state *)user_data;
285 35 : gpr_log(GPR_INFO, "Write done callback called");
286 35 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
287 35 : gpr_log(GPR_INFO, "Signalling write done");
288 35 : state->write_done = 1;
289 35 : grpc_pollset_kick(&g_pollset, NULL);
290 35 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
291 35 : }
292 :
293 35 : void drain_socket_blocking(int fd, size_t num_bytes, size_t read_size) {
294 35 : unsigned char *buf = malloc(read_size);
295 : ssize_t bytes_read;
296 35 : size_t bytes_left = num_bytes;
297 : int flags;
298 35 : int current = 0;
299 : int i;
300 35 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
301 :
302 35 : flags = fcntl(fd, F_GETFL, 0);
303 35 : GPR_ASSERT(fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) == 0);
304 :
305 : for (;;) {
306 : grpc_pollset_worker worker;
307 48 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
308 48 : grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
309 : gpr_now(GPR_CLOCK_MONOTONIC),
310 48 : GRPC_TIMEOUT_MILLIS_TO_DEADLINE(10));
311 48 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
312 48 : grpc_exec_ctx_finish(&exec_ctx);
313 : do {
314 48 : bytes_read =
315 48 : read(fd, buf, bytes_left > read_size ? read_size : bytes_left);
316 48 : } while (bytes_read < 0 && errno == EINTR);
317 48 : GPR_ASSERT(bytes_read >= 0);
318 1509848 : for (i = 0; i < bytes_read; ++i) {
319 1509800 : GPR_ASSERT(buf[i] == current);
320 1509800 : current = (current + 1) % 256;
321 : }
322 48 : bytes_left -= (size_t)bytes_read;
323 48 : if (bytes_left == 0) break;
324 13 : }
325 35 : flags = fcntl(fd, F_GETFL, 0);
326 35 : GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
327 :
328 35 : gpr_free(buf);
329 35 : }
330 :
331 : /* Write to a socket using the grpc_tcp API, then drain it directly.
332 : Note that if the write does not complete immediately we need to drain the
333 : socket in parallel with the read. */
334 35 : static void write_test(size_t num_bytes, size_t slice_size) {
335 : int sv[2];
336 : grpc_endpoint *ep;
337 : struct write_socket_state state;
338 : size_t num_blocks;
339 : gpr_slice *slices;
340 35 : gpr_uint8 current_data = 0;
341 : gpr_slice_buffer outgoing;
342 : grpc_closure write_done_closure;
343 35 : gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(20);
344 35 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
345 :
346 35 : gpr_log(GPR_INFO, "Start write test with %d bytes, slice size %d", num_bytes,
347 : slice_size);
348 :
349 35 : create_sockets(sv);
350 :
351 35 : ep = grpc_tcp_create(grpc_fd_create(sv[1], "write_test"),
352 : GRPC_TCP_DEFAULT_READ_SLICE_SIZE, "test");
353 35 : grpc_endpoint_add_to_pollset(&exec_ctx, ep, &g_pollset);
354 :
355 35 : state.ep = ep;
356 35 : state.write_done = 0;
357 :
358 35 : slices = allocate_blocks(num_bytes, slice_size, &num_blocks, ¤t_data);
359 :
360 35 : gpr_slice_buffer_init(&outgoing);
361 35 : gpr_slice_buffer_addn(&outgoing, slices, num_blocks);
362 35 : grpc_closure_init(&write_done_closure, write_done, &state);
363 :
364 35 : grpc_endpoint_write(&exec_ctx, ep, &outgoing, &write_done_closure);
365 35 : drain_socket_blocking(sv[0], num_bytes, num_bytes);
366 35 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
367 : for (;;) {
368 : grpc_pollset_worker worker;
369 58 : if (state.write_done) {
370 35 : break;
371 : }
372 23 : grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
373 : gpr_now(GPR_CLOCK_MONOTONIC), deadline);
374 23 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
375 23 : grpc_exec_ctx_finish(&exec_ctx);
376 23 : gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
377 23 : }
378 35 : gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
379 :
380 35 : gpr_slice_buffer_destroy(&outgoing);
381 35 : grpc_endpoint_destroy(&exec_ctx, ep);
382 35 : gpr_free(slices);
383 35 : grpc_exec_ctx_finish(&exec_ctx);
384 35 : }
385 :
/* Runs every read, large-read and write scenario of this file in a fixed
   order: the tabulated cases first, then a sweep of write_test(40320, s)
   for slice sizes s growing from 1 up to (but excluding) 1000. */
void run_tests(void) {
  /* {num_bytes, slice_size} pairs. */
  static const size_t read_cases[][2] = {
      {100, 8192}, {10000, 8192}, {10000, 137}, {10000, 1}};
  static const size_t large_read_slice_sizes[] = {8192, 1};
  static const size_t write_cases[][2] = {
      {100, 8192}, {100, 1}, {100000, 8192}, {100000, 1}, {100000, 137}};
  size_t k;
  size_t slice_size;

  for (k = 0; k < sizeof(read_cases) / sizeof(read_cases[0]); ++k) {
    read_test(read_cases[k][0], read_cases[k][1]);
  }
  for (k = 0;
       k < sizeof(large_read_slice_sizes) / sizeof(large_read_slice_sizes[0]);
       ++k) {
    large_read_test(large_read_slice_sizes[k]);
  }
  for (k = 0; k < sizeof(write_cases) / sizeof(write_cases[0]); ++k) {
    write_test(write_cases[k][0], write_cases[k][1]);
  }

  /* Grow the slice size by 25% per step, but always by at least one. */
  slice_size = 1;
  while (slice_size < 1000) {
    write_test(40320, slice_size);
    slice_size = GPR_MAX(slice_size + 1, slice_size * 5 / 4);
  }
}
406 :
/* Fixture clean-up hook referenced from configs; intentionally does nothing. */
static void clean_up(void) {
}
408 :
409 33 : static grpc_endpoint_test_fixture create_fixture_tcp_socketpair(
410 : size_t slice_size) {
411 : int sv[2];
412 : grpc_endpoint_test_fixture f;
413 33 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
414 :
415 33 : create_sockets(sv);
416 33 : f.client_ep = grpc_tcp_create(grpc_fd_create(sv[0], "fixture:client"),
417 : slice_size, "test");
418 33 : f.server_ep = grpc_tcp_create(grpc_fd_create(sv[1], "fixture:server"),
419 : slice_size, "test");
420 33 : grpc_endpoint_add_to_pollset(&exec_ctx, f.client_ep, &g_pollset);
421 33 : grpc_endpoint_add_to_pollset(&exec_ctx, f.server_ep, &g_pollset);
422 :
423 33 : grpc_exec_ctx_finish(&exec_ctx);
424 :
425 33 : return f;
426 : }
427 :
/* Endpoint test configurations; main passes configs[0] to
   grpc_endpoint_tests. The entry's fields are presumably a display name, a
   fixture factory and a clean-up hook - confirm against endpoint_tests.h. */
428 : static grpc_endpoint_test_config configs[] = {
429 : {"tcp/tcp_socketpair", create_fixture_tcp_socketpair, clean_up},
430 : };
431 :
432 1 : static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
433 1 : grpc_pollset_destroy(p);
434 1 : }
435 :
436 1 : int main(int argc, char **argv) {
437 : grpc_closure destroyed;
438 1 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
439 1 : grpc_test_init(argc, argv);
440 1 : grpc_init();
441 1 : grpc_pollset_init(&g_pollset);
442 1 : run_tests();
443 1 : grpc_endpoint_tests(configs[0], &g_pollset);
444 1 : grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
445 1 : grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
446 1 : grpc_exec_ctx_finish(&exec_ctx);
447 1 : grpc_shutdown();
448 :
449 1 : return 0;
450 : }
|