Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc/support/port_platform.h>
35 :
36 : #ifdef GPR_POSIX_SOCKET
37 :
38 : #include "src/core/iomgr/pollset_posix.h"
39 :
40 : #include <errno.h>
41 : #include <stdlib.h>
42 : #include <string.h>
43 : #include <unistd.h>
44 :
45 : #include "src/core/iomgr/alarm_internal.h"
46 : #include "src/core/iomgr/fd_posix.h"
47 : #include "src/core/iomgr/iomgr_internal.h"
48 : #include "src/core/iomgr/socket_utils_posix.h"
49 : #include "src/core/profiling/timers.h"
50 : #include "src/core/support/block_annotate.h"
51 : #include <grpc/support/alloc.h>
52 : #include <grpc/support/log.h>
53 : #include <grpc/support/thd.h>
54 : #include <grpc/support/tls.h>
55 : #include <grpc/support/useful.h>
56 :
/* Thread-local slots. grpc_pollset_work stores the pollset and the worker the
   calling thread is polling with (and resets both to 0 afterwards);
   grpc_pollset_kick_ext reads them to detect a thread kicking itself. */
57 : GPR_TLS_DECL(g_current_thread_poller);
58 : GPR_TLS_DECL(g_current_thread_worker);
59 :
60 : /** Default poll() function - a pointer so that it can be overridden by some
61 : * tests */
62 : grpc_poll_function_type grpc_poll_function = poll;
63 :
64 : /** The alarm system needs to be able to wakeup 'some poller' sometimes
65 : * (specifically when a new alarm needs to be triggered earlier than the next
66 : * alarm 'epoch').
67 : * This wakeup_fd gives us something to alert on when such a case occurs. */
/* Initialised in grpc_pollset_global_init, destroyed in
   grpc_pollset_global_shutdown and signalled by grpc_kick_poller. */
68 : grpc_wakeup_fd grpc_global_wakeup_fd;
69 :
70 7498195 : static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
71 7498195 : worker->prev->next = worker->next;
72 7498195 : worker->next->prev = worker->prev;
73 7498195 : }
74 :
75 13542693 : int grpc_pollset_has_workers(grpc_pollset *p) {
76 13542693 : return p->root_worker.next != &p->root_worker;
77 : }
78 :
79 6957610 : static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
80 6957610 : if (grpc_pollset_has_workers(p)) {
81 2598781 : grpc_pollset_worker *w = p->root_worker.next;
82 2598781 : remove_worker(p, w);
83 2598646 : return w;
84 : } else {
85 4358927 : return NULL;
86 : }
87 : }
88 :
89 2598578 : static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
90 2598578 : worker->next = &p->root_worker;
91 2598578 : worker->prev = worker->next->prev;
92 2598578 : worker->prev->next = worker->next->prev = worker;
93 2598578 : }
94 :
95 4897253 : static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
96 4897253 : worker->prev = &p->root_worker;
97 4897253 : worker->next = worker->prev->next;
98 4897253 : worker->prev->next = worker->next->prev = worker;
99 4897253 : }
100 :
101 7717423 : void grpc_pollset_kick_ext(grpc_pollset *p,
102 : grpc_pollset_worker *specific_worker,
103 : gpr_uint32 flags) {
104 : /* pollset->mu already held */
105 7717423 : if (specific_worker != NULL) {
106 760504 : if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
107 330747 : GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
108 676266 : for (specific_worker = p->root_worker.next;
109 345519 : specific_worker != &p->root_worker;
110 14772 : specific_worker = specific_worker->next) {
111 14772 : grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
112 : }
113 330747 : p->kicked_without_pollers = 1;
114 330747 : return;
115 859514 : } else if (gpr_tls_get(&g_current_thread_worker) !=
116 429757 : (gpr_intptr)specific_worker) {
117 429757 : if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
118 115613 : specific_worker->reevaluate_polling_on_wakeup = 1;
119 : }
120 429757 : grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
121 429929 : return;
122 0 : } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
123 0 : if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
124 0 : specific_worker->reevaluate_polling_on_wakeup = 1;
125 : }
126 0 : grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
127 0 : return;
128 : }
129 6956919 : } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
130 6956942 : GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
131 6956942 : specific_worker = pop_front_worker(p);
132 6954541 : if (specific_worker != NULL) {
133 5196220 : if (gpr_tls_get(&g_current_thread_worker) ==
134 2598110 : (gpr_intptr)specific_worker) {
135 0 : push_back_worker(p, specific_worker);
136 0 : specific_worker = pop_front_worker(p);
137 0 : if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
138 0 : gpr_tls_get(&g_current_thread_worker) ==
139 0 : (gpr_intptr)specific_worker) {
140 0 : push_back_worker(p, specific_worker);
141 0 : return;
142 : }
143 : }
144 2598110 : push_back_worker(p, specific_worker);
145 2598548 : grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
146 2598066 : return;
147 : } else {
148 4356431 : p->kicked_without_pollers = 1;
149 4356431 : return;
150 : }
151 : }
152 : }
153 :
154 7597688 : void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
155 7597688 : grpc_pollset_kick_ext(p, specific_worker, 0);
156 7595970 : }
157 :
158 : /* global state management */
159 :
160 2506 : void grpc_pollset_global_init(void) {
161 : gpr_tls_init(&g_current_thread_poller);
162 : gpr_tls_init(&g_current_thread_worker);
163 2506 : grpc_wakeup_fd_global_init();
164 2506 : grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
165 2506 : }
166 :
167 2506 : void grpc_pollset_global_shutdown(void) {
168 2506 : grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
169 2506 : grpc_wakeup_fd_global_destroy();
170 : gpr_tls_destroy(&g_current_thread_poller);
171 : gpr_tls_destroy(&g_current_thread_worker);
172 2506 : }
173 :
174 236 : void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
175 :
176 : /* main interface */
177 :
178 : static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
179 :
180 325418 : void grpc_pollset_init(grpc_pollset *pollset) {
181 325418 : gpr_mu_init(&pollset->mu);
182 325443 : pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
183 325443 : pollset->in_flight_cbs = 0;
184 325443 : pollset->shutting_down = 0;
185 325443 : pollset->called_shutdown = 0;
186 325443 : pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
187 325443 : become_basic_pollset(pollset, NULL);
188 325441 : }
189 :
190 2731404 : void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
191 : grpc_fd *fd) {
192 2731404 : gpr_mu_lock(&pollset->mu);
193 2733602 : pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
194 : /* the following (enabled only in debug) will reacquire and then release
195 : our lock - meaning that if the unlocking flag passed to del_fd above is
196 : not respected, the code will deadlock (in a way that we have a chance of
197 : debugging) */
198 : #ifndef NDEBUG
199 2733514 : gpr_mu_lock(&pollset->mu);
200 2733260 : gpr_mu_unlock(&pollset->mu);
201 : #endif
202 2733089 : }
203 :
204 0 : void grpc_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
205 : grpc_fd *fd) {
206 0 : gpr_mu_lock(&pollset->mu);
207 0 : pollset->vtable->del_fd(exec_ctx, pollset, fd, 1);
208 : /* the following (enabled only in debug) will reacquire and then release
209 : our lock - meaning that if the unlocking flag passed to del_fd above is
210 : not respected, the code will deadlock (in a way that we have a chance of
211 : debugging) */
212 : #ifndef NDEBUG
213 0 : gpr_mu_lock(&pollset->mu);
214 0 : gpr_mu_unlock(&pollset->mu);
215 : #endif
216 0 : }
217 :
218 325469 : static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
219 325469 : GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
220 325469 : pollset->vtable->finish_shutdown(pollset);
221 325469 : grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1);
222 325461 : }
223 :
/* Runs one round of work for `worker` on `pollset`.
   The caller must hold pollset->mu; the mutex may be dropped and re-taken
   inside, and is held again when the function returns.

   Outline:
     1. Reset the worker's list links and flag, and create its wakeup fd.
     2. Return early (via `done`) without polling when: idle jobs are pending
        and no worker is present (the jobs are moved to exec_ctx);
        grpc_alarm_check reports non-zero; the pollset is shutting down; or
        there are in-flight callbacks.
     3. Otherwise poll through vtable->maybe_work_and_unlock, unless a kick
        was recorded in kicked_without_pollers, in which case the flag is
        cleared and polling is skipped. Repeat for as long as the worker's
        reevaluate_polling_on_wakeup flag is found set afterwards.
     4. Unlink the worker (if it was linked), destroy its wakeup fd and, if
        the pollset is shutting down, either kick another worker, finish the
        shutdown, or run the remaining idle jobs. */
224 5276078 : void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
225 : grpc_pollset_worker *worker, gpr_timespec now,
226 : gpr_timespec deadline) {
227 : /* pollset->mu already held */
/* added_worker: worker has been linked into the pollset's worker list.
   locked:       this thread currently holds pollset->mu.
   queued_work:  accumulates the results of grpc_exec_ctx_flush.
   keep_polling: loop condition; set again when re-evaluation is requested. */
228 5276078 : int added_worker = 0;
229 5276078 : int locked = 1;
230 5276078 : int queued_work = 0;
231 5276078 : int keep_polling = 0;
232 : /* this must happen before we (potentially) drop pollset->mu */
233 5276078 : worker->next = worker->prev = NULL;
234 5276078 : worker->reevaluate_polling_on_wakeup = 0;
235 : /* TODO(ctiller): pool these */
236 5276078 : grpc_wakeup_fd_init(&worker->wakeup_fd);
237 : /* If there's work waiting for the pollset to be idle, and the
238 : pollset is idle, then do that work */
239 10532191 : if (!grpc_pollset_has_workers(pollset) &&
240 5240046 : !grpc_closure_list_empty(pollset->idle_jobs)) {
241 4251 : grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
242 0 : goto done;
243 : }
244 : /* Check alarms - these are a global resource so we just ping
245 : each time through on every pollset.
246 : May update deadline to ensure timely wakeups.
247 : TODO(ctiller): can this work be localized? */
248 5287742 : if (grpc_alarm_check(exec_ctx, now, &deadline)) {
249 318 : gpr_mu_unlock(&pollset->mu);
250 318 : locked = 0;
251 318 : goto done;
252 : }
253 : /* If we're shutting down then we don't execute any extended work */
254 5278839 : if (pollset->shutting_down) {
255 0 : goto done;
256 : }
257 : /* Give do_promote priority so we don't starve it out */
258 5278839 : if (pollset->in_flight_cbs) {
259 4753 : gpr_mu_unlock(&pollset->mu);
260 4753 : locked = 0;
261 4753 : goto done;
262 : }
263 : /* Start polling, and keep doing so while we're being asked to
264 : re-evaluate our pollers (this allows poll() based pollers to
265 : ensure they don't miss wakeups) */
266 5274086 : keep_polling = 1;
267 15929639 : while (keep_polling) {
268 5368666 : keep_polling = 0;
269 5368666 : if (!pollset->kicked_without_pollers) {
270 4989074 : if (!added_worker) {
271 4892075 : push_front_worker(pollset, worker);
272 4897496 : added_worker = 1;
273 : }
/* Publish what this thread is polling so that grpc_pollset_kick_ext can
   recognise kicks issued from this same thread. */
274 4994495 : gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
275 4994495 : gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
276 4994495 : pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker,
277 : deadline, now);
/* maybe_work_and_unlock released pollset->mu for us. */
278 4994115 : locked = 0;
279 4994115 : gpr_tls_set(&g_current_thread_poller, 0);
280 4994115 : gpr_tls_set(&g_current_thread_worker, 0);
281 : } else {
/* A kick arrived while nobody was polling: consume it instead of polling. */
282 379592 : pollset->kicked_without_pollers = 0;
283 : }
284 : /* Finished execution - start cleaning up.
285 : Note that we may arrive here from outside the enclosing while() loop.
286 : In that case we won't loop though as we haven't added worker to the
287 : worker list, which means nobody could ask us to re-evaluate polling). */
288 : done:
289 5374907 : if (!locked) {
/* Run queued closures without the pollset lock, then take it back. */
290 4999945 : queued_work |= grpc_exec_ctx_flush(exec_ctx);
291 4999903 : gpr_mu_lock(&pollset->mu);
292 5006505 : locked = 1;
293 : }
294 : /* If we're forced to re-evaluate polling (via grpc_pollset_kick with
295 : GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
296 : a loop */
297 5381467 : if (worker->reevaluate_polling_on_wakeup) {
298 98476 : worker->reevaluate_polling_on_wakeup = 0;
299 98476 : pollset->kicked_without_pollers = 0;
300 98476 : if (queued_work) {
301 : /* If there's queued work on the list, then set the deadline to be
302 : immediate so we get back out of the polling loop quickly */
303 43 : deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
304 : }
305 98476 : keep_polling = 1;
306 : }
307 : }
308 5286887 : if (added_worker) {
309 4903335 : remove_worker(pollset, worker);
310 : }
311 5286703 : grpc_wakeup_fd_destroy(&worker->wakeup_fd);
312 5276799 : if (pollset->shutting_down) {
313 9417 : if (grpc_pollset_has_workers(pollset)) {
/* Other workers remain: wake one of them. */
314 7 : grpc_pollset_kick(pollset, NULL);
315 9410 : } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
316 9409 : pollset->called_shutdown = 1;
317 9409 : gpr_mu_unlock(&pollset->mu);
318 9409 : finish_shutdown(exec_ctx, pollset);
319 9409 : grpc_exec_ctx_flush(exec_ctx);
320 : /* Continuing to access pollset here is safe -- it is the caller's
321 : * responsibility to not destroy when it has outstanding calls to
322 : * grpc_pollset_work.
323 : * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
324 9409 : gpr_mu_lock(&pollset->mu);
325 1 : } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
/* Shutdown cannot complete yet; run the idle jobs that are still queued. */
326 1 : gpr_mu_unlock(&pollset->mu);
327 1 : grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
328 1 : grpc_exec_ctx_flush(exec_ctx);
329 1 : gpr_mu_lock(&pollset->mu);
330 : }
331 : }
332 5276799 : }
333 :
334 325468 : void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
335 : grpc_closure *closure) {
336 325468 : int call_shutdown = 0;
337 325468 : gpr_mu_lock(&pollset->mu);
338 325468 : GPR_ASSERT(!pollset->shutting_down);
339 325468 : pollset->shutting_down = 1;
340 650932 : if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
341 325465 : !grpc_pollset_has_workers(pollset)) {
342 316057 : pollset->called_shutdown = 1;
343 316057 : call_shutdown = 1;
344 : }
345 325467 : if (!grpc_pollset_has_workers(pollset)) {
346 316059 : grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
347 : }
348 325461 : pollset->shutdown_done = closure;
349 325461 : grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
350 325467 : gpr_mu_unlock(&pollset->mu);
351 :
352 325469 : if (call_shutdown) {
353 316058 : finish_shutdown(exec_ctx, pollset);
354 : }
355 325461 : }
356 :
357 325421 : void grpc_pollset_destroy(grpc_pollset *pollset) {
358 325421 : GPR_ASSERT(pollset->shutting_down);
359 325421 : GPR_ASSERT(pollset->in_flight_cbs == 0);
360 325421 : GPR_ASSERT(!grpc_pollset_has_workers(pollset));
361 325463 : pollset->vtable->destroy(pollset);
362 325459 : gpr_mu_destroy(&pollset->mu);
363 325460 : }
364 :
365 4999463 : int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
366 : gpr_timespec now) {
367 : gpr_timespec timeout;
368 : static const int max_spin_polling_us = 10;
369 4999463 : if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
370 13883 : return -1;
371 : }
372 4988745 : if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
373 : max_spin_polling_us,
374 : GPR_TIMESPAN))) <= 0) {
375 2358600 : return 0;
376 : }
377 2629477 : timeout = gpr_time_sub(deadline, now);
378 2629408 : return gpr_time_to_millis(gpr_time_add(
379 : timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
380 : }
381 :
382 : /*
383 : * basic_pollset - a vtable that provides polling for zero or one file
384 : * descriptor via poll()
385 : */
386 :
/* Heap-allocated request created by basic_pollset_add_fd when an fd has to be
   added while workers are present; consumed and freed by basic_do_promote. */
387 : typedef struct grpc_unary_promote_args {
/* The pollset's vtable at the time of the request; basic_do_promote compares
   it with the current one to see whether the poller type changed meanwhile. */
388 : const grpc_pollset_vtable *original_vtable;
/* The pollset the fd is to be added to. */
389 : grpc_pollset *pollset;
/* The fd to add; a "basicpoll_add" ref is taken when the request is made and
   dropped at the end of basic_do_promote. */
390 : grpc_fd *fd;
/* Closure queued on the pollset's idle_jobs; its cb is basic_do_promote and
   its cb_arg is this struct. */
391 : grpc_closure promotion_closure;
392 : } grpc_unary_promote_args;
393 :
394 5356 : static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) {
395 5356 : grpc_unary_promote_args *up_args = args;
396 5356 : const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
397 5356 : grpc_pollset *pollset = up_args->pollset;
398 5356 : grpc_fd *fd = up_args->fd;
399 :
400 : /*
401 : * This is quite tricky. There are a number of cases to keep in mind here:
402 : * 1. fd may have been orphaned
403 : * 2. The pollset may no longer be a unary poller (and we can't let case #1
404 : * leak to other pollset types!)
405 : * 3. pollset's fd (which may have changed) may have been orphaned
406 : * 4. The pollset may be shutting down.
407 : */
408 :
409 5356 : gpr_mu_lock(&pollset->mu);
410 : /* First we need to ensure that nobody is polling concurrently */
411 5356 : GPR_ASSERT(!grpc_pollset_has_workers(pollset));
412 :
413 5356 : gpr_free(up_args);
414 : /* At this point the pollset may no longer be a unary poller. In that case
415 : * we should just call the right add function and be done. */
416 : /* TODO(klempner): If we're not careful this could cause infinite recursion.
417 : * That's not a problem for now because empty_pollset has a trivial poller
418 : * and we don't have any mechanism to unbecome multipoller. */
419 5356 : pollset->in_flight_cbs--;
420 5356 : if (pollset->shutting_down) {
421 : /* We don't care about this pollset anymore. */
422 4 : if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
423 3 : pollset->called_shutdown = 1;
424 3 : finish_shutdown(exec_ctx, pollset);
425 : }
426 5352 : } else if (grpc_fd_is_orphaned(fd)) {
427 : /* Don't try to add it to anything, we'll drop our ref on it below */
428 5342 : } else if (pollset->vtable != original_vtable) {
429 1060 : pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
430 4282 : } else if (fd != pollset->data.ptr) {
431 : grpc_fd *fds[2];
432 4236 : fds[0] = pollset->data.ptr;
433 4236 : fds[1] = fd;
434 :
435 4236 : if (fds[0] && !grpc_fd_is_orphaned(fds[0])) {
436 1049 : grpc_platform_become_multipoller(exec_ctx, pollset, fds,
437 : GPR_ARRAY_SIZE(fds));
438 1049 : GRPC_FD_UNREF(fds[0], "basicpoll");
439 : } else {
440 : /* old fd is orphaned and we haven't cleaned it up until now, so remain a
441 : * unary poller */
442 : /* Note that it is possible that fds[1] is also orphaned at this point.
443 : * That's okay, we'll correct it at the next add or poll. */
444 3187 : if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
445 3187 : pollset->data.ptr = fd;
446 3187 : GRPC_FD_REF(fd, "basicpoll");
447 : }
448 : }
449 :
450 5356 : gpr_mu_unlock(&pollset->mu);
451 :
452 : /* Matching ref in basic_pollset_add_fd */
453 5356 : GRPC_FD_UNREF(fd, "basicpoll_add");
454 5356 : }
455 :
456 1338525 : static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
457 : grpc_fd *fd, int and_unlock_pollset) {
458 : grpc_unary_promote_args *up_args;
459 1338525 : GPR_ASSERT(fd);
460 1338525 : if (fd == pollset->data.ptr) goto exit;
461 :
462 327306 : if (!grpc_pollset_has_workers(pollset)) {
463 : /* Fast path -- no in flight cbs */
464 : /* TODO(klempner): Comment this out and fix any test failures or establish
465 : * they are due to timing issues */
466 : grpc_fd *fds[2];
467 321952 : fds[0] = pollset->data.ptr;
468 321952 : fds[1] = fd;
469 :
470 321952 : if (fds[0] == NULL) {
471 321061 : pollset->data.ptr = fd;
472 321061 : GRPC_FD_REF(fd, "basicpoll");
473 891 : } else if (!grpc_fd_is_orphaned(fds[0])) {
474 839 : grpc_platform_become_multipoller(exec_ctx, pollset, fds,
475 : GPR_ARRAY_SIZE(fds));
476 839 : GRPC_FD_UNREF(fds[0], "basicpoll");
477 : } else {
478 : /* old fd is orphaned and we haven't cleaned it up until now, so remain a
479 : * unary poller */
480 52 : GRPC_FD_UNREF(fds[0], "basicpoll");
481 52 : pollset->data.ptr = fd;
482 52 : GRPC_FD_REF(fd, "basicpoll");
483 : }
484 321926 : goto exit;
485 : }
486 :
487 : /* Now we need to promote. This needs to happen when we're not polling. Since
488 : * this may be called from poll, the wait needs to happen asynchronously. */
489 5356 : GRPC_FD_REF(fd, "basicpoll_add");
490 5356 : pollset->in_flight_cbs++;
491 5356 : up_args = gpr_malloc(sizeof(*up_args));
492 5356 : up_args->fd = fd;
493 5356 : up_args->original_vtable = pollset->vtable;
494 5356 : up_args->pollset = pollset;
495 5356 : up_args->promotion_closure.cb = basic_do_promote;
496 5356 : up_args->promotion_closure.cb_arg = up_args;
497 :
498 5356 : grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
499 5356 : grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
500 :
501 : exit:
502 1337577 : if (and_unlock_pollset) {
503 1337577 : gpr_mu_unlock(&pollset->mu);
504 : }
505 1338589 : }
506 :
507 0 : static void basic_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
508 : grpc_fd *fd, int and_unlock_pollset) {
509 0 : GPR_ASSERT(fd);
510 0 : if (fd == pollset->data.ptr) {
511 0 : GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
512 0 : pollset->data.ptr = NULL;
513 : }
514 :
515 0 : if (and_unlock_pollset) {
516 0 : gpr_mu_unlock(&pollset->mu);
517 : }
518 0 : }
519 :
520 1101850 : static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
521 : grpc_pollset *pollset,
522 : grpc_pollset_worker *worker,
523 : gpr_timespec deadline,
524 : gpr_timespec now) {
525 : #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
526 : #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
527 :
528 : struct pollfd pfd[3];
529 : grpc_fd *fd;
530 : grpc_fd_watcher fd_watcher;
531 : int timeout;
532 : int r;
533 : nfds_t nfds;
534 :
535 1101850 : fd = pollset->data.ptr;
536 1101850 : if (fd && grpc_fd_is_orphaned(fd)) {
537 56 : GRPC_FD_UNREF(fd, "basicpoll");
538 56 : fd = pollset->data.ptr = NULL;
539 : }
540 1101852 : timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
541 1101886 : pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
542 1101886 : pfd[0].events = POLLIN;
543 1101886 : pfd[0].revents = 0;
544 1101886 : pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
545 1101886 : pfd[1].events = POLLIN;
546 1101886 : pfd[1].revents = 0;
547 1101886 : nfds = 2;
548 1101886 : if (fd) {
549 1096833 : pfd[2].fd = fd->fd;
550 1096833 : pfd[2].revents = 0;
551 1096833 : GRPC_FD_REF(fd, "basicpoll_begin");
552 1096882 : gpr_mu_unlock(&pollset->mu);
553 1096881 : pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
554 : POLLOUT, &fd_watcher);
555 1096772 : if (pfd[2].events != 0) {
556 814266 : nfds++;
557 : }
558 : } else {
559 5053 : gpr_mu_unlock(&pollset->mu);
560 : }
561 :
562 : /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
563 : even going into the blocking annotation if possible */
564 : /* poll fd count (argument 2) is shortened by one if we have no events
565 : to poll on - such that it only includes the kicker */
566 : GRPC_SCHEDULING_START_BLOCKING_REGION;
567 1101825 : r = grpc_poll_function(pfd, nfds, timeout);
568 : GRPC_SCHEDULING_END_BLOCKING_REGION;
569 : GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
570 :
571 1101896 : if (r < 0) {
572 48 : gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
573 48 : if (fd) {
574 48 : grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
575 : }
576 1101848 : } else if (r == 0) {
577 89211 : if (fd) {
578 87450 : grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
579 : }
580 : } else {
581 1012637 : if (pfd[0].revents & POLLIN_CHECK) {
582 71 : grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
583 : }
584 1012667 : if (pfd[1].revents & POLLIN_CHECK) {
585 241454 : grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
586 : }
587 1012660 : if (nfds > 2) {
588 793644 : grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
589 793644 : pfd[2].revents & POLLOUT_CHECK);
590 219016 : } else if (fd) {
591 215724 : grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
592 : }
593 : }
594 :
595 1101845 : if (fd) {
596 1096792 : GRPC_FD_UNREF(fd, "basicpoll_begin");
597 : }
598 1101929 : }
599 :
600 646612 : static void basic_pollset_destroy(grpc_pollset *pollset) {
601 646612 : if (pollset->data.ptr != NULL) {
602 322308 : GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
603 322309 : pollset->data.ptr = NULL;
604 : }
605 646613 : }
606 :
/* Vtable of the basic (zero-or-one fd) poller, installed by
   become_basic_pollset. The initializers are positional; basic_pollset_destroy
   is listed twice, presumably filling both the finish_shutdown and the destroy
   slots - confirm against the grpc_pollset_vtable declaration. */
607 : static const grpc_pollset_vtable basic_pollset = {
608 : basic_pollset_add_fd, basic_pollset_del_fd,
609 : basic_pollset_maybe_work_and_unlock, basic_pollset_destroy,
610 : basic_pollset_destroy};
611 :
612 325442 : static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
613 325442 : pollset->vtable = &basic_pollset;
614 325442 : pollset->data.ptr = fd_or_null;
615 325442 : if (fd_or_null != NULL) {
616 0 : GRPC_FD_REF(fd_or_null, "basicpoll");
617 : }
618 325442 : }
619 :
620 : #endif /* GPR_POSIX_SOCKET */
|