LCOV - code coverage report
Current view: top level - test/core/iomgr - fd_posix_test.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 234 249 94.0 %
Date: 2015-10-10 Functions: 21 22 95.5 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/iomgr/fd_posix.h"
      35             : 
      36             : #include <ctype.h>
      37             : #include <errno.h>
      38             : #include <fcntl.h>
      39             : #include <netinet/in.h>
      40             : #include <poll.h>
      41             : #include <stdio.h>
      42             : #include <stdlib.h>
      43             : #include <string.h>
      44             : #include <sys/socket.h>
      45             : #include <sys/time.h>
      46             : #include <unistd.h>
      47             : 
      48             : #include <grpc/support/alloc.h>
      49             : #include <grpc/support/log.h>
      50             : #include <grpc/support/sync.h>
      51             : #include <grpc/support/time.h>
      52             : #include "test/core/util/test_config.h"
      53             : 
      54             : static grpc_pollset g_pollset;
      55             : 
      56             : /* Buffer size used to send and receive data.
      57             :    1024 is the minimum size the TCP send and receive buffers can be set to. */
      58             : #define BUF_SIZE 1024
      59             : 
      60             : /* Create a test socket with the right properties for testing.
      61             :    port is the TCP port to listen on or connect to.
      62             :    Returns the socket FD and sockaddr_in through socket_fd and sin. */
      63           2 : static void create_test_socket(int port, int *socket_fd,
      64             :                                struct sockaddr_in *sin) {
      65             :   int fd;
      66           2 :   int one = 1;
      67           2 :   int buf_size = BUF_SIZE;
      68             :   int flags;
      69             : 
      70           2 :   fd = socket(AF_INET, SOCK_STREAM, 0);
      71           2 :   setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
      72             :   /* Shrink the socket send and receive buffers to the minimum size so that
      73             :      the send buffer fills up quickly and triggers notify_on_write. */
      74           2 :   GPR_ASSERT(
      75             :       setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size)) != -1);
      76           2 :   GPR_ASSERT(
      77             :       setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &buf_size, sizeof(buf_size)) != -1);
      78             :   /* Make fd non-blocking */
      79           2 :   flags = fcntl(fd, F_GETFL, 0);
      80           2 :   GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0);
      81           2 :   *socket_fd = fd;
      82             : 
      83             :   /* Use local address for test */
      84           2 :   sin->sin_family = AF_INET;
      85           2 :   sin->sin_addr.s_addr = htonl(0x7f000001);
      86           2 :   GPR_ASSERT(port >= 0 && port < 65536);
      87           2 :   sin->sin_port = htons((gpr_uint16)port);
      88           2 : }
      89             : 
      90             : /* Dummy gRPC callback */
      91           0 : void no_op_cb(void *arg, int success) {}
      92             : 
      93             : /* =======An upload server to test notify_on_read===========
      94             :    The server simply reads and counts a stream of bytes. */
      95             : 
      96             : /* An upload server. */
      97             : typedef struct {
      98             :   grpc_fd *em_fd;           /* listening fd */
      99             :   ssize_t read_bytes_total; /* total number of received bytes */
     100             :   int done;                 /* set to 1 when a server finishes serving */
     101             :   grpc_closure listen_closure;
     102             : } server;
     103             : 
     104           1 : static void server_init(server *sv) {
     105           1 :   sv->read_bytes_total = 0;
     106           1 :   sv->done = 0;
     107           1 : }
     108             : 
     109             : /* An upload session.
     110             :    Created when a new upload request arrives in the server. */
     111             : typedef struct {
     112             :   server *sv;              /* not owned by a single session */
     113             :   grpc_fd *em_fd;          /* fd to read upload bytes */
     114             :   char read_buf[BUF_SIZE]; /* buffer to store upload bytes */
     115             :   grpc_closure session_read_closure;
     116             : } session;
     117             : 
     118             : /* Called when an upload session can be safely shut down.
     119             :    Closes the session FD and starts shutting down the listen FD. */
     120           1 : static void session_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
     121             :                                 int success) {
     122           1 :   session *se = arg;
     123           1 :   server *sv = se->sv;
     124           1 :   grpc_fd_orphan(exec_ctx, se->em_fd, NULL, "a");
     125           1 :   gpr_free(se);
     126             :   /* Start to shutdown listen fd. */
     127           1 :   grpc_fd_shutdown(exec_ctx, sv->em_fd);
     128           1 : }
     129             : 
     130             : /* Called when data becomes readable in a session. */
     131           4 : static void session_read_cb(grpc_exec_ctx *exec_ctx, void *arg, /*session */
     132             :                             int success) {
     133           4 :   session *se = arg;
     134           4 :   int fd = se->em_fd->fd;
     135             : 
     136           4 :   ssize_t read_once = 0;
     137           4 :   ssize_t read_total = 0;
     138             : 
     139           4 :   if (!success) {
     140           0 :     session_shutdown_cb(exec_ctx, arg, 1);
     141           0 :     return;
     142             :   }
     143             : 
     144             :   do {
     145           8 :     read_once = read(fd, se->read_buf, BUF_SIZE);
     146           8 :     if (read_once > 0) read_total += read_once;
     147           8 :   } while (read_once > 0);
     148           4 :   se->sv->read_bytes_total += read_total;
     149             : 
     150             :   /* read() returns 0 when the TCP connection was closed by the client.
     151             :      read(fd, read_buf, 0) also returns 0, so it must never be called that way.
     152             :      Nothing may be read after a spurious edge event or once the data has been
     153             :      drained; in that case read() returns -1 and sets errno to EAGAIN. */
     154           4 :   if (read_once == 0) {
     155           1 :     session_shutdown_cb(exec_ctx, arg, 1);
     156           3 :   } else if (read_once == -1) {
     157           3 :     if (errno == EAGAIN) {
     158             :       /* An edge-triggered event is cached in the kernel until the next poll.
     159             :          In the current single-threaded implementation, session_read_cb is called
     160             :          in the polling thread, so polling only happens after this callback and
     161             :          will catch the read edge event if data becomes available again
     162             :          before notify_on_read.
     163             :          TODO(chenw): in a multi-threaded version, the callback and polling can
     164             :          run in different threads. Polling may catch a persistent read edge event
     165             :          before notify_on_read is called.  */
     166           3 :       grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
     167             :     } else {
     168           0 :       gpr_log(GPR_ERROR, "Unhandled read error %s", strerror(errno));
     169           0 :       abort();
     170             :     }
     171             :   }
     172             : }
     173             : 
     174             : /* Called when the listen FD can be safely shut down.
     175             :    Closes the listen FD and signals that the server can be shut down. */
     176           1 : static void listen_shutdown_cb(grpc_exec_ctx *exec_ctx, void *arg /*server */,
     177             :                                int success) {
     178           1 :   server *sv = arg;
     179             : 
     180           1 :   grpc_fd_orphan(exec_ctx, sv->em_fd, NULL, "b");
     181             : 
     182           1 :   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     183           1 :   sv->done = 1;
     184           1 :   grpc_pollset_kick(&g_pollset, NULL);
     185           1 :   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     186           1 : }
     187             : 
     188             : /* Called when a new TCP connection request arrives in the listening port. */
     189           2 : static void listen_cb(grpc_exec_ctx *exec_ctx, void *arg, /*=sv_arg*/
     190             :                       int success) {
     191           2 :   server *sv = arg;
     192             :   int fd;
     193             :   int flags;
     194             :   session *se;
     195             :   struct sockaddr_storage ss;
     196           2 :   socklen_t slen = sizeof(ss);
     197           2 :   grpc_fd *listen_em_fd = sv->em_fd;
     198             : 
     199           2 :   if (!success) {
     200           1 :     listen_shutdown_cb(exec_ctx, arg, 1);
     201           3 :     return;
     202             :   }
     203             : 
     204           1 :   fd = accept(listen_em_fd->fd, (struct sockaddr *)&ss, &slen);
     205           1 :   GPR_ASSERT(fd >= 0);
     206           1 :   GPR_ASSERT(fd < FD_SETSIZE);
     207           1 :   flags = fcntl(fd, F_GETFL, 0);
     208           1 :   fcntl(fd, F_SETFL, flags | O_NONBLOCK);
     209           1 :   se = gpr_malloc(sizeof(*se));
     210           1 :   se->sv = sv;
     211           1 :   se->em_fd = grpc_fd_create(fd, "listener");
     212           1 :   grpc_pollset_add_fd(exec_ctx, &g_pollset, se->em_fd);
     213           1 :   se->session_read_closure.cb = session_read_cb;
     214           1 :   se->session_read_closure.cb_arg = se;
     215           1 :   grpc_fd_notify_on_read(exec_ctx, se->em_fd, &se->session_read_closure);
     216             : 
     217           1 :   grpc_fd_notify_on_read(exec_ctx, listen_em_fd, &sv->listen_closure);
     218             : }
     219             : 
     220             : /* Max number of connections pending to be accepted by listen(). */
     221             : #define MAX_NUM_FD 1024
     222             : 
     223             : /* Start a test server and return the TCP listening port bound to listen_fd.
     224             :    listen_cb() is registered to be notified when listen_fd becomes readable.
     225             :    When a connection request arrives, listen_cb() is called to accept the
     226             :    connection request. */
     227           1 : static int server_start(grpc_exec_ctx *exec_ctx, server *sv) {
     228           1 :   int port = 0;
     229             :   int fd;
     230             :   struct sockaddr_in sin;
     231             :   socklen_t addr_len;
     232             : 
     233           1 :   create_test_socket(port, &fd, &sin);
     234           1 :   addr_len = sizeof(sin);
     235           1 :   GPR_ASSERT(bind(fd, (struct sockaddr *)&sin, addr_len) == 0);
     236           1 :   GPR_ASSERT(getsockname(fd, (struct sockaddr *)&sin, &addr_len) == 0);
     237           1 :   port = ntohs(sin.sin_port);
     238           1 :   GPR_ASSERT(listen(fd, MAX_NUM_FD) == 0);
     239             : 
     240           1 :   sv->em_fd = grpc_fd_create(fd, "server");
     241           1 :   grpc_pollset_add_fd(exec_ctx, &g_pollset, sv->em_fd);
     242             :   /* Register to be interested in reading from listen_fd. */
     243           1 :   sv->listen_closure.cb = listen_cb;
     244           1 :   sv->listen_closure.cb_arg = sv;
     245           1 :   grpc_fd_notify_on_read(exec_ctx, sv->em_fd, &sv->listen_closure);
     246             : 
     247           1 :   return port;
     248             : }
     249             : 
     250             : /* Wait for and shut down a server. */
     251           1 : static void server_wait_and_shutdown(server *sv) {
     252           1 :   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     253           3 :   while (!sv->done) {
     254           1 :     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     255             :     grpc_pollset_worker worker;
     256           1 :     grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
     257             :                       gpr_now(GPR_CLOCK_MONOTONIC),
     258             :                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
     259           1 :     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     260           1 :     grpc_exec_ctx_finish(&exec_ctx);
     261           1 :     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     262             :   }
     263           1 :   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     264           1 : }
     265             : 
     266             : /* ===An upload client to test notify_on_write=== */
     267             : 
     268             : /* Client write buffer size */
     269             : #define CLIENT_WRITE_BUF_SIZE 10
     270             : /* Total number of times that the client fills up the write buffer */
     271             : #define CLIENT_TOTAL_WRITE_CNT 3
     272             : 
     273             : /* An upload client. */
     274             : typedef struct {
     275             :   grpc_fd *em_fd;
     276             :   char write_buf[CLIENT_WRITE_BUF_SIZE];
     277             :   ssize_t write_bytes_total;
     278             :   /* Number of times that the client fills up the write buffer and calls
     279             :      notify_on_write to schedule another write. */
     280             :   int client_write_cnt;
     281             : 
     282             :   int done; /* set to 1 when a client finishes sending */
     283             :   grpc_closure write_closure;
     284             : } client;
     285             : 
     286           1 : static void client_init(client *cl) {
     287           1 :   memset(cl->write_buf, 0, sizeof(cl->write_buf));
     288           1 :   cl->write_bytes_total = 0;
     289           1 :   cl->client_write_cnt = 0;
     290           1 :   cl->done = 0;
     291           1 : }
     292             : 
     293             : /* Called when a client upload session is ready to shut down. */
     294           1 : static void client_session_shutdown_cb(grpc_exec_ctx *exec_ctx,
     295             :                                        void *arg /*client */, int success) {
     296           1 :   client *cl = arg;
     297           1 :   grpc_fd_orphan(exec_ctx, cl->em_fd, NULL, "c");
     298           1 :   cl->done = 1;
     299           1 :   grpc_pollset_kick(&g_pollset, NULL);
     300           1 : }
     301             : 
     302             : /* Write as much as possible, then register notify_on_write. */
     303           4 : static void client_session_write(grpc_exec_ctx *exec_ctx, void *arg, /*client */
     304             :                                  int success) {
     305           4 :   client *cl = arg;
     306           4 :   int fd = cl->em_fd->fd;
     307           4 :   ssize_t write_once = 0;
     308             : 
     309           4 :   if (!success) {
     310           0 :     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     311           0 :     client_session_shutdown_cb(exec_ctx, arg, 1);
     312           0 :     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     313           0 :     return;
     314             :   }
     315             : 
     316             :   do {
     317         241 :     write_once = write(fd, cl->write_buf, CLIENT_WRITE_BUF_SIZE);
     318         241 :     if (write_once > 0) cl->write_bytes_total += write_once;
     319         241 :   } while (write_once > 0);
     320             : 
     321           4 :   if (errno == EAGAIN) {
     322           4 :     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     323           4 :     if (cl->client_write_cnt < CLIENT_TOTAL_WRITE_CNT) {
     324           3 :       cl->write_closure.cb = client_session_write;
     325           3 :       cl->write_closure.cb_arg = cl;
     326           3 :       grpc_fd_notify_on_write(exec_ctx, cl->em_fd, &cl->write_closure);
     327           3 :       cl->client_write_cnt++;
     328             :     } else {
     329           1 :       client_session_shutdown_cb(exec_ctx, arg, 1);
     330             :     }
     331           4 :     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     332             :   } else {
     333           0 :     gpr_log(GPR_ERROR, "unknown errno %s", strerror(errno));
     334           0 :     abort();
     335             :   }
     336             : }
     337             : 
     338             : /* Start a client to send a stream of bytes. */
     339           1 : static void client_start(grpc_exec_ctx *exec_ctx, client *cl, int port) {
     340             :   int fd;
     341             :   struct sockaddr_in sin;
     342           1 :   create_test_socket(port, &fd, &sin);
     343           1 :   if (connect(fd, (struct sockaddr *)&sin, sizeof(sin)) == -1) {
     344           1 :     if (errno == EINPROGRESS) {
     345             :       struct pollfd pfd;
     346           1 :       pfd.fd = fd;
     347           1 :       pfd.events = POLLOUT;
     348           1 :       pfd.revents = 0;
     349           1 :       if (poll(&pfd, 1, -1) == -1) {
     350           0 :         gpr_log(GPR_ERROR, "poll() failed during connect; errno=%d", errno);
     351           0 :         abort();
     352             :       }
     353             :     } else {
     354           0 :       gpr_log(GPR_ERROR, "Failed to connect to the server (errno=%d)", errno);
     355           0 :       abort();
     356             :     }
     357             :   }
     358             : 
     359           1 :   cl->em_fd = grpc_fd_create(fd, "client");
     360           1 :   grpc_pollset_add_fd(exec_ctx, &g_pollset, cl->em_fd);
     361             : 
     362           1 :   client_session_write(exec_ctx, cl, 1);
     363           1 : }
     364             : 
     365             : /* Wait for the signal to shut down a client. */
     366           1 : static void client_wait_and_shutdown(client *cl) {
     367           1 :   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     368           9 :   while (!cl->done) {
     369             :     grpc_pollset_worker worker;
     370           7 :     grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     371           7 :     grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
     372             :                       gpr_now(GPR_CLOCK_MONOTONIC),
     373             :                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
     374           7 :     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     375           7 :     grpc_exec_ctx_finish(&exec_ctx);
     376           7 :     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     377             :   }
     378           1 :   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     379           1 : }
     380             : 
     381             : /* Test grpc_fd. Start an upload server and client, upload a stream of
     382             :    bytes from the client to the server, and verify that the total number of
     383             :    sent bytes is equal to the total number of received bytes. */
     384           1 : static void test_grpc_fd(void) {
     385             :   server sv;
     386             :   client cl;
     387             :   int port;
     388           1 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     389             : 
     390           1 :   server_init(&sv);
     391           1 :   port = server_start(&exec_ctx, &sv);
     392           1 :   client_init(&cl);
     393           1 :   client_start(&exec_ctx, &cl, port);
     394           1 :   grpc_exec_ctx_finish(&exec_ctx);
     395           1 :   client_wait_and_shutdown(&cl);
     396           1 :   server_wait_and_shutdown(&sv);
     397           1 :   GPR_ASSERT(sv.read_bytes_total == cl.write_bytes_total);
     398           1 :   gpr_log(GPR_INFO, "Total read bytes %d", sv.read_bytes_total);
     399           1 : }
     400             : 
     401             : typedef struct fd_change_data {
     402             :   void (*cb_that_ran)(grpc_exec_ctx *exec_ctx, void *, int success);
     403             : } fd_change_data;
     404             : 
     405           2 : void init_change_data(fd_change_data *fdc) { fdc->cb_that_ran = NULL; }
     406             : 
     407           2 : void destroy_change_data(fd_change_data *fdc) {}
     408             : 
     409           1 : static void first_read_callback(grpc_exec_ctx *exec_ctx,
     410             :                                 void *arg /* fd_change_data */, int success) {
     411           1 :   fd_change_data *fdc = arg;
     412             : 
     413           1 :   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     414           1 :   fdc->cb_that_ran = first_read_callback;
     415           1 :   grpc_pollset_kick(&g_pollset, NULL);
     416           1 :   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     417           1 : }
     418             : 
     419           1 : static void second_read_callback(grpc_exec_ctx *exec_ctx,
     420             :                                  void *arg /* fd_change_data */, int success) {
     421           1 :   fd_change_data *fdc = arg;
     422             : 
     423           1 :   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     424           1 :   fdc->cb_that_ran = second_read_callback;
     425           1 :   grpc_pollset_kick(&g_pollset, NULL);
     426           1 :   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     427           1 : }
     428             : 
     429             : /* Test that changing the callback we use for notify_on_read actually works.
     430             :    Note that we have two different but almost identical callbacks above -- the
     431             :    point is to have two different function pointers and two different data
     432             :    pointers and make sure that changing both really works. */
     433           1 : static void test_grpc_fd_change(void) {
     434             :   grpc_fd *em_fd;
     435             :   fd_change_data a, b;
     436             :   int flags;
     437             :   int sv[2];
     438             :   char data;
     439             :   ssize_t result;
     440             :   grpc_closure first_closure;
     441             :   grpc_closure second_closure;
     442           1 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     443             : 
     444           1 :   first_closure.cb = first_read_callback;
     445           1 :   first_closure.cb_arg = &a;
     446           1 :   second_closure.cb = second_read_callback;
     447           1 :   second_closure.cb_arg = &b;
     448             : 
     449           1 :   init_change_data(&a);
     450           1 :   init_change_data(&b);
     451             : 
     452           1 :   GPR_ASSERT(socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == 0);
     453           1 :   flags = fcntl(sv[0], F_GETFL, 0);
     454           1 :   GPR_ASSERT(fcntl(sv[0], F_SETFL, flags | O_NONBLOCK) == 0);
     455           1 :   flags = fcntl(sv[1], F_GETFL, 0);
     456           1 :   GPR_ASSERT(fcntl(sv[1], F_SETFL, flags | O_NONBLOCK) == 0);
     457             : 
     458           1 :   em_fd = grpc_fd_create(sv[0], "test_grpc_fd_change");
     459           1 :   grpc_pollset_add_fd(&exec_ctx, &g_pollset, em_fd);
     460             : 
     461             :   /* Register the first callback, then make its FD readable */
     462           1 :   grpc_fd_notify_on_read(&exec_ctx, em_fd, &first_closure);
     463           1 :   data = 0;
     464           1 :   result = write(sv[1], &data, 1);
     465           1 :   GPR_ASSERT(result == 1);
     466             : 
     467             :   /* And now wait for it to run. */
     468           1 :   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     469           3 :   while (a.cb_that_ran == NULL) {
     470             :     grpc_pollset_worker worker;
     471           1 :     grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
     472             :                       gpr_now(GPR_CLOCK_MONOTONIC),
     473             :                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
     474           1 :     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     475           1 :     grpc_exec_ctx_finish(&exec_ctx);
     476           1 :     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     477             :   }
     478           1 :   GPR_ASSERT(a.cb_that_ran == first_read_callback);
     479           1 :   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     480             : 
     481             :   /* And drain the socket so we can generate a new read edge */
     482           1 :   result = read(sv[0], &data, 1);
     483           1 :   GPR_ASSERT(result == 1);
     484             : 
     485             :   /* Now register a second callback with distinct change data, and do the same
     486             :      thing again. */
     487           1 :   grpc_fd_notify_on_read(&exec_ctx, em_fd, &second_closure);
     488           1 :   data = 0;
     489           1 :   result = write(sv[1], &data, 1);
     490           1 :   GPR_ASSERT(result == 1);
     491             : 
     492           1 :   gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     493           3 :   while (b.cb_that_ran == NULL) {
     494             :     grpc_pollset_worker worker;
     495           1 :     grpc_pollset_work(&exec_ctx, &g_pollset, &worker,
     496             :                       gpr_now(GPR_CLOCK_MONOTONIC),
     497             :                       gpr_inf_future(GPR_CLOCK_MONOTONIC));
     498           1 :     gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     499           1 :     grpc_exec_ctx_finish(&exec_ctx);
     500           1 :     gpr_mu_lock(GRPC_POLLSET_MU(&g_pollset));
     501             :   }
     502             :   /* Except now we verify that second_read_callback ran instead */
     503           1 :   GPR_ASSERT(b.cb_that_ran == second_read_callback);
     504           1 :   gpr_mu_unlock(GRPC_POLLSET_MU(&g_pollset));
     505             : 
     506           1 :   grpc_fd_orphan(&exec_ctx, em_fd, NULL, "d");
     507           1 :   grpc_exec_ctx_finish(&exec_ctx);
     508           1 :   destroy_change_data(&a);
     509           1 :   destroy_change_data(&b);
     510           1 :   close(sv[1]);
     511           1 : }
     512             : 
     513           1 : static void destroy_pollset(grpc_exec_ctx *exec_ctx, void *p, int success) {
     514           1 :   grpc_pollset_destroy(p);
     515           1 : }
     516             : 
     517           1 : int main(int argc, char **argv) {
     518             :   grpc_closure destroyed;
     519           1 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     520           1 :   grpc_test_init(argc, argv);
     521           1 :   grpc_iomgr_init();
     522           1 :   grpc_pollset_init(&g_pollset);
     523           1 :   test_grpc_fd();
     524           1 :   test_grpc_fd_change();
     525           1 :   grpc_closure_init(&destroyed, destroy_pollset, &g_pollset);
     526           1 :   grpc_pollset_shutdown(&exec_ctx, &g_pollset, &destroyed);
     527           1 :   grpc_exec_ctx_finish(&exec_ctx);
     528           1 :   grpc_iomgr_shutdown();
     529           1 :   return 0;
     530             : }

Generated by: LCOV version 1.10