Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc/support/port_platform.h>
35 :
36 : #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
37 :
38 : #include <errno.h>
39 : #include <poll.h>
40 : #include <string.h>
41 : #include <sys/epoll.h>
42 : #include <unistd.h>
43 :
44 : #include "src/core/iomgr/fd_posix.h"
45 : #include "src/core/support/block_annotate.h"
46 : #include <grpc/support/alloc.h>
47 : #include <grpc/support/log.h>
48 :
49 : typedef struct wakeup_fd_hdl {
50 : grpc_wakeup_fd wakeup_fd;
51 : struct wakeup_fd_hdl *next;
52 : } wakeup_fd_hdl;
53 :
54 : typedef struct {
55 : grpc_pollset *pollset;
56 : grpc_fd *fd;
57 : grpc_closure closure;
58 : } delayed_add;
59 :
60 : typedef struct {
61 : int epoll_fd;
62 : wakeup_fd_hdl *free_wakeup_fds;
63 : } pollset_hdr;
64 :
65 1397628 : static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
66 : grpc_fd *fd) {
67 1397628 : pollset_hdr *h = pollset->data.ptr;
68 : struct epoll_event ev;
69 : int err;
70 : grpc_fd_watcher watcher;
71 :
72 : /* We pretend to be polling whilst adding an fd to keep the fd from being
73 : closed during the add. This may result in a spurious wakeup being assigned
74 : to this pollset whilst adding, but that should be benign. */
75 1397628 : GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
76 1397815 : if (watcher.fd != NULL) {
77 1397553 : ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
78 1397553 : ev.data.ptr = fd;
79 1397553 : err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
80 1397416 : if (err < 0) {
81 : /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
82 1392559 : if (errno != EEXIST) {
83 0 : gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
84 0 : strerror(errno));
85 : }
86 : }
87 : }
88 1397780 : grpc_fd_end_poll(exec_ctx, &watcher, 0, 0);
89 1397726 : }
90 :
91 3987 : static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
92 : int iomgr_status) {
93 3987 : delayed_add *da = arg;
94 :
95 3987 : if (!grpc_fd_is_orphaned(da->fd)) {
96 3987 : finally_add_fd(exec_ctx, da->pollset, da->fd);
97 : }
98 :
99 3987 : gpr_mu_lock(&da->pollset->mu);
100 3987 : da->pollset->in_flight_cbs--;
101 3987 : if (da->pollset->shutting_down) {
102 : /* We don't care about this pollset anymore. */
103 0 : if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
104 0 : da->pollset->called_shutdown = 1;
105 0 : grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1);
106 : }
107 : }
108 3987 : gpr_mu_unlock(&da->pollset->mu);
109 :
110 3987 : GRPC_FD_UNREF(da->fd, "delayed_add");
111 :
112 3987 : gpr_free(da);
113 3987 : }
114 :
115 1396876 : static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
116 : grpc_pollset *pollset,
117 : grpc_fd *fd,
118 : int and_unlock_pollset) {
119 1396876 : if (and_unlock_pollset) {
120 1392889 : gpr_mu_unlock(&pollset->mu);
121 1393642 : finally_add_fd(exec_ctx, pollset, fd);
122 : } else {
123 3987 : delayed_add *da = gpr_malloc(sizeof(*da));
124 3987 : da->pollset = pollset;
125 3987 : da->fd = fd;
126 3987 : GRPC_FD_REF(fd, "delayed_add");
127 3987 : grpc_closure_init(&da->closure, perform_delayed_add, da);
128 3987 : pollset->in_flight_cbs++;
129 3987 : grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1);
130 : }
131 1397820 : }
132 :
133 0 : static void multipoll_with_epoll_pollset_del_fd(grpc_exec_ctx *exec_ctx,
134 : grpc_pollset *pollset,
135 : grpc_fd *fd,
136 : int and_unlock_pollset) {
137 0 : pollset_hdr *h = pollset->data.ptr;
138 : int err;
139 :
140 0 : if (and_unlock_pollset) {
141 0 : gpr_mu_unlock(&pollset->mu);
142 : }
143 :
144 : /* Note that this can race with concurrent poll, but that should be fine since
145 : * at worst it creates a spurious read event on a reused grpc_fd object. */
146 0 : err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
147 0 : if (err < 0) {
148 0 : gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd->fd,
149 0 : strerror(errno));
150 : }
151 0 : }
152 :
/* Capacity of the event array handed to each epoll_wait call.
   TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS 1000
155 :
156 3881885 : static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
157 : grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
158 : gpr_timespec deadline, gpr_timespec now) {
159 : struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
160 : int ep_rv;
161 : int poll_rv;
162 3881885 : pollset_hdr *h = pollset->data.ptr;
163 : int timeout_ms;
164 : struct pollfd pfds[2];
165 :
166 : /* If you want to ignore epoll's ability to sanely handle parallel pollers,
167 : * for a more apples-to-apples performance comparison with poll, add a
168 : * if (pollset->counter != 0) { return 0; }
169 : * here.
170 : */
171 :
172 3881885 : gpr_mu_unlock(&pollset->mu);
173 :
174 3895066 : timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
175 :
176 3897850 : pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
177 3897850 : pfds[0].events = POLLIN;
178 3897850 : pfds[0].revents = 0;
179 3897850 : pfds[1].fd = h->epoll_fd;
180 3897850 : pfds[1].events = POLLIN;
181 3897850 : pfds[1].revents = 0;
182 :
183 : /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
184 : even going into the blocking annotation if possible */
185 : GRPC_SCHEDULING_START_BLOCKING_REGION;
186 3897850 : poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
187 : GRPC_SCHEDULING_END_BLOCKING_REGION;
188 :
189 3896005 : if (poll_rv < 0) {
190 138 : if (errno != EINTR) {
191 0 : gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
192 : }
193 3895867 : } else if (poll_rv == 0) {
194 : /* do nothing */
195 : } else {
196 1670499 : if (pfds[0].revents) {
197 242544 : grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
198 : }
199 1670491 : if (pfds[1].revents) {
200 : do {
201 : /* The following epoll_wait never blocks; it has a timeout of 0 */
202 1450239 : ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
203 1450097 : if (ep_rv < 0) {
204 0 : if (errno != EINTR) {
205 0 : gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
206 : }
207 : } else {
208 : int i;
209 2996505 : for (i = 0; i < ep_rv; ++i) {
210 1546002 : grpc_fd *fd = ep_ev[i].data.ptr;
211 : /* TODO(klempner): We might want to consider making err and pri
212 : * separate events */
213 1546002 : int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
214 1546002 : int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
215 1546002 : int write_ev = ep_ev[i].events & EPOLLOUT;
216 1546002 : if (fd == NULL) {
217 0 : grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
218 : } else {
219 1546055 : if (read_ev || cancel) {
220 1020966 : grpc_fd_become_readable(exec_ctx, fd);
221 : }
222 1546359 : if (write_ev || cancel) {
223 1545962 : grpc_fd_become_writable(exec_ctx, fd);
224 : }
225 : }
226 : }
227 : }
228 1450503 : } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
229 : }
230 : }
231 3896261 : }
232 :
233 1586 : static void multipoll_with_epoll_pollset_finish_shutdown(
234 1586 : grpc_pollset *pollset) {}
235 :
236 1586 : static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
237 1586 : pollset_hdr *h = pollset->data.ptr;
238 1586 : close(h->epoll_fd);
239 1586 : gpr_free(h);
240 1586 : }
241 :
242 : static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
243 : multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
244 : multipoll_with_epoll_pollset_maybe_work_and_unlock,
245 : multipoll_with_epoll_pollset_finish_shutdown,
246 : multipoll_with_epoll_pollset_destroy};
247 :
248 1586 : static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
249 : grpc_pollset *pollset, grpc_fd **fds,
250 : size_t nfds) {
251 : size_t i;
252 1586 : pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
253 : struct epoll_event ev;
254 : int err;
255 :
256 1586 : pollset->vtable = &multipoll_with_epoll_pollset;
257 1586 : pollset->data.ptr = h;
258 1586 : h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
259 1586 : if (h->epoll_fd < 0) {
260 : /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
261 0 : gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
262 0 : abort();
263 : }
264 :
265 1586 : ev.events = (uint32_t)(EPOLLIN | EPOLLET);
266 1586 : ev.data.ptr = NULL;
267 1586 : err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
268 : GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
269 1586 : if (err < 0) {
270 0 : gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
271 : GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
272 0 : strerror(errno));
273 : }
274 :
275 4758 : for (i = 0; i < nfds; i++) {
276 3172 : multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
277 : }
278 1586 : }
279 :
280 : grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
281 : epoll_become_multipoller;
282 :
283 : #endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
|