Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/iomgr/fd_posix.h"
35 :
36 : #include <ctype.h>
37 : #include <errno.h>
38 : #include <fcntl.h>
39 : #include <netinet/in.h>
40 : #include <poll.h>
41 : #include <stdio.h>
42 : #include <stdlib.h>
43 : #include <string.h>
44 : #include <sys/socket.h>
45 : #include <sys/time.h>
46 : #include <unistd.h>
47 :
48 : #include <grpc/support/alloc.h>
49 : #include <grpc/support/log.h>
50 : #include <grpc/support/sync.h>
51 : #include <grpc/support/time.h>
52 : #include "test/core/util/test_config.h"
53 :
/* Pollset shared by every test in this file; fds under test are added to it
   and the wait loops below poll it until each scenario signals completion. */
static grpc_pollset g_pollset;

/* buffer size used to send and receive data.
   1024 is the minimal value to set TCP send and receive buffer. */
#define BUF_SIZE 1024
59 :
60 : /* Create a test socket with the right properties for testing.
61 : port is the TCP port to listen or connect to.
62 : Return a socket FD and sockaddr_in. */
63 2 : static void create_test_socket(int port, int *socket_fd,
64 : struct sockaddr_in *sin) {
65 : int fd;
66 2 : int one = 1;
67 2 : int buf_size = BUF_SIZE;
68 : int flags;
69 :
70 2 : fd = socket(AF_INET, SOCK_STREAM, 0);
71 2 : setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
72 : /* Reset the size of socket send buffer to the minimal value to facilitate
73 : buffer filling up and triggering notify_on_write */
74 2 : GPR_ASSERT(
75 : setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size)) != -1);
76 2 : GPR_ASSERT(
77 : setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)) != -1);
78 : /* Make fd non-blocking */
79 2 : flags = fcntl(fd, F_GETFL, 0);
80 2 : GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
81 2 : *socket_fd = fd;
82 :
83 : /* Use local address for test */
84 2 : sin->sin_family = AF_INET;
85 2 : sin->sin_addr.s_addr = htonl(0x7f000001);
86 2 : GPR_ASSERT(port >= 0 && port < 65536);
87 2 : sin->sin_port = htons((gpr_uint16)port);
88 2 : }
89 :
90 : /* Dummy gRPC callback */
91 0 : void no_op_cb(void *arg, int success) {}
92 :
/* =======An upload server to test notify_on_read===========
   The server simply reads and counts a stream of bytes. */

/* An upload server. */
typedef struct {
  grpc_fd *em_fd;           /* listening fd */
  ssize_t read_bytes_total; /* total number of received bytes */
  int done;                 /* set to 1 when a server finishes serving;
                               read and written under GRPC_POLLSET_MU(&g_pollset) */
  grpc_closure listen_closure; /* closure run when em_fd becomes readable */
} server;
103 :
104 1 : static void server_init(server *sv) {
105 1 : sv->read_bytes_total = 0;
106 1 : sv->done = 0;
107 1 : }
108 :
/* An upload session.
   Created when a new upload request arrives in the server. */
typedef struct {
  server *sv;              /* not owned by a single session */
  grpc_fd *em_fd;          /* fd to read upload bytes */
  char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
  grpc_closure session_read_closure; /* closure run when em_fd is readable */
} session;
117 :
/* Called when an upload session can be safely shutdown.
   Close session FD and start to shutdown listen FD. */
static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
                                int success) {
  session *se = arg;
  server *sv = se->sv;
  /* Hand the session fd back to the polling engine; se must not be used
     after the free below. */
  grpc_fd_orphan(exec_ctx, se->em_fd, NULL, "a");
  gpr_free(se);
  /* Start to shutdown listen fd. */
  grpc_fd_shutdown(exec_ctx, sv->em_fd);
}
129 :
/* Called when data become readable in a session.
   Drains the socket, accumulates the byte count on the owning server, and
   either re-arms the read notification or shuts the session down on EOF. */
static void session_read_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
                            int success) {
  session *se = arg;
  int fd = se->em_fd->fd;

  ssize_t read_once = 0;
  ssize_t read_total = 0;

  /* success == 0 means the fd was shut down; tear the session down. */
  if (!success) {
    session_shutdown_cb(exec_ctx, arg, 1);
    return;
  }

  /* Drain the socket: keep reading until read() stops making progress. */
  do {
    read_once = read(fd, se->read_buf, BUF_SIZE);
    if (read_once > 0) read_total += read_once;
  } while (read_once > 0);
  se->sv->read_bytes_total += read_total;

  /* read() returns 0 to indicate the TCP connection was closed by the client.
     read(fd, read_buf, 0) also returns 0 which should never be called as such.
     It is possible to read nothing due to spurious edge event or data has
     been drained, In such a case, read() returns -1 and set errno to EAGAIN. */
  if (read_once == 0) {
    session_shutdown_cb(exec_ctx, arg, 1);
  } else if (read_once == -1) {
    if (errno == EAGAIN) {
      /* An edge triggered event is cached in the kernel until next poll.
         In the current single thread implementation, session_read_cb is called
         in the polling thread, such that polling only happens after this
         callback, and will catch read edge event if data is available again
         before notify_on_read.
         TODO(chenw): in multi-threaded version, callback and polling can be
         run in different threads. polling may catch a persist read edge event
         before notify_on_read is called. */
      grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
    } else {
      gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
      abort();
    }
  }
}
173 :
/* Called when the listen FD can be safely shutdown.
   Close listen FD and signal that server can be shutdown. */
static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
                               int success) {
  server *sv = arg;

  grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, "b");

  /* Flip the done flag under the pollset lock and kick the pollset so the
     loop in server_wait_and_shutdown wakes up and observes it. */
  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  sv->done = 1;
  grpc_pollset_kick(&g_pollset, NULL);
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
187 :
188 : /* Called when a new TCP connection request arrives in the listening port. */
189 2 : static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/
190 : int success) {
191 2 : server *sv = arg;
192 : int fd;
193 : int flags;
194 : session *se;
195 : struct sockaddr_storage ss;
196 2 : socklen_t slen = sizeof(ss);
197 2 : grpc_fd *listen_em_fd = sv->em_fd;
198 :
199 2 : if (!success) {
200 1 : listen_shutdown_cb(exec_ctx, arg, 1);
201 3 : return;
202 : }
203 :
204 1 : fd = accept(listen_em_fd->fd, (struct sockaddr *)&ss, &slen);
205 1 : GPR_ASSERT(fd >= 0);
206 1 : GPR_ASSERT(fd < FD_SETSIZE);
207 1 : flags = fcntl(fd, F_GETFL, 0);
208 1 : fcntl(fd, F_SETFL, flags | O_NONBLOCK);
209 1 : se = gpr_malloc(sizeof(*se));
210 1 : se->sv = sv;
211 1 : se->em_fd = grpc_fd_create(fd, "listener");
212 1 : grpc_pollset_add_fd(exec_ctx, &g_pollset, se->em_fd);
213 1 : se->session_read_closure.cb = session_read_cb;
214 1 : se->session_read_closure.cb_arg = se;
215 1 : grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
216 :
217 1 : grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure);
218 : }
219 :
/* Max number of connections pending to be accepted by listen()
   (used as the backlog argument in server_start). */
#define MAX_NUM_FD 1024
222 :
223 : /* Start a test server, return the TCP listening port bound to listen_fd.
224 : listen_cb() is registered to be interested in reading from listen_fd.
225 : When connection request arrives, listen_cb() is called to accept the
226 : connection request. */
227 1 : static int server_start(grpc_exec_ctx *exec_ctx, server *sv) {
228 1 : int port = 0;
229 : int fd;
230 : struct sockaddr_in sin;
231 : socklen_t addr_len;
232 :
233 1 : create_test_socket(port, &fd, &sin);
234 1 : addr_len = sizeof(sin);
235 1 : GPR_ASSERT(bind(fd, (struct sockaddr *)&sin, addr_len) == 0);
236 1 : GPR_ASSERT(getsockname(fd, (struct sockaddr *)&sin, &addr_len) == 0);
237 1 : port = ntohs(sin.sin_port);
238 1 : GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
239 :
240 1 : sv->em_fd = grpc_fd_create(fd, "server");
241 1 : grpc_pollset_add_fd(exec_ctx, &g_pollset, sv->em_fd);
242 : /* Register to be interested in reading from listen_fd. */
243 1 : sv->listen_closure.cb = listen_cb;
244 1 : sv->listen_closure.cb_arg = sv;
245 1 : grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure);
246 :
247 1 : return port;
248 : }
249 :
/* Wait until the server signals completion (sv->done set by
   listen_shutdown_cb), then return.  The pollset mutex is released around
   grpc_exec_ctx_finish because the callbacks it runs (e.g.
   listen_shutdown_cb) acquire the same mutex. */
static void server_wait_and_shutdown(server *sv) {
  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  while (!sv->done) {
    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    grpc_pollset_worker worker;
    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
                      gpr_now(GPR_CLOCK_MONOTONIC),
                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
    grpc_exec_ctx_finish(&exec_ctx);
    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
265 :
/* ===An upload client to test notify_on_write=== */

/* Client write buffer size */
#define CLIENT_WRITE_BUF_SIZE 10
/* Total number of times that the client fills up the write buffer */
#define CLIENT_TOTAL_WRITE_CNT 3

/* An upload client. */
typedef struct {
  grpc_fd *em_fd; /* connected fd the client writes to */
  char write_buf[CLIENT_WRITE_BUF_SIZE]; /* zero-filled payload sent repeatedly */
  ssize_t write_bytes_total; /* total number of bytes successfully written */
  /* Number of times that the client fills up the write buffer and calls
     notify_on_write to schedule another write. */
  int client_write_cnt;

  int done; /* set to 1 when a client finishes sending;
               read and written under GRPC_POLLSET_MU(&g_pollset) */
  grpc_closure write_closure; /* closure run when em_fd becomes writable */
} client;
285 :
286 1 : static void client_init(client *cl) {
287 1 : memset(cl->write_buf, 0, sizeof(cl->write_buf));
288 1 : cl->write_bytes_total = 0;
289 1 : cl->client_write_cnt = 0;
290 1 : cl->done = 0;
291 1 : }
292 :
/* Called when a client upload session is ready to shutdown.
   Both call sites in this file invoke it while holding
   GRPC_POLLSET_MU(&g_pollset); the kick wakes the loop in
   client_wait_and_shutdown so it can observe done == 1. */
static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
                                       void *arg /*client */, int success) {
  client *cl = arg;
  grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, "c");
  cl->done = 1;
  grpc_pollset_kick(&g_pollset, NULL);
}
301 :
/* Write as much as possible, then register notify_on_write.
   After CLIENT_TOTAL_WRITE_CNT rounds of filling the send buffer, the
   client session is shut down instead of being re-armed. */
static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */
                                 int success) {
  client *cl = arg;
  int fd = cl->em_fd->fd;
  ssize_t write_once = 0;

  /* success == 0 means the fd was shut down; finish the client under the
     pollset lock, matching the locking done in the normal path below. */
  if (!success) {
    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
    client_session_shutdown_cb(exec_ctx, arg, 1);
    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
    return;
  }

  /* Fill the (deliberately tiny, see BUF_SIZE) socket send buffer until
     write() stops accepting data. */
  do {
    write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
    if (write_once > 0) cl->write_bytes_total += write_once;
  } while (write_once > 0);

  if (errno == EAGAIN) {
    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
    if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
      /* Send buffer full: schedule this function to run again when the fd
         becomes writable, up to CLIENT_TOTAL_WRITE_CNT times. */
      cl->write_closure.cb = client_session_write;
      cl->write_closure.cb_arg = cl;
      grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure);
      cl->client_write_cnt++;
    } else {
      client_session_shutdown_cb(exec_ctx, arg, 1);
    }
    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
  } else {
    gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
    abort();
  }
}
337 :
338 : /* Start a client to send a stream of bytes. */
339 1 : static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) {
340 : int fd;
341 : struct sockaddr_in sin;
342 1 : create_test_socket(port, &fd, &sin);
343 1 : if (connect(fd, (struct sockaddr *)&sin, sizeof(sin)) == -1) {
344 1 : if (errno == EINPROGRESS) {
345 : struct pollfd pfd;
346 1 : pfd.fd = fd;
347 1 : pfd.events = POLLOUT;
348 1 : pfd.revents = 0;
349 1 : if (poll(&pfd, 1, -1) == -1) {
350 0 : gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
351 0 : abort();
352 : }
353 : } else {
354 0 : gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
355 0 : abort();
356 : }
357 : }
358 :
359 1 : cl->em_fd = grpc_fd_create(fd, "client");
360 1 : grpc_pollset_add_fd(exec_ctx, &g_pollset, cl->em_fd);
361 :
362 1 : client_session_write(exec_ctx, cl, 1);
363 1 : }
364 :
/* Wait until the client signals completion (cl->done set by
   client_session_shutdown_cb), then return.  Mirrors
   server_wait_and_shutdown: the mutex is dropped around
   grpc_exec_ctx_finish because the callbacks it runs take the same lock. */
static void client_wait_and_shutdown(client *cl) {
  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  while (!cl->done) {
    grpc_pollset_worker worker;
    grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
                      gpr_now(GPR_CLOCK_MONOTONIC),
                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
    grpc_exec_ctx_finish(&exec_ctx);
    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  }
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
380 :
381 : /* Test grpc_fd. Start an upload server and client, upload a stream of
382 : bytes from the client to the server, and verify that the total number of
383 : sent bytes is equal to the total number of received bytes. */
384 1 : static void test_grpc_fd(void) {
385 : server sv;
386 : client cl;
387 : int port;
388 1 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
389 :
390 1 : server_init(&sv);
391 1 : port = server_start(&exec_ctx, &sv);
392 1 : client_init(&cl);
393 1 : client_start(&exec_ctx, &cl, port);
394 1 : grpc_exec_ctx_finish(&exec_ctx);
395 1 : client_wait_and_shutdown(&cl);
396 1 : server_wait_and_shutdown(&sv);
397 1 : GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
398 1 : gpr_log(GPR_INFO, "Total read bytes %d", sv.read_bytes_total);
399 1 : }
400 :
/* Records which read callback actually ran; used by test_grpc_fd_change to
   verify that re-registering notify_on_read swaps both the callback and its
   argument. */
typedef struct fd_change_data {
  void (*cb_that_ran)(grpc_exec_ctx *exec_ctx, void *, int success);
} fd_change_data;
404 :
405 2 : void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; }
406 :
407 2 : void destroy_change_data(fd_change_data *fdc) {}
408 :
/* Read callback #1: records, under the pollset lock, that it was the one
   that ran, then kicks the pollset so the waiting loop in
   test_grpc_fd_change re-checks its condition. */
static void first_read_callback(grpc_exec_ctx *exec_ctx,
                                void *arg /* fd_change_data */, int success) {
  fd_change_data *fdc = arg;

  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  fdc->cb_that_ran = first_read_callback;
  grpc_pollset_kick(&g_pollset, NULL);
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
418 :
/* Read callback #2: identical to first_read_callback except for the
   function pointer it records — the distinct identity is the whole point of
   the test. */
static void second_read_callback(grpc_exec_ctx *exec_ctx,
                                 void *arg /* fd_change_data */, int success) {
  fd_change_data *fdc = arg;

  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  fdc->cb_that_ran = second_read_callback;
  grpc_pollset_kick(&g_pollset, NULL);
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
}
428 :
/* Test that changing the callback we use for notify_on_read actually works.
   Note that we have two different but almost identical callbacks above -- the
   point is to have two different function pointers and two different data
   pointers and make sure that changing both really works. */
static void test_grpc_fd_change(void) {
  grpc_fd *em_fd;
  fd_change_data a, b;
  int flags;
  int sv[2];
  char data;
  ssize_t result;
  grpc_closure first_closure;
  grpc_closure second_closure;
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;

  first_closure.cb = first_read_callback;
  first_closure.cb_arg = &a;
  second_closure.cb = second_read_callback;
  second_closure.cb_arg = &b;

  init_change_data(&a);
  init_change_data(&b);

  /* A unix-domain socketpair gives two connected fds; both are made
     non-blocking so the one-byte reads/writes below cannot stall. */
  GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
  flags = fcntl(sv[0], F_GETFL, 0);
  GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
  flags = fcntl(sv[1], F_GETFL, 0);
  GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);

  em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
  grpc_pollset_add_fd(&exec_ctx, &g_pollset, em_fd);

  /* Register the first callback, then make its FD readable */
  grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure);
  data = 0;
  result = write(sv[1], &data, 1);
  GPR_ASSERT(result == 1);

  /* And now wait for it to run. */
  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  while (a.cb_that_ran == NULL) {
    grpc_pollset_worker worker;
    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
                      gpr_now(GPR_CLOCK_MONOTONIC),
                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
    grpc_exec_ctx_finish(&exec_ctx);
    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  }
  GPR_ASSERT(a.cb_that_ran == first_read_callback);
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

  /* And drain the socket so we can generate a new read edge */
  result = read(sv[0], &data, 1);
  GPR_ASSERT(result == 1);

  /* Now register a second callback with distinct change data, and do the same
     thing again. */
  grpc_fd_notify_on_read(&exec_ctx, em_fd, &second_closure);
  data = 0;
  result = write(sv[1], &data, 1);
  GPR_ASSERT(result == 1);

  gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  while (b.cb_that_ran == NULL) {
    grpc_pollset_worker worker;
    grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
                      gpr_now(GPR_CLOCK_MONOTONIC),
                      gpr_inf_future(GPR_CLOCK_MONOTONIC));
    gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
    grpc_exec_ctx_finish(&exec_ctx);
    gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
  }
  /* Except now we verify that second_read_callback ran instead */
  GPR_ASSERT(b.cb_that_ran == second_read_callback);
  gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));

  /* NOTE(review): only sv[1] is closed here; sv[0] is presumably closed by
     the grpc_fd_orphan of em_fd — confirm against fd_posix.h. */
  grpc_fd_orphan(&exec_ctx, em_fd, NULL, "d");
  grpc_exec_ctx_finish(&exec_ctx);
  destroy_change_data(&a);
  destroy_change_data(&b);
  close(sv[1]);
}
512 :
513 1 : static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
514 1 : grpc_pollset_destroy(p);
515 1 : }
516 :
/* Entry point: bring up iomgr and the shared pollset, run both fd tests,
   then tear everything down in reverse order. */
int main(int argc, char **argv) {
  grpc_closure destroyed;
  grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
  grpc_test_init(argc, argv);
  grpc_iomgr_init();
  grpc_pollset_init(&g_pollset);
  test_grpc_fd();
  test_grpc_fd_change();
  /* Pollset shutdown is asynchronous: destroy_pollset runs via the
     'destroyed' closure once the shutdown completes. */
  grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
  grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
  grpc_exec_ctx_finish(&exec_ctx);
  grpc_iomgr_shutdown();
  return 0;
}
|