LCOV - code coverage report
Current view: top level - core/iomgr - pollset_multipoller_with_epoll.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 89 107 83.2 %
Date: 2015-12-10 22:15:08 Functions: 7 8 87.5 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <grpc/support/port_platform.h>
      35             : 
      36             : #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
      37             : 
      38             : #include <errno.h>
      39             : #include <poll.h>
      40             : #include <string.h>
      41             : #include <sys/epoll.h>
      42             : #include <unistd.h>
      43             : 
      44             : #include <grpc/support/alloc.h>
      45             : #include <grpc/support/log.h>
      46             : #include "src/core/iomgr/fd_posix.h"
      47             : #include "src/core/support/block_annotate.h"
      48             : #include "src/core/profiling/timers.h"
      49             : 
      /* Bookkeeping record for an fd addition that is deferred to a closure.
       * Allocated in multipoll_with_epoll_pollset_add_fd (deferred path) and
       * released with gpr_free at the end of perform_delayed_add.
       *   pollset - pollset the fd is to be added to.
       *   fd      - fd to add; a "delayed_add" reference is taken on it when the
       *             record is created and dropped in perform_delayed_add.
       *   closure - closure initialised to run perform_delayed_add with this
       *             record as its argument. */
      50             : typedef struct {
      51             :   grpc_pollset *pollset;
      52             :   grpc_fd *fd;
      53             :   grpc_closure closure;
      54             : } delayed_add;
      55             : 
      /* Per-pollset state for this poller, stored in pollset->data.ptr.
       * epoll_fd is the descriptor returned by epoll_create1 in
       * epoll_become_multipoller; it is closed in
       * multipoll_with_epoll_pollset_destroy. */
      56             : typedef struct { int epoll_fd; } pollset_hdr;
      57             : 
      /* Registers `fd` with the epoll instance owned by `pollset`.
       *
       * The descriptor is added with EPOLLIN | EPOLLOUT | EPOLLET (edge-triggered
       * read and write readiness) and the grpc_fd pointer is stored in the event's
       * data.ptr so the poll loop can map an event back to its grpc_fd.
       *
       * Errors are not reported to the caller: EEXIST is silently accepted, any
       * other epoll_ctl failure is only logged.
       *
       * Within this file the function is called after pollset->mu has been
       * released (multipoll_with_epoll_pollset_add_fd) or before it is acquired
       * (perform_delayed_add). */
      58     2237151 : static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
      59             :                            grpc_fd *fd) {
      60     2237151 :   pollset_hdr *h = pollset->data.ptr;
      61             :   struct epoll_event ev;
      62             :   int err;
      63             :   grpc_fd_watcher watcher;
      64             : 
      65             :   /* We pretend to be polling whilst adding an fd to keep the fd from being
      66             :      closed during the add. This may result in a spurious wakeup being assigned
      67             :      to this pollset whilst adding, but that should be benign. */
                         /* NOTE(review): the grpc_fd_begin_poll call sits inside GPR_ASSERT, so
                            this relies on GPR_ASSERT always evaluating its argument - confirm. */
      68     2237151 :   GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
                         /* NOTE(review): presumably grpc_fd_begin_poll leaves watcher.fd NULL when
                            the fd must not be polled any more; in that case the add is skipped -
                            confirm against fd_posix. */
      69     2237155 :   if (watcher.fd != NULL) {
      70     2237155 :     ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
      71     2237155 :     ev.data.ptr = fd;
      72     2237155 :     err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
      73     2237118 :     if (err < 0) {
      74             :       /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
      75     2227889 :       if (errno != EEXIST) {
      76           0 :         gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
      77           0 :                 strerror(errno));
      78             :       }
      79             :     }
      80             :   }
                         /* Always paired with the begin_poll above, whether or not the fd was
                            actually handed to epoll. */
      81     2237120 :   grpc_fd_end_poll(exec_ctx, &watcher, 0, 0);
      82     2236981 : }
      83             : 
      /* Closure callback that completes an fd addition deferred by
       * multipoll_with_epoll_pollset_add_fd.
       *
       *   arg          - the delayed_add record; this function frees it.
       *   iomgr_status - not used.
       *
       * The fd is added via finally_add_fd unless it has been orphaned in the
       * meantime. Afterwards, under pollset->mu, the pollset's in_flight_cbs
       * counter is decremented; if the pollset is shutting down, this was the
       * last in-flight callback and shutdown has not been signalled yet,
       * called_shutdown is set and pollset->shutdown_done is enqueued on
       * exec_ctx. Finally the "delayed_add" fd reference is dropped. */
      84        5663 : static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
      85             :                                 int iomgr_status) {
      86        5616 :   delayed_add *da = arg;
      87             : 
      88        5663 :   if (!grpc_fd_is_orphaned(da->fd)) {
      89        5662 :     finally_add_fd(exec_ctx, da->pollset, da->fd);
      90             :   }
      91             : 
      92        5663 :   gpr_mu_lock(&da->pollset->mu);
      93        5663 :   da->pollset->in_flight_cbs--;
      94        5663 :   if (da->pollset->shutting_down) {
      95             :     /* We don't care about this pollset anymore. */
                           /* called_shutdown guards against enqueueing shutdown_done twice. */
      96           2 :     if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
      97           1 :       da->pollset->called_shutdown = 1;
      98           1 :       grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1);
      99             :     }
     100             :   }
     101        5663 :   gpr_mu_unlock(&da->pollset->mu);
     102             : 
                         /* Balances the GRPC_FD_REF taken when the record was created. */
     103        5663 :   GRPC_FD_UNREF(da->fd, "delayed_add");
     104             : 
     105        5663 :   gpr_free(da);
     106        5663 : }
     107             : 
     /* add_fd entry of the epoll pollset vtable.
      *
      *   and_unlock_pollset != 0: pollset->mu is released and the fd is added to
      *     the epoll set immediately via finally_add_fd.
      *   and_unlock_pollset == 0: pollset->mu is left untouched and the add is
      *     deferred: a delayed_add record is allocated, a "delayed_add" reference
      *     is taken on the fd, pollset->in_flight_cbs is incremented and a closure
      *     running perform_delayed_add is enqueued on exec_ctx.
      *
      * NOTE(review): presumably the caller holds pollset->mu on entry (it is
      * unlocked in one branch and in_flight_cbs is modified without locking in
      * the other) - confirm against callers. */
     108     2237153 : static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
     109             :                                                 grpc_pollset *pollset,
     110             :                                                 grpc_fd *fd,
     111             :                                                 int and_unlock_pollset) {
     112     2237153 :   if (and_unlock_pollset) {
     113     2231490 :     gpr_mu_unlock(&pollset->mu);
     114     2231490 :     finally_add_fd(exec_ctx, pollset, fd);
     115             :   } else {
     116        5663 :     delayed_add *da = gpr_malloc(sizeof(*da));
     117        5663 :     da->pollset = pollset;
     118        5663 :     da->fd = fd;
                           /* Keeps the fd alive until perform_delayed_add has run. */
     119        5663 :     GRPC_FD_REF(fd, "delayed_add");
     120        5663 :     grpc_closure_init(&da->closure, perform_delayed_add, da);
                           /* Decremented again in perform_delayed_add, where it gates the
                              shutdown_done notification. */
     121        5663 :     pollset->in_flight_cbs++;
     122        5663 :     grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1);
     123             :   }
     124     2237156 : }
     125             : 
     /* del_fd entry of the epoll pollset vtable.
      *
      * Optionally releases pollset->mu (when and_unlock_pollset is non-zero) and
      * then removes fd->fd from the pollset's epoll instance. A failing epoll_ctl
      * is only logged; nothing is reported to the caller. exec_ctx is not used. */
     126           0 : static void multipoll_with_epoll_pollset_del_fd(grpc_exec_ctx *exec_ctx,
     127             :                                                 grpc_pollset *pollset,
     128             :                                                 grpc_fd *fd,
     129             :                                                 int and_unlock_pollset) {
     130           0 :   pollset_hdr *h = pollset->data.ptr;
     131             :   int err;
     132             : 
     133           0 :   if (and_unlock_pollset) {
     134           0 :     gpr_mu_unlock(&pollset->mu);
     135             :   }
     136             : 
     137             :   /* Note that this can race with concurrent poll, but that should be fine since
     138             :    * at worst it creates a spurious read event on a reused grpc_fd object. */
                         /* The event argument is NULL; per epoll_ctl(2) it is ignored for
                            EPOLL_CTL_DEL, but Linux kernels before 2.6.9 required it to be
                            non-NULL. */
     139           0 :   err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
     140           0 :   if (err < 0) {
     141           0 :     gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd->fd,
     142           0 :             strerror(errno));
     143             :   }
     144           0 : }
     145             : 
     /* Number of entries in the stack-allocated epoll_event array in
      * multipoll_with_epoll_pollset_maybe_work_and_unlock, and the maximum number
      * of events requested from a single epoll_wait call there. */
     146             : /* TODO(klempner): We probably want to turn this down a bit */
     147             : #define GRPC_EPOLL_MAX_EVENTS 1000
     148             : 
     /* maybe_work_and_unlock entry of the epoll pollset vtable.
      *
      * Releases pollset->mu, then blocks in grpc_poll_function on two
      * descriptors until one is readable or the timeout derived from
      * `deadline`/`now` expires:
      *   pfds[0] - read end of this worker's wakeup fd,
      *   pfds[1] - the pollset's epoll fd.
      *
      * After the poll:
      *   - poll error: logged unless errno is EINTR;
      *   - timeout: nothing to do;
      *   - worker wakeup fd signalled: the wakeup is consumed;
      *   - epoll fd signalled: ready events are drained with non-blocking
      *     epoll_wait calls in batches of up to GRPC_EPOLL_MAX_EVENTS and each
      *     one is dispatched (see the loop below).
      *
      * Nothing is returned; pollset->mu is not re-acquired. */
     149     2001592 : static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
     150             :     grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
     151             :     gpr_timespec deadline, gpr_timespec now) {
     152             :   struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
     153             :   int ep_rv;
     154             :   int poll_rv;
     155     2001592 :   pollset_hdr *h = pollset->data.ptr;
     156             :   int timeout_ms;
     157             :   struct pollfd pfds[2];
     158             : 
     159             :   /* If you want to ignore epoll's ability to sanely handle parallel pollers,
     160             :    * for a more apples-to-apples performance comparison with poll, add a
     161             :    * if (pollset->counter != 0) { return 0; }
     162             :    * here.
     163             :    */
                         /* NOTE(review): this function returns void, so the `return 0;` in the
                            snippet suggested above would have to be a plain `return;`. */
     164             : 
     165     2001592 :   gpr_mu_unlock(&pollset->mu);
     166             : 
     167     2001743 :   timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
     168             : 
     169     2001759 :   pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
     170     2001759 :   pfds[0].events = POLLIN;
     171     2001759 :   pfds[0].revents = 0;
     172     2001759 :   pfds[1].fd = h->epoll_fd;
     173     2001759 :   pfds[1].events = POLLIN;
     174     2001759 :   pfds[1].revents = 0;
     175             : 
     176             :   /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     177             :      even going into the blocking annotation if possible */
     178             :   GPR_TIMER_BEGIN("poll", 0);
     179             :   GRPC_SCHEDULING_START_BLOCKING_REGION;
     180     2001759 :   poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
     181             :   GRPC_SCHEDULING_END_BLOCKING_REGION;
     182             :   GPR_TIMER_END("poll", 0);
     183             : 
                         /* NOTE(review): errno is examined after the two macros above; if either
                            can expand to code that modifies errno, the EINTR test below could
                            misfire - confirm. */
     184     2001376 :   if (poll_rv < 0) {
     185          44 :     if (errno != EINTR) {
     186           0 :       gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
     187             :     }
     188     2001332 :   } else if (poll_rv == 0) {
     189             :     /* do nothing */
     190             :   } else {
     191     1902865 :     if (pfds[0].revents) {
     192       74058 :       grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
     193             :     }
     194     1902884 :     if (pfds[1].revents) {
                             /* Drain loop: repeats only while a batch comes back completely
                                full; a short batch or an epoll_wait error (ep_rv < 0) ends it. */
     195             :       do {
     196             :         /* The following epoll_wait never blocks; it has a timeout of 0 */
     197     1850937 :         ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
     198     1850909 :         if (ep_rv < 0) {
     199           0 :           if (errno != EINTR) {
     200           0 :             gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
     201             :           }
     202             :         } else {
     203             :           int i;
     204     3810271 :           for (i = 0; i < ep_rv; ++i) {
     205     1959753 :             grpc_fd *fd = ep_ev[i].data.ptr;
     206             :             /* TODO(klempner): We might want to consider making err and pri
     207             :              * separate events */
     208     1959753 :             int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
     209     1959753 :             int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
     210     1959753 :             int write_ev = ep_ev[i].events & EPOLLOUT;
                                   /* A NULL data.ptr identifies the global wakeup fd, which
                                      epoll_become_multipoller registers with data.ptr = NULL. */
     211     1959753 :             if (fd == NULL) {
     212         114 :               grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
     213             :             } else {
                                     /* Error/hang-up is reported to both the read and the write
                                        side of the fd. */
     214     1959639 :               if (read_ev || cancel) {
     215     1854376 :                 grpc_fd_become_readable(exec_ctx, fd);
     216             :               }
     217     1959977 :               if (write_ev || cancel) {
     218     1958496 :                 grpc_fd_become_writable(exec_ctx, fd);
     219             :               }
     220             :             }
     221             :           }
     222             :         }
     223     1851193 :       } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
     224             :     }
     225             :   }
     226     2001651 : }
     227             : 
     /* finish_shutdown entry of the epoll pollset vtable. Intentionally empty:
      * this poller does no work at this stage; the epoll fd and the header are
      * released in multipoll_with_epoll_pollset_destroy. */
     228        2755 : static void multipoll_with_epoll_pollset_finish_shutdown(
     229        2755 :     grpc_pollset *pollset) {}
     230             : 
     /* destroy entry of the epoll pollset vtable. Closes the pollset's epoll fd
      * and frees the pollset_hdr allocated in epoll_become_multipoller. The
      * return value of close() is not checked. */
     231        2753 : static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
     232        2753 :   pollset_hdr *h = pollset->data.ptr;
     233        2753 :   close(h->epoll_fd);
     234        2753 :   gpr_free(h);
     235        2753 : }
     236             : 
     /* Vtable installed on a pollset by epoll_become_multipoller. The
      * initialiser is positional: add_fd, del_fd, maybe_work_and_unlock,
      * finish_shutdown, destroy.
      * NOTE(review): this order has to match the member order of
      * grpc_pollset_vtable, which is declared elsewhere - confirm when that
      * struct changes. */
     237             : static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
     238             :     multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
     239             :     multipoll_with_epoll_pollset_maybe_work_and_unlock,
     240             :     multipoll_with_epoll_pollset_finish_shutdown,
     241             :     multipoll_with_epoll_pollset_destroy};
     242             : 
     /* Turns `pollset` into an epoll-backed multipoller.
      *
      *   fds / nfds - descriptors to add to the new epoll set.
      *
      * Steps: allocate the pollset_hdr, install the epoll vtable and header on
      * the pollset, create the epoll instance (close-on-exec), register the read
      * end of the global wakeup fd, then add each entry of `fds`.
      *
      * Failure handling: a failing epoll_create1 is logged and the process is
      * aborted; a failure to register the global wakeup fd is only logged. */
     243        2779 : static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
     244             :                                      grpc_pollset *pollset, grpc_fd **fds,
     245             :                                      size_t nfds) {
     246             :   size_t i;
     247        2779 :   pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
     248             :   struct epoll_event ev;
     249             :   int err;
     250             : 
     251        2779 :   pollset->vtable = &multipoll_with_epoll_pollset;
     252        2779 :   pollset->data.ptr = h;
     253        2779 :   h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
     254        2779 :   if (h->epoll_fd < 0) {
     255             :     /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
     256           0 :     gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
     257           0 :     abort();
     258             :   }
     259             : 
                         /* The global wakeup fd is registered edge-triggered with data.ptr = NULL;
                            the poll loop treats a NULL data.ptr as "global wakeup". */
     260        2779 :   ev.events = (uint32_t)(EPOLLIN | EPOLLET);
     261        2779 :   ev.data.ptr = NULL;
     262        2779 :   err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
     263             :                   GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
     264        2779 :   if (err < 0) {
     265           0 :     gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
     266             :             GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
     267           0 :             strerror(errno));
     268             :   }
     269             : 
                         /* and_unlock_pollset is 0, so every add takes the deferred path of
                            multipoll_with_epoll_pollset_add_fd (a delayed_add closure enqueued on
                            exec_ctx) and pollset->mu is not released here. */
     270        8314 :   for (i = 0; i < nfds; i++) {
     271        5558 :     multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
     272             :   }
     273        2779 : }
     274             : 
     /* Externally visible hook, set to the epoll implementation when
      * GPR_LINUX_MULTIPOLL_WITH_EPOLL is defined.
      * NOTE(review): presumably declared in a header and invoked by the generic
      * pollset code when a pollset has to handle multiple fds - confirm. */
     275             : grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
     276             :     epoll_become_multipoller;
     277             : 
     278             : #endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */

Generated by: LCOV version 1.11