LCOV - code coverage report
Current view: top level - core/iomgr - pollset_posix.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 307 332 92.5 %
Date: 2015-12-10 22:15:08 Functions: 23 25 92.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <grpc/support/port_platform.h>
      35             : 
      36             : #ifdef GPR_POSIX_SOCKET
      37             : 
      38             : #include "src/core/iomgr/pollset_posix.h"
      39             : 
      40             : #include <errno.h>
      41             : #include <stdlib.h>
      42             : #include <string.h>
      43             : #include <unistd.h>
      44             : 
      45             : #include "src/core/iomgr/timer_internal.h"
      46             : #include "src/core/iomgr/fd_posix.h"
      47             : #include "src/core/iomgr/iomgr_internal.h"
      48             : #include "src/core/iomgr/socket_utils_posix.h"
      49             : #include "src/core/profiling/timers.h"
      50             : #include "src/core/support/block_annotate.h"
      51             : #include <grpc/support/alloc.h>
      52             : #include <grpc/support/log.h>
      53             : #include <grpc/support/thd.h>
      54             : #include <grpc/support/tls.h>
      55             : #include <grpc/support/useful.h>
      56             : 
      57             : GPR_TLS_DECL(g_current_thread_poller);
      58             : GPR_TLS_DECL(g_current_thread_worker);
      59             : 
      60             : /** Default poll() function - a pointer so that it can be overridden by some
      61             :  *  tests */
      62             : grpc_poll_function_type grpc_poll_function = poll;
      63             : 
      64             : /** The alarm system needs to be able to wakeup 'some poller' sometimes
      65             :  *  (specifically when a new alarm needs to be triggered earlier than the next
      66             :  *  alarm 'epoch').
      67             :  *  This wakeup_fd gives us something to alert on when such a case occurs. */
      68             : grpc_wakeup_fd grpc_global_wakeup_fd;
      69             : 
      70    10601190 : static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
      71    10603296 :   worker->prev->next = worker->next;
      72    10603296 :   worker->next->prev = worker->prev;
      73    10601190 : }
      74             : 
      75    22443209 : int grpc_pollset_has_workers(grpc_pollset *p) {
      76    22443209 :   return p->root_worker.next != &p->root_worker;
      77             : }
      78             : 
      79    13766310 : static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) {
      80    13766310 :   if (grpc_pollset_has_workers(p)) {
      81     7168076 :     grpc_pollset_worker *w = p->root_worker.next;
      82     7167155 :     remove_worker(p, w);
      83     7168688 :     return w;
      84             :   } else {
      85     6611840 :     return NULL;
      86             :   }
      87             : }
      88             : 
      89     7167796 : static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
      90     7168717 :   worker->next = &p->root_worker;
      91     7168717 :   worker->prev = worker->next->prev;
      92     7168717 :   worker->prev->next = worker->next->prev = worker;
      93     7167796 : }
      94             : 
      95     3435105 : static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) {
      96     3436290 :   worker->prev = &p->root_worker;
      97     3436290 :   worker->next = worker->prev->next;
      98     3436290 :   worker->prev->next = worker->next->prev = worker;
      99     3435105 : }
     100             : 
     101    11553061 : void grpc_pollset_kick_ext(grpc_pollset *p,
     102             :                            grpc_pollset_worker *specific_worker,
     103             :                            gpr_uint32 flags) {
     104             :   GPR_TIMER_BEGIN("grpc_pollset_kick_ext", 0);
     105             : 
     106             :   /* pollset->mu already held */
     107    11553061 :   if (specific_worker != NULL) {
     108      946823 :     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
     109             :       GPR_TIMER_BEGIN("grpc_pollset_kick_ext.broadcast", 0);
     110      437477 :       GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
     111      888642 :       for (specific_worker = p->root_worker.next;
     112      451165 :            specific_worker != &p->root_worker;
     113       13688 :            specific_worker = specific_worker->next) {
     114       13688 :         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
     115             :       }
     116      437477 :       p->kicked_without_pollers = 1;
     117             :       GPR_TIMER_END("grpc_pollset_kick_ext.broadcast", 0);
     118     1018692 :     } else if (gpr_tls_get(&g_current_thread_worker) !=
     119      509346 :                (gpr_intptr)specific_worker) {
     120             :       GPR_TIMER_MARK("different_thread_worker", 0);
     121      272817 :       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
     122       89045 :         specific_worker->reevaluate_polling_on_wakeup = 1;
     123             :       }
     124      272817 :       specific_worker->kicked_specifically = 1;
     125      272817 :       grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
     126      236529 :     } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) {
     127             :       GPR_TIMER_MARK("kick_yoself", 0);
     128           0 :       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
     129           0 :         specific_worker->reevaluate_polling_on_wakeup = 1;
     130             :       }
     131           0 :       specific_worker->kicked_specifically = 1;
     132           0 :       grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
     133             :     }
     134    10606238 :   } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) {
     135    10611630 :     GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
     136             :     GPR_TIMER_MARK("kick_anonymous", 0);
     137    10611630 :     specific_worker = pop_front_worker(p);
     138    10609756 :     if (specific_worker != NULL) {
     139     8001376 :       if (gpr_tls_get(&g_current_thread_worker) ==
     140     4000688 :           (gpr_intptr)specific_worker) {
     141             :         GPR_TIMER_MARK("kick_anonymous_not_self", 0);
     142     3171364 :         push_back_worker(p, specific_worker);
     143     3171576 :         specific_worker = pop_front_worker(p);
     144     6342983 :         if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
     145     3171496 :             gpr_tls_get(&g_current_thread_worker) ==
     146     3171496 :                 (gpr_intptr)specific_worker) {
     147     3171226 :           push_back_worker(p, specific_worker);
     148     3171228 :           specific_worker = NULL;
     149             :         }
     150             :       }
     151     4000590 :       if (specific_worker != NULL) {
     152             :         GPR_TIMER_MARK("finally_kick", 0);
     153      828921 :         push_back_worker(p, specific_worker);
     154      829461 :         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd->fd);
     155             :       }
     156             :     } else {
     157             :       GPR_TIMER_MARK("kicked_no_pollers", 0);
     158     6609068 :       p->kicked_without_pollers = 1;
     159             :     }
     160             :   }
     161             : 
     162             :   GPR_TIMER_END("grpc_pollset_kick_ext", 0);
     163    11550843 : }
     164             : 
     165    11460860 : void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) {
     166    11460860 :   grpc_pollset_kick_ext(p, specific_worker, 0);
     167    11458715 : }
     168             : 
     169             : /* global state management */
     170             : 
     171        3457 : void grpc_pollset_global_init(void) {
     172             :   gpr_tls_init(&g_current_thread_poller);
     173             :   gpr_tls_init(&g_current_thread_worker);
     174        3457 :   grpc_wakeup_fd_global_init();
     175        3457 :   grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
     176        3457 : }
     177             : 
     178        3455 : void grpc_pollset_global_shutdown(void) {
     179        3455 :   grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
     180             :   gpr_tls_destroy(&g_current_thread_poller);
     181             :   gpr_tls_destroy(&g_current_thread_worker);
     182        3455 :   grpc_wakeup_fd_global_destroy();
     183        3455 : }
     184             : 
     185         424 : void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); }
     186             : 
     187             : /* main interface */
     188             : 
     189             : static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
     190             : 
     191        7448 : void grpc_pollset_init(grpc_pollset *pollset) {
     192        7448 :   gpr_mu_init(&pollset->mu);
     193        7448 :   pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker;
     194        7448 :   pollset->in_flight_cbs = 0;
     195        7448 :   pollset->shutting_down = 0;
     196        7448 :   pollset->called_shutdown = 0;
     197        7448 :   pollset->kicked_without_pollers = 0;
     198        7448 :   pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
     199        7448 :   pollset->local_wakeup_cache = NULL;
     200        7406 :   pollset->kicked_without_pollers = 0;
     201        7406 :   become_basic_pollset(pollset, NULL);
     202        7448 : }
     203             : 
     204        7401 : void grpc_pollset_destroy(grpc_pollset *pollset) {
     205        7401 :   GPR_ASSERT(pollset->in_flight_cbs == 0);
     206        7401 :   GPR_ASSERT(!grpc_pollset_has_workers(pollset));
     207        7401 :   GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
     208        7401 :   pollset->vtable->destroy(pollset);
     209        7401 :   gpr_mu_destroy(&pollset->mu);
     210       20841 :   while (pollset->local_wakeup_cache) {
     211        6039 :     grpc_cached_wakeup_fd *next = pollset->local_wakeup_cache->next;
     212        6039 :     grpc_wakeup_fd_destroy(&pollset->local_wakeup_cache->fd);
     213        6039 :     gpr_free(pollset->local_wakeup_cache);
     214        6039 :     pollset->local_wakeup_cache = next;
     215             :   }
     216        7401 : }
     217             : 
     218      423706 : void grpc_pollset_reset(grpc_pollset *pollset) {
     219      423706 :   GPR_ASSERT(pollset->shutting_down);
     220      423706 :   GPR_ASSERT(pollset->in_flight_cbs == 0);
     221      423706 :   GPR_ASSERT(!grpc_pollset_has_workers(pollset));
     222      423797 :   GPR_ASSERT(pollset->idle_jobs.head == pollset->idle_jobs.tail);
     223      423797 :   pollset->vtable->destroy(pollset);
     224      423799 :   pollset->shutting_down = 0;
     225      423799 :   pollset->called_shutdown = 0;
     226      423799 :   pollset->kicked_without_pollers = 0;
     227      423790 :   become_basic_pollset(pollset, NULL);
     228      423801 : }
     229             : 
     230     4481626 : void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     231             :                          grpc_fd *fd) {
     232     4481626 :   gpr_mu_lock(&pollset->mu);
     233     4484136 :   pollset->vtable->add_fd(exec_ctx, pollset, fd, 1);
     234             : /* the following (enabled only in debug) will reacquire and then release
     235             :    our lock - meaning that if the unlocking flag passed to del_fd above is
     236             :    not respected, the code will deadlock (in a way that we have a chance of
     237             :    debugging) */
     238             : #ifndef NDEBUG
     239     4483761 :   gpr_mu_lock(&pollset->mu);
     240     4484129 :   gpr_mu_unlock(&pollset->mu);
     241             : #endif
     242     4484029 : }
     243             : 
     244           0 : void grpc_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     245             :                          grpc_fd *fd) {
     246           0 :   gpr_mu_lock(&pollset->mu);
     247           0 :   pollset->vtable->del_fd(exec_ctx, pollset, fd, 1);
     248             : /* the following (enabled only in debug) will reacquire and then release
     249             :    our lock - meaning that if the unlocking flag passed to del_fd above is
     250             :    not respected, the code will deadlock (in a way that we have a chance of
     251             :    debugging) */
     252             : #ifndef NDEBUG
     253           0 :   gpr_mu_lock(&pollset->mu);
     254           0 :   gpr_mu_unlock(&pollset->mu);
     255             : #endif
     256           0 : }
     257             : 
     258      427999 : static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) {
     259      427999 :   GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs));
     260      428143 :   pollset->vtable->finish_shutdown(pollset);
     261      428165 :   grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1);
     262      428158 : }
     263             : 
     264     6945723 : void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     265             :                        grpc_pollset_worker *worker, gpr_timespec now,
     266             :                        gpr_timespec deadline) {
     267             :   /* pollset->mu already held */
     268     6915806 :   int added_worker = 0;
     269     6915806 :   int locked = 1;
     270     6915806 :   int queued_work = 0;
     271     6915806 :   int keep_polling = 0;
     272             :   GPR_TIMER_BEGIN("grpc_pollset_work", 0);
     273             :   /* this must happen before we (potentially) drop pollset->mu */
     274     6945723 :   worker->next = worker->prev = NULL;
     275     6945723 :   worker->reevaluate_polling_on_wakeup = 0;
     276     6945723 :   if (pollset->local_wakeup_cache != NULL) {
     277     6939669 :     worker->wakeup_fd = pollset->local_wakeup_cache;
     278     6939669 :     pollset->local_wakeup_cache = worker->wakeup_fd->next;
     279             :   } else {
     280        6054 :     worker->wakeup_fd = gpr_malloc(sizeof(*worker->wakeup_fd));
     281        6055 :     grpc_wakeup_fd_init(&worker->wakeup_fd->fd);
     282             :   }
     283     6945724 :   worker->kicked_specifically = 0;
     284             :   /* If there's work waiting for the pollset to be idle, and the
     285             :      pollset is idle, then do that work */
     286    12084024 :   if (!grpc_pollset_has_workers(pollset) &&
     287     5138310 :       !grpc_closure_list_empty(pollset->idle_jobs)) {
     288             :     GPR_TIMER_MARK("grpc_pollset_work.idle_jobs", 0);
     289        9205 :     grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
     290        9576 :     goto done;
     291             :   }
     292             :   /* Check alarms - these are a global resource so we just ping
     293             :      each time through on every pollset.
     294             :      May update deadline to ensure timely wakeups.
     295             :      TODO(ctiller): can this work be localized? */
     296     6936675 :   if (grpc_timer_check(exec_ctx, now, &deadline)) {
     297             :     GPR_TIMER_MARK("grpc_pollset_work.alarm_triggered", 0);
     298         668 :     gpr_mu_unlock(&pollset->mu);
     299         657 :     locked = 0;
     300         668 :     goto done;
     301             :   }
     302             :   /* If we're shutting down then we don't execute any extended work */
     303     6936042 :   if (pollset->shutting_down) {
     304             :     GPR_TIMER_MARK("grpc_pollset_work.shutting_down", 0);
     305           0 :     goto done;
     306             :   }
     307             :   /* Give do_promote priority so we don't starve it out */
     308     6936042 :   if (pollset->in_flight_cbs) {
     309             :     GPR_TIMER_MARK("grpc_pollset_work.in_flight_cbs", 0);
     310     2266658 :     gpr_mu_unlock(&pollset->mu);
     311     2237203 :     locked = 0;
     312     2265871 :     goto done;
     313             :   }
     314             :   /* Start polling, and keep doing so while we're being asked to
     315             :      re-evaluate our pollers (this allows poll() based pollers to
     316             :      ensure they don't miss wakeups) */
     317     4668147 :   keep_polling = 1;
     318    16359842 :   while (keep_polling) {
     319     4744510 :     keep_polling = 0;
     320     4745748 :     if (!pollset->kicked_without_pollers) {
     321     3512701 :       if (!added_worker) {
     322     3435184 :         push_front_worker(pollset, worker);
     323     3435177 :         added_worker = 1;
     324     3436362 :         gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
     325             :       }
     326     3512694 :       gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset);
     327             :       GPR_TIMER_BEGIN("maybe_work_and_unlock", 0);
     328     3512694 :       pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker,
     329             :                                              deadline, now);
     330             :       GPR_TIMER_END("maybe_work_and_unlock", 0);
     331     3511569 :       locked = 0;
     332     3512752 :       gpr_tls_set(&g_current_thread_poller, 0);
     333             :     } else {
     334             :       GPR_TIMER_MARK("grpc_pollset_work.kicked_without_pollers", 0);
     335     1233047 :       pollset->kicked_without_pollers = 0;
     336             :     }
     337             :   /* Finished execution - start cleaning up.
     338             :      Note that we may arrive here from outside the enclosing while() loop.
     339             :      In that case we won't loop though as we haven't added worker to the
     340             :      worker list, which means nobody could ask us to re-evaluate polling). */
     341             :   done:
     342     7021914 :     if (!locked) {
     343     5779857 :       queued_work |= grpc_exec_ctx_flush(exec_ctx);
     344     5779698 :       gpr_mu_lock(&pollset->mu);
     345     5750387 :       locked = 1;
     346             :     }
     347             :     /* If we're forced to re-evaluate polling (via grpc_pollset_kick with
     348             :        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
     349             :        a loop */
     350     7022311 :     if (worker->reevaluate_polling_on_wakeup) {
     351       76487 :       worker->reevaluate_polling_on_wakeup = 0;
     352       76487 :       pollset->kicked_without_pollers = 0;
     353       76487 :       if (queued_work || worker->kicked_specifically) {
     354             :         /* If there's queued work on the list, then set the deadline to be
     355             :            immediate so we get back out of the polling loop quickly */
     356       76487 :         deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
     357             :       }
     358       76486 :       keep_polling = 1;
     359             :     }
     360             :   }
     361     6945947 :   if (added_worker) {
     362     3435284 :     remove_worker(pollset, worker);
     363     3436442 :     gpr_tls_set(&g_current_thread_worker, 0);
     364             :   }
     365             :   /* release wakeup fd to the local pool */
     366     6945920 :   worker->wakeup_fd->next = pollset->local_wakeup_cache;
     367     6945920 :   pollset->local_wakeup_cache = worker->wakeup_fd;
     368             :   /* check shutdown conditions */
     369     6945920 :   if (pollset->shutting_down) {
     370        4092 :     if (grpc_pollset_has_workers(pollset)) {
     371         174 :       grpc_pollset_kick(pollset, NULL);
     372        3918 :     } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) {
     373        3914 :       pollset->called_shutdown = 1;
     374        3914 :       gpr_mu_unlock(&pollset->mu);
     375        3914 :       finish_shutdown(exec_ctx, pollset);
     376        3914 :       grpc_exec_ctx_flush(exec_ctx);
     377             :       /* Continuing to access pollset here is safe -- it is the caller's
     378             :        * responsibility to not destroy when it has outstanding calls to
     379             :        * grpc_pollset_work.
     380             :        * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
     381        3914 :       gpr_mu_lock(&pollset->mu);
     382           4 :     } else if (!grpc_closure_list_empty(pollset->idle_jobs)) {
     383           0 :       grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
     384           0 :       gpr_mu_unlock(&pollset->mu);
     385           0 :       grpc_exec_ctx_flush(exec_ctx);
     386           0 :       gpr_mu_lock(&pollset->mu);
     387             :     }
     388             :   }
     389             :   GPR_TIMER_END("grpc_pollset_work", 0);
     390     6945920 : }
     391             : 
     392      428099 : void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     393             :                            grpc_closure *closure) {
     394      428099 :   GPR_ASSERT(!pollset->shutting_down);
     395      428099 :   pollset->shutting_down = 1;
     396      428099 :   pollset->shutdown_done = closure;
     397      428099 :   grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
     398      428155 :   if (!grpc_pollset_has_workers(pollset)) {
     399      424246 :     grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
     400             :   }
     401      856292 :   if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
     402      428151 :       !grpc_pollset_has_workers(pollset)) {
     403      424230 :     pollset->called_shutdown = 1;
     404      424230 :     finish_shutdown(exec_ctx, pollset);
     405             :   }
     406      428156 : }
     407             : 
     408     3512749 : int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline,
     409             :                                          gpr_timespec now) {
     410             :   gpr_timespec timeout;
     411             :   static const int max_spin_polling_us = 10;
     412     3512749 :   if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
     413        6213 :     return -1;
     414             :   }
     415     3506804 :   if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
     416             :                                                    max_spin_polling_us,
     417             :                                                    GPR_TIMESPAN))) <= 0) {
     418      277717 :     return 0;
     419             :   }
     420     3229069 :   timeout = gpr_time_sub(deadline, now);
     421     3229037 :   return gpr_time_to_millis(gpr_time_add(
     422             :       timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
     423             : }
     424             : 
     425             : /*
     426             :  * basic_pollset - a vtable that provides polling for zero or one file
     427             :  *                 descriptor via poll()
     428             :  */
     429             : 
     430             : typedef struct grpc_unary_promote_args {
     431             :   const grpc_pollset_vtable *original_vtable;
     432             :   grpc_pollset *pollset;
     433             :   grpc_fd *fd;
     434             :   grpc_closure promotion_closure;
     435             : } grpc_unary_promote_args;
     436             : 
     437        9328 : static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) {
     438        9326 :   grpc_unary_promote_args *up_args = args;
     439        9328 :   const grpc_pollset_vtable *original_vtable = up_args->original_vtable;
     440        9328 :   grpc_pollset *pollset = up_args->pollset;
     441        9328 :   grpc_fd *fd = up_args->fd;
     442             : 
     443             :   /*
     444             :    * This is quite tricky. There are a number of cases to keep in mind here:
     445             :    * 1. fd may have been orphaned
     446             :    * 2. The pollset may no longer be a unary poller (and we can't let case #1
     447             :    * leak to other pollset types!)
     448             :    * 3. pollset's fd (which may have changed) may have been orphaned
     449             :    * 4. The pollset may be shutting down.
     450             :    */
     451             : 
     452        9328 :   gpr_mu_lock(&pollset->mu);
     453             :   /* First we need to ensure that nobody is polling concurrently */
     454        9328 :   GPR_ASSERT(!grpc_pollset_has_workers(pollset));
     455             : 
     456        9328 :   gpr_free(up_args);
     457             :   /* At this point the pollset may no longer be a unary poller. In that case
     458             :    * we should just call the right add function and be done. */
     459             :   /* TODO(klempner): If we're not careful this could cause infinite recursion.
     460             :    * That's not a problem for now because empty_pollset has a trivial poller
     461             :    * and we don't have any mechanism to unbecome multipoller. */
     462        9328 :   pollset->in_flight_cbs--;
     463        9328 :   if (pollset->shutting_down) {
     464             :     /* We don't care about this pollset anymore. */
     465           1 :     if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
     466           1 :       pollset->called_shutdown = 1;
     467           1 :       finish_shutdown(exec_ctx, pollset);
     468             :     }
     469        9327 :   } else if (grpc_fd_is_orphaned(fd)) {
     470             :     /* Don't try to add it to anything, we'll drop our ref on it below */
     471        9317 :   } else if (pollset->vtable != original_vtable) {
     472         106 :     pollset->vtable->add_fd(exec_ctx, pollset, fd, 0);
     473        9211 :   } else if (fd != pollset->data.ptr) {
     474             :     grpc_fd *fds[2];
     475        9209 :     fds[0] = pollset->data.ptr;
     476        9209 :     fds[1] = fd;
     477             : 
     478        9209 :     if (fds[0] && !grpc_fd_is_orphaned(fds[0])) {
     479         167 :       grpc_platform_become_multipoller(exec_ctx, pollset, fds,
     480             :                                        GPR_ARRAY_SIZE(fds));
     481         167 :       GRPC_FD_UNREF(fds[0], "basicpoll");
     482             :     } else {
     483             :       /* old fd is orphaned and we haven't cleaned it up until now, so remain a
     484             :        * unary poller */
     485             :       /* Note that it is possible that fds[1] is also orphaned at this point.
     486             :        * That's okay, we'll correct it at the next add or poll. */
     487        9042 :       if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
     488        9042 :       pollset->data.ptr = fd;
     489        9042 :       GRPC_FD_REF(fd, "basicpoll");
     490             :     }
     491             :   }
     492             : 
     493        9328 :   gpr_mu_unlock(&pollset->mu);
     494             : 
     495             :   /* Matching ref in basic_pollset_add_fd */
     496        9328 :   GRPC_FD_UNREF(fd, "basicpoll_add");
     497        9328 : }
     498             : 
     499     1989000 : static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     500             :                                  grpc_fd *fd, int and_unlock_pollset) {
     501             :   grpc_unary_promote_args *up_args;
     502     1989000 :   GPR_ASSERT(fd);
     503     1989000 :   if (fd == pollset->data.ptr) goto exit;
     504             : 
     505      434790 :   if (!grpc_pollset_has_workers(pollset)) {
     506             :     /* Fast path -- no in flight cbs */
     507             :     /* TODO(klempner): Comment this out and fix any test failures or establish
     508             :      * they are due to timing issues */
     509             :     grpc_fd *fds[2];
     510      425463 :     fds[0] = pollset->data.ptr;
     511      425463 :     fds[1] = fd;
     512             : 
     513      425463 :     if (fds[0] == NULL) {
     514      422221 :       pollset->data.ptr = fd;
     515      422221 :       GRPC_FD_REF(fd, "basicpoll");
     516        3242 :     } else if (!grpc_fd_is_orphaned(fds[0])) {
     517        3191 :       grpc_platform_become_multipoller(exec_ctx, pollset, fds,
     518             :                                        GPR_ARRAY_SIZE(fds));
     519        3191 :       GRPC_FD_UNREF(fds[0], "basicpoll");
     520             :     } else {
     521             :       /* old fd is orphaned and we haven't cleaned it up until now, so remain a
     522             :        * unary poller */
     523          51 :       GRPC_FD_UNREF(fds[0], "basicpoll");
     524          51 :       pollset->data.ptr = fd;
     525          51 :       GRPC_FD_REF(fd, "basicpoll");
     526             :     }
     527      425412 :     goto exit;
     528             :   }
     529             : 
     530             :   /* Now we need to promote. This needs to happen when we're not polling. Since
     531             :    * this may be called from poll, the wait needs to happen asynchronously. */
     532        9328 :   GRPC_FD_REF(fd, "basicpoll_add");
     533        9328 :   pollset->in_flight_cbs++;
     534        9328 :   up_args = gpr_malloc(sizeof(*up_args));
     535        9328 :   up_args->fd = fd;
     536        9328 :   up_args->original_vtable = pollset->vtable;
     537        9328 :   up_args->pollset = pollset;
     538        9328 :   up_args->promotion_closure.cb = basic_do_promote;
     539        9328 :   up_args->promotion_closure.cb_arg = up_args;
     540             : 
     541        9328 :   grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
     542        9328 :   grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST);
     543             : 
     544             : exit:
     545     1988796 :   if (and_unlock_pollset) {
     546     1988796 :     gpr_mu_unlock(&pollset->mu);
     547             :   }
     548     1989408 : }
     549             : 
     550           0 : static void basic_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
     551             :                                  grpc_fd *fd, int and_unlock_pollset) {
     552           0 :   GPR_ASSERT(fd);
     553           0 :   if (fd == pollset->data.ptr) {
     554           0 :     GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
     555           0 :     pollset->data.ptr = NULL;
     556             :   }
     557             : 
     558           0 :   if (and_unlock_pollset) {
     559           0 :     gpr_mu_unlock(&pollset->mu);
     560             :   }
     561           0 : }
     562             : 
     563     1247195 : static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
     564             :                                                 grpc_pollset *pollset,
     565             :                                                 grpc_pollset_worker *worker,
     566             :                                                 gpr_timespec deadline,
     567             :                                                 gpr_timespec now) {
     568             : #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
     569             : #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
     570             : 
     571             :   struct pollfd pfd[3];
     572             :   grpc_fd *fd;
     573             :   grpc_fd_watcher fd_watcher;
     574             :   int timeout;
     575             :   int r;
     576             :   nfds_t nfds;
     577             : 
     578     1247195 :   fd = pollset->data.ptr;
     579     1247195 :   if (fd && grpc_fd_is_orphaned(fd)) {
     580        1406 :     GRPC_FD_UNREF(fd, "basicpoll");
     581        1406 :     fd = pollset->data.ptr = NULL;
     582             :   }
     583     1247195 :   timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
     584     1247261 :   pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
     585     1247261 :   pfd[0].events = POLLIN;
     586     1247261 :   pfd[0].revents = 0;
     587     1247261 :   pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd->fd);
     588     1247261 :   pfd[1].events = POLLIN;
     589     1247261 :   pfd[1].revents = 0;
     590     1247258 :   nfds = 2;
     591     1247261 :   if (fd) {
     592     1238858 :     pfd[2].fd = fd->fd;
     593     1238858 :     pfd[2].revents = 0;
     594     1238858 :     GRPC_FD_REF(fd, "basicpoll_begin");
     595     1238903 :     gpr_mu_unlock(&pollset->mu);
     596     1238803 :     pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
     597             :                                               POLLOUT, &fd_watcher);
     598     1238901 :     if (pfd[2].events != 0) {
     599      956569 :       nfds++;
     600             :     }
     601             :   } else {
     602        8403 :     gpr_mu_unlock(&pollset->mu);
     603             :   }
     604             : 
     605             :   /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     606             :      even going into the blocking annotation if possible */
     607             :   /* poll fd count (argument 2) is shortened by one if we have no events
     608             :      to poll on - such that it only includes the kicker */
     609             :   GPR_TIMER_BEGIN("poll", 0);
     610             :   GRPC_SCHEDULING_START_BLOCKING_REGION;
     611     1247304 :   r = grpc_poll_function(pfd, nfds, timeout);
     612             :   GRPC_SCHEDULING_END_BLOCKING_REGION;
     613             :   GPR_TIMER_END("poll", 0);
     614             : 
     615     1247249 :   if (r < 0) {
     616          10 :     if (errno != EINTR) {
     617           0 :       gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
     618             :     }
     619          10 :     if (fd) {
     620           6 :       grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
     621             :     }
     622     1247239 :   } else if (r == 0) {
     623      158332 :     if (fd) {
     624      156354 :       grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
     625             :     }
     626             :   } else {
     627     1088907 :     if (pfd[0].revents & POLLIN_CHECK) {
     628         101 :       grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
     629             :     }
     630     1088938 :     if (pfd[1].revents & POLLIN_CHECK) {
     631      244094 :       grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd->fd);
     632             :     }
     633     1088936 :     if (nfds > 2) {
     634      913567 :       grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
     635      913567 :                        pfd[2].revents & POLLOUT_CHECK);
     636      175369 :     } else if (fd) {
     637      168951 :       grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
     638             :     }
     639             :   }
     640             : 
     641     1247306 :   if (fd) {
     642     1238903 :     GRPC_FD_UNREF(fd, "basicpoll_begin");
     643             :   }
     644     1247297 : }
     645             : 
     646      852356 : static void basic_pollset_destroy(grpc_pollset *pollset) {
     647      852356 :   if (pollset->data.ptr != NULL) {
     648      423501 :     GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
     649      423523 :     pollset->data.ptr = NULL;
     650             :   }
     651      852378 : }
     652             : 
     653             : static const grpc_pollset_vtable basic_pollset = {
     654             :     basic_pollset_add_fd, basic_pollset_del_fd,
     655             :     basic_pollset_maybe_work_and_unlock, basic_pollset_destroy,
     656             :     basic_pollset_destroy};
     657             : 
     658      431198 : static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
     659      431249 :   pollset->vtable = &basic_pollset;
     660      431249 :   pollset->data.ptr = fd_or_null;
     661      431198 :   if (fd_or_null != NULL) {
     662           0 :     GRPC_FD_REF(fd_or_null, "basicpoll");
     663             :   }
     664      431198 : }
     665             : 
     666             : #endif /* GPR_POSIX_POLLSET */

Generated by: LCOV version 1.11