Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc/support/port_platform.h>
35 :
36 : #ifdef GPR_LINUX_MULTIPOLL_WITH_EPOLL
37 :
38 : #include <errno.h>
39 : #include <poll.h>
40 : #include <string.h>
41 : #include <sys/epoll.h>
42 : #include <unistd.h>
43 :
44 : #include <grpc/support/alloc.h>
45 : #include <grpc/support/log.h>
46 : #include "src/core/iomgr/fd_posix.h"
47 : #include "src/core/support/block_annotate.h"
48 : #include "src/core/profiling/timers.h"
49 :
50 : typedef struct {
51 : grpc_pollset *pollset;
52 : grpc_fd *fd;
53 : grpc_closure closure;
54 : } delayed_add;
55 :
/* Per-pollset state of the epoll multipoller, stored in pollset->data.ptr. */
typedef struct {
  int epoll_fd; /* epoll instance owned by this pollset */
} pollset_hdr;
57 :
58 2237151 : static void finally_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
59 : grpc_fd *fd) {
60 2237151 : pollset_hdr *h = pollset->data.ptr;
61 : struct epoll_event ev;
62 : int err;
63 : grpc_fd_watcher watcher;
64 :
65 : /* We pretend to be polling whilst adding an fd to keep the fd from being
66 : closed during the add. This may result in a spurious wakeup being assigned
67 : to this pollset whilst adding, but that should be benign. */
68 2237151 : GPR_ASSERT(grpc_fd_begin_poll(fd, pollset, NULL, 0, 0, &watcher) == 0);
69 2237155 : if (watcher.fd != NULL) {
70 2237155 : ev.events = (uint32_t)(EPOLLIN | EPOLLOUT | EPOLLET);
71 2237155 : ev.data.ptr = fd;
72 2237155 : err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD, fd->fd, &ev);
73 2237118 : if (err < 0) {
74 : /* FDs may be added to a pollset multiple times, so EEXIST is normal. */
75 2227889 : if (errno != EEXIST) {
76 0 : gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s", fd->fd,
77 0 : strerror(errno));
78 : }
79 : }
80 : }
81 2237120 : grpc_fd_end_poll(exec_ctx, &watcher, 0, 0);
82 2236981 : }
83 :
84 5663 : static void perform_delayed_add(grpc_exec_ctx *exec_ctx, void *arg,
85 : int iomgr_status) {
86 5616 : delayed_add *da = arg;
87 :
88 5663 : if (!grpc_fd_is_orphaned(da->fd)) {
89 5662 : finally_add_fd(exec_ctx, da->pollset, da->fd);
90 : }
91 :
92 5663 : gpr_mu_lock(&da->pollset->mu);
93 5663 : da->pollset->in_flight_cbs--;
94 5663 : if (da->pollset->shutting_down) {
95 : /* We don't care about this pollset anymore. */
96 2 : if (da->pollset->in_flight_cbs == 0 && !da->pollset->called_shutdown) {
97 1 : da->pollset->called_shutdown = 1;
98 1 : grpc_exec_ctx_enqueue(exec_ctx, da->pollset->shutdown_done, 1);
99 : }
100 : }
101 5663 : gpr_mu_unlock(&da->pollset->mu);
102 :
103 5663 : GRPC_FD_UNREF(da->fd, "delayed_add");
104 :
105 5663 : gpr_free(da);
106 5663 : }
107 :
108 2237153 : static void multipoll_with_epoll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
109 : grpc_pollset *pollset,
110 : grpc_fd *fd,
111 : int and_unlock_pollset) {
112 2237153 : if (and_unlock_pollset) {
113 2231490 : gpr_mu_unlock(&pollset->mu);
114 2231490 : finally_add_fd(exec_ctx, pollset, fd);
115 : } else {
116 5663 : delayed_add *da = gpr_malloc(sizeof(*da));
117 5663 : da->pollset = pollset;
118 5663 : da->fd = fd;
119 5663 : GRPC_FD_REF(fd, "delayed_add");
120 5663 : grpc_closure_init(&da->closure, perform_delayed_add, da);
121 5663 : pollset->in_flight_cbs++;
122 5663 : grpc_exec_ctx_enqueue(exec_ctx, &da->closure, 1);
123 : }
124 2237156 : }
125 :
126 0 : static void multipoll_with_epoll_pollset_del_fd(grpc_exec_ctx *exec_ctx,
127 : grpc_pollset *pollset,
128 : grpc_fd *fd,
129 : int and_unlock_pollset) {
130 0 : pollset_hdr *h = pollset->data.ptr;
131 : int err;
132 :
133 0 : if (and_unlock_pollset) {
134 0 : gpr_mu_unlock(&pollset->mu);
135 : }
136 :
137 : /* Note that this can race with concurrent poll, but that should be fine since
138 : * at worst it creates a spurious read event on a reused grpc_fd object. */
139 0 : err = epoll_ctl(h->epoll_fd, EPOLL_CTL_DEL, fd->fd, NULL);
140 0 : if (err < 0) {
141 0 : gpr_log(GPR_ERROR, "epoll_ctl del for %d failed: %s", fd->fd,
142 0 : strerror(errno));
143 : }
144 0 : }
145 :
/* Capacity of the on-stack epoll_event array used by the poller, and the
   maxevents argument passed to each epoll_wait() call. A batch that comes
   back completely full causes another epoll_wait() round.
   TODO(klempner): We probably want to turn this down a bit */
#define GRPC_EPOLL_MAX_EVENTS (1000)
148 :
149 2001592 : static void multipoll_with_epoll_pollset_maybe_work_and_unlock(
150 : grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
151 : gpr_timespec deadline, gpr_timespec now) {
152 : struct epoll_event ep_ev[GRPC_EPOLL_MAX_EVENTS];
153 : int ep_rv;
154 : int poll_rv;
155 2001592 : pollset_hdr *h = pollset->data.ptr;
156 : int timeout_ms;
157 : struct pollfd pfds[2];
158 :
159 : /* If you want to ignore epoll's ability to sanely handle parallel pollers,
160 : * for a more apples-to-apples performance comparison with poll, add a
161 : * if (pollset->counter != 0) { return 0; }
162 : * here.
163 : */
164 :
165 2001592 : gpr_mu_unlock(&pollset->mu);
166 :
167 2001743 : timeout_ms = grpc_poll_deadline_to_millis_timeout(deadline, now);
168 :
169 2001759 : pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
170 2001759 : pfds[0].events = POLLIN;
171 2001759 : pfds[0].revents = 0;
172 2001759 : pfds[1].fd = h->epoll_fd;
173 2001759 : pfds[1].events = POLLIN;
174 2001759 : pfds[1].revents = 0;
175 :
176 : /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
177 : even going into the blocking annotation if possible */
178 : GPR_TIMER_BEGIN("poll", 0);
179 : GRPC_SCHEDULING_START_BLOCKING_REGION;
180 2001759 : poll_rv = grpc_poll_function(pfds, 2, timeout_ms);
181 : GRPC_SCHEDULING_END_BLOCKING_REGION;
182 : GPR_TIMER_END("poll", 0);
183 :
184 2001376 : if (poll_rv < 0) {
185 44 : if (errno != EINTR) {
186 0 : gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
187 : }
188 2001332 : } else if (poll_rv == 0) {
189 : /* do nothing */
190 : } else {
191 1902865 : if (pfds[0].revents) {
192 74058 : grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
193 : }
194 1902884 : if (pfds[1].revents) {
195 : do {
196 : /* The following epoll_wait never blocks; it has a timeout of 0 */
197 1850937 : ep_rv = epoll_wait(h->epoll_fd, ep_ev, GRPC_EPOLL_MAX_EVENTS, 0);
198 1850909 : if (ep_rv < 0) {
199 0 : if (errno != EINTR) {
200 0 : gpr_log(GPR_ERROR, "epoll_wait() failed: %s", strerror(errno));
201 : }
202 : } else {
203 : int i;
204 3810271 : for (i = 0; i < ep_rv; ++i) {
205 1959753 : grpc_fd *fd = ep_ev[i].data.ptr;
206 : /* TODO(klempner): We might want to consider making err and pri
207 : * separate events */
208 1959753 : int cancel = ep_ev[i].events & (EPOLLERR | EPOLLHUP);
209 1959753 : int read_ev = ep_ev[i].events & (EPOLLIN | EPOLLPRI);
210 1959753 : int write_ev = ep_ev[i].events & EPOLLOUT;
211 1959753 : if (fd == NULL) {
212 114 : grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
213 : } else {
214 1959639 : if (read_ev || cancel) {
215 1854376 : grpc_fd_become_readable(exec_ctx, fd);
216 : }
217 1959977 : if (write_ev || cancel) {
218 1958496 : grpc_fd_become_writable(exec_ctx, fd);
219 : }
220 : }
221 : }
222 : }
223 1851193 : } while (ep_rv == GRPC_EPOLL_MAX_EVENTS);
224 : }
225 : }
226 2001651 : }
227 :
228 2755 : static void multipoll_with_epoll_pollset_finish_shutdown(
229 2755 : grpc_pollset *pollset) {}
230 :
231 2753 : static void multipoll_with_epoll_pollset_destroy(grpc_pollset *pollset) {
232 2753 : pollset_hdr *h = pollset->data.ptr;
233 2753 : close(h->epoll_fd);
234 2753 : gpr_free(h);
235 2753 : }
236 :
237 : static const grpc_pollset_vtable multipoll_with_epoll_pollset = {
238 : multipoll_with_epoll_pollset_add_fd, multipoll_with_epoll_pollset_del_fd,
239 : multipoll_with_epoll_pollset_maybe_work_and_unlock,
240 : multipoll_with_epoll_pollset_finish_shutdown,
241 : multipoll_with_epoll_pollset_destroy};
242 :
243 2779 : static void epoll_become_multipoller(grpc_exec_ctx *exec_ctx,
244 : grpc_pollset *pollset, grpc_fd **fds,
245 : size_t nfds) {
246 : size_t i;
247 2779 : pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
248 : struct epoll_event ev;
249 : int err;
250 :
251 2779 : pollset->vtable = &multipoll_with_epoll_pollset;
252 2779 : pollset->data.ptr = h;
253 2779 : h->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
254 2779 : if (h->epoll_fd < 0) {
255 : /* TODO(klempner): Fall back to poll here, especially on ENOSYS */
256 0 : gpr_log(GPR_ERROR, "epoll_create1 failed: %s", strerror(errno));
257 0 : abort();
258 : }
259 :
260 2779 : ev.events = (uint32_t)(EPOLLIN | EPOLLET);
261 2779 : ev.data.ptr = NULL;
262 2779 : err = epoll_ctl(h->epoll_fd, EPOLL_CTL_ADD,
263 : GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd), &ev);
264 2779 : if (err < 0) {
265 0 : gpr_log(GPR_ERROR, "epoll_ctl add for %d failed: %s",
266 : GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd),
267 0 : strerror(errno));
268 : }
269 :
270 8314 : for (i = 0; i < nfds; i++) {
271 5558 : multipoll_with_epoll_pollset_add_fd(exec_ctx, pollset, fds[i], 0);
272 : }
273 2779 : }
274 :
275 : grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
276 : epoll_become_multipoller;
277 :
278 : #endif /* GPR_LINUX_MULTIPOLL_WITH_EPOLL */
|