LCOV - code coverage report
Current view: top level - src/core/iomgr - pollset_posix.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 283 312 90.7 %
Date: 2015-10-10 Functions: 22 24 91.7 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <grpc/support/port_platform.h>
      35             : 
      36             : #ifdef GPR_POSIX_SOCKET
      37             : 
      38             : #include "src/core/iomgr/pollset_posix.h"
      39             : 
      40             : #include <errno.h>
      41             : #include <stdlib.h>
      42             : #include <string.h>
      43             : #include <unistd.h>
      44             : 
      45             : #include "src/core/iomgr/alarm_internal.h"
      46             : #include "src/core/iomgr/fd_posix.h"
      47             : #include "src/core/iomgr/iomgr_internal.h"
      48             : #include "src/core/iomgr/socket_utils_posix.h"
      49             : #include "src/core/profiling/timers.h"
      50             : #include "src/core/support/block_annotate.h"
      51             : #include <grpc/support/alloc.h>
      52             : #include <grpc/support/log.h>
      53             : #include <grpc/support/thd.h>
      54             : #include <grpc/support/tls.h>
      55             : #include <grpc/support/useful.h>
      56             : 
      57             : GPR_TLS_DECL(g_current_thread_poller); /* holds the pollset pointer while this thread is inside the vtable's maybe_work_and_unlock (see grpc_pollset_work); reset to 0 afterwards */
      58             : GPR_TLS_DECL(g_current_thread_worker); /* holds the grpc_pollset_worker pointer over the same window; reset to 0 afterwards */
      59             : 
      60             : /** Default poll() function - a pointer so that it can be overridden by some
      61             :  *  tests */
      62             : grpc_poll_function_type grpc_poll_function = poll;
      63             : 
      64             : /** The alarm system needs to be able to wake up 'some poller' sometimes
      65             :  *  (specifically when a new alarm needs to be triggered earlier than the next
      66             :  *  alarm 'epoch').
      67             :  *  This wakeup_fd gives us something to alert on when such a case occurs. */
      68             : grpc_wakeup_fd grpc_global_wakeup_fd;
      69             : 
      70     7498195 : static void remove_worker(grpc_pollset *p, grpc_pollset_worker *worker) { /* Unlinks worker from the doubly-linked worker list; p is not used, and worker's own next/prev are left untouched. */
      71     7498195 :   worker->prev->next = worker->next; /* predecessor now skips over worker */
      72     7498195 :   worker->next->prev = worker->prev; /* successor now points back past worker */
      73     7498195 : }
      74             : 
      75    13542693 : int grpc_pollset_has_workers(grpc_pollset *p) { /* Returns non-zero iff the circular worker list contains an entry other than the root_worker sentinel. */
      76    13542693 :   return p->root_worker.next != &p->root_worker; /* sentinel pointing at itself means the list is empty */
      77             : }
      78             : 
      79     6957610 : static grpc_pollset_worker *pop_front_worker(grpc_pollset *p) { /* Detaches and returns the worker right after the sentinel, or NULL when the list is empty. */
      80     6957610 :   if (grpc_pollset_has_workers(p)) {
      81     2598781 :     grpc_pollset_worker *w = p->root_worker.next; /* head of the list */
      82     2598781 :     remove_worker(p, w);
      83     2598646 :     return w;
      84             :   } else {
      85     4358927 :     return NULL;
      86             :   }
      87             : }
      88             : 
      89     2598578 : static void push_back_worker(grpc_pollset *p, grpc_pollset_worker *worker) { /* Inserts worker at the tail of p's worker list (immediately before the sentinel). */
      90     2598578 :   worker->next = &p->root_worker; /* the sentinel follows the new tail */
      91     2598578 :   worker->prev = worker->next->prev; /* the old tail precedes it */
      92     2598578 :   worker->prev->next = worker->next->prev = worker; /* splice between old tail and sentinel */
      93     2598578 : }
      94             : 
      95     4897253 : static void push_front_worker(grpc_pollset *p, grpc_pollset_worker *worker) { /* Inserts worker at the head of p's worker list (immediately after the sentinel). */
      96     4897253 :   worker->prev = &p->root_worker; /* the sentinel precedes the new head */
      97     4897253 :   worker->next = worker->prev->next; /* the old head follows it */
      98     4897253 :   worker->prev->next = worker->next->prev = worker; /* splice between sentinel and old head */
      99     4897253 : }
     100             : 
     101     7717423 : void grpc_pollset_kick_ext(grpc_pollset *p, /* Wakes one worker, a specific worker, or every worker of p, depending on specific_worker and flags. */
     102             :                            grpc_pollset_worker *specific_worker, /* NULL: pick a worker; GRPC_POLLSET_KICK_BROADCAST: all workers; otherwise the worker to wake */
     103             :                            gpr_uint32 flags) { /* bitmask; GRPC_POLLSET_CAN_KICK_SELF and GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP are examined below */
     104             :   /* pollset->mu already held */
     105     7717423 :   if (specific_worker != NULL) {
     106      760504 :     if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) {
     107      330747 :       GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0); /* re-evaluation cannot be requested on a broadcast */
     108      676266 :       for (specific_worker = p->root_worker.next; /* walk every worker currently on the list */
     109      345519 :            specific_worker != &p->root_worker;
     110       14772 :            specific_worker = specific_worker->next) {
     111       14772 :         grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
     112             :       }
     113      330747 :       p->kicked_without_pollers = 1; /* set unconditionally, even when workers were woken above */
     114      330747 :       return;
     115      859514 :     } else if (gpr_tls_get(&g_current_thread_worker) !=
     116      429757 :                (gpr_intptr)specific_worker) { /* target is not the worker active on the calling thread */
     117      429757 :       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
     118      115613 :         specific_worker->reevaluate_polling_on_wakeup = 1; /* grpc_pollset_work loops again when it sees this flag */
     119             :       }
     120      429757 :       grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
     121      429929 :       return;
     122           0 :     } else if ((flags & GRPC_POLLSET_CAN_KICK_SELF) != 0) { /* target is the calling thread's own worker: only kicked when explicitly allowed */
     123           0 :       if ((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) != 0) {
     124           0 :         specific_worker->reevaluate_polling_on_wakeup = 1;
     125             :       }
     126           0 :       grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
     127           0 :       return;
     128             :     } /* otherwise (self-kick not permitted) fall through and do nothing */
     129     6956919 :   } else if (gpr_tls_get(&g_current_thread_poller) != (gpr_intptr)p) { /* anonymous kick; nothing is done when the calling thread is itself polling p */
     130     6956942 :     GPR_ASSERT((flags & GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) == 0);
     131     6956942 :     specific_worker = pop_front_worker(p); /* candidate is the head of the list; it is re-queued at the tail below */
     132     6954541 :     if (specific_worker != NULL) {
     133     5196220 :       if (gpr_tls_get(&g_current_thread_worker) ==
     134     2598110 :           (gpr_intptr)specific_worker) { /* the candidate is our own worker: rotate it to the back and try the next one */
     135           0 :         push_back_worker(p, specific_worker);
     136           0 :         specific_worker = pop_front_worker(p);
     137           0 :         if ((flags & GRPC_POLLSET_CAN_KICK_SELF) == 0 &&
     138           0 :             gpr_tls_get(&g_current_thread_worker) ==
     139           0 :                 (gpr_intptr)specific_worker) { /* still ourselves (we are the only worker) and self-kick is not allowed */
     140           0 :           push_back_worker(p, specific_worker);
     141           0 :           return;
     142             :         }
     143             :       }
     144     2598110 :       push_back_worker(p, specific_worker); /* re-queue at the tail before waking it */
     145     2598548 :       grpc_wakeup_fd_wakeup(&specific_worker->wakeup_fd);
     146     2598066 :       return;
     147             :     } else {
     148     4356431 :       p->kicked_without_pollers = 1; /* no worker to wake: record the kick so the next grpc_pollset_work pass skips polling */
     149     4356431 :       return;
     150             :     }
     151             :   }
     152             : }
     153             : 
     154     7597688 : void grpc_pollset_kick(grpc_pollset *p, grpc_pollset_worker *specific_worker) { /* Convenience wrapper: grpc_pollset_kick_ext with no flags set. */
     155     7597688 :   grpc_pollset_kick_ext(p, specific_worker, 0);
     156     7595970 : }
     157             : 
     158             : /* global state management */
     159             : 
     160        2506 : void grpc_pollset_global_init(void) { /* Initializes the two thread-local keys, the wakeup-fd subsystem, and the global wakeup fd. */
     161             :   gpr_tls_init(&g_current_thread_poller);
     162             :   gpr_tls_init(&g_current_thread_worker);
     163        2506 :   grpc_wakeup_fd_global_init(); /* called before the first grpc_wakeup_fd_init */
     164        2506 :   grpc_wakeup_fd_init(&grpc_global_wakeup_fd);
     165        2506 : }
     166             : 
     167        2506 : void grpc_pollset_global_shutdown(void) { /* Tears down what grpc_pollset_global_init set up: wakeup fd first, then the wakeup-fd subsystem, then the TLS keys. */
     168        2506 :   grpc_wakeup_fd_destroy(&grpc_global_wakeup_fd);
     169        2506 :   grpc_wakeup_fd_global_destroy();
     170             :   gpr_tls_destroy(&g_current_thread_poller);
     171             :   gpr_tls_destroy(&g_current_thread_worker);
     172        2506 : }
     173             : 
     174         236 : void grpc_kick_poller(void) { grpc_wakeup_fd_wakeup(&grpc_global_wakeup_fd); } /* Signals the global wakeup fd, which the pollers in this file include in their poll set. */
     175             : 
     176             : /* main interface */
     177             : 
     178             : static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null);
     179             : 
     180      325418 : void grpc_pollset_init(grpc_pollset *pollset) { /* Initializes the mutex, an empty worker list and the shutdown/idle-job state, then installs the basic pollset with no fd. */
     181      325418 :   gpr_mu_init(&pollset->mu);
     182      325443 :   pollset->root_worker.next = pollset->root_worker.prev = &pollset->root_worker; /* empty circular list: the sentinel points at itself */
     183      325443 :   pollset->in_flight_cbs = 0;
     184      325443 :   pollset->shutting_down = 0;
     185      325443 :   pollset->called_shutdown = 0;
     186      325443 :   pollset->idle_jobs.head = pollset->idle_jobs.tail = NULL;
     187      325443 :   become_basic_pollset(pollset, NULL); /* NOTE(review): kicked_without_pollers is not set in this function; presumably become_basic_pollset initializes it - confirm */
     188      325441 : }
     189             : 
     190     2731404 : void grpc_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* Locks the pollset and delegates to the current vtable's add_fd, which is asked to unlock before returning. */
     191             :                          grpc_fd *fd) {
     192     2731404 :   gpr_mu_lock(&pollset->mu);
     193     2733602 :   pollset->vtable->add_fd(exec_ctx, pollset, fd, 1); /* last argument 1 = and_unlock_pollset */
     194             : /* the following (enabled only in debug) will reacquire and then release
     195             :    our lock - meaning that if the unlocking flag passed to add_fd above is
     196             :    not respected, the code will deadlock (in a way that we have a chance of
     197             :    debugging) */
     198             : #ifndef NDEBUG
     199     2733514 :   gpr_mu_lock(&pollset->mu);
     200     2733260 :   gpr_mu_unlock(&pollset->mu);
     201             : #endif
     202     2733089 : }
     203             : 
     204           0 : void grpc_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* Locks the pollset and delegates to the current vtable's del_fd, which is asked to unlock before returning. */
     205             :                          grpc_fd *fd) {
     206           0 :   gpr_mu_lock(&pollset->mu);
     207           0 :   pollset->vtable->del_fd(exec_ctx, pollset, fd, 1); /* last argument 1 = and_unlock_pollset */
     208             : /* the following (enabled only in debug) will reacquire and then release
     209             :    our lock - meaning that if the unlocking flag passed to del_fd above is
     210             :    not respected, the code will deadlock (in a way that we have a chance of
     211             :    debugging) */
     212             : #ifndef NDEBUG
     213           0 :   gpr_mu_lock(&pollset->mu);
     214           0 :   gpr_mu_unlock(&pollset->mu);
     215             : #endif
     216           0 : }
     217             : 
     218      325469 : static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset) { /* Final shutdown step: runs the vtable's finish_shutdown hook and enqueues the shutdown_done closure. */
     219      325469 :   GPR_ASSERT(grpc_closure_list_empty(pollset->idle_jobs)); /* all idle jobs must already have been drained */
     220      325469 :   pollset->vtable->finish_shutdown(pollset);
     221      325469 :   grpc_exec_ctx_enqueue(exec_ctx, pollset->shutdown_done, 1); /* shutdown_done is the closure stored by grpc_pollset_shutdown */
     222      325461 : }
     223             : 
     224     5276078 : void grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* Runs one unit of pollset work for the caller's worker: idle jobs, alarm check, or polling via the vtable. Entered and exited with pollset->mu held. */
     225             :                        grpc_pollset_worker *worker, gpr_timespec now, /* worker: caller-provided storage, (re)initialized below */
     226             :                        gpr_timespec deadline) { /* deadline: may be adjusted by grpc_alarm_check or forced to the infinite past below */
     227             :   /* pollset->mu already held */
     228     5276078 :   int added_worker = 0; /* 1 once worker has been linked into the pollset's worker list */
     229     5276078 :   int locked = 1; /* tracks whether this thread currently holds pollset->mu */
     230     5276078 :   int queued_work = 0; /* accumulates the results of grpc_exec_ctx_flush */
     231     5276078 :   int keep_polling = 0;
     232             :   /* this must happen before we (potentially) drop pollset->mu */
     233     5276078 :   worker->next = worker->prev = NULL;
     234     5276078 :   worker->reevaluate_polling_on_wakeup = 0;
     235             :   /* TODO(ctiller): pool these */
     236     5276078 :   grpc_wakeup_fd_init(&worker->wakeup_fd);
     237             :   /* If there's work waiting for the pollset to be idle, and the
     238             :      pollset is idle, then do that work */
     239    10532191 :   if (!grpc_pollset_has_workers(pollset) &&
     240     5240046 :       !grpc_closure_list_empty(pollset->idle_jobs)) {
     241        4251 :     grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
     242           0 :     goto done; /* mu is still held (locked == 1), so the flush at done: is skipped on this path */
     243             :   }
     244             :   /* Check alarms - these are a global resource so we just ping
     245             :      each time through on every pollset.
     246             :      May update deadline to ensure timely wakeups.
     247             :      TODO(ctiller): can this work be localized? */
     248     5287742 :   if (grpc_alarm_check(exec_ctx, now, &deadline)) {
     249         318 :     gpr_mu_unlock(&pollset->mu); /* dropped so that done: flushes exec_ctx without holding the lock */
     250         318 :     locked = 0;
     251         318 :     goto done;
     252             :   }
     253             :   /* If we're shutting down then we don't execute any extended work */
     254     5278839 :   if (pollset->shutting_down) {
     255           0 :     goto done;
     256             :   }
     257             :   /* Give do_promote priority so we don't starve it out */
     258     5278839 :   if (pollset->in_flight_cbs) {
     259        4753 :     gpr_mu_unlock(&pollset->mu);
     260        4753 :     locked = 0;
     261        4753 :     goto done;
     262             :   }
     263             :   /* Start polling, and keep doing so while we're being asked to
     264             :      re-evaluate our pollers (this allows poll() based pollers to
     265             :      ensure they don't miss wakeups) */
     266     5274086 :   keep_polling = 1;
     267    15929639 :   while (keep_polling) {
     268     5368666 :     keep_polling = 0;
     269     5368666 :     if (!pollset->kicked_without_pollers) {
     270     4989074 :       if (!added_worker) {
     271     4892075 :         push_front_worker(pollset, worker);
     272     4897496 :         added_worker = 1;
     273             :       }
     274     4994495 :       gpr_tls_set(&g_current_thread_poller, (gpr_intptr)pollset); /* lets grpc_pollset_kick_ext detect kicks issued from this polling thread */
     275     4994495 :       gpr_tls_set(&g_current_thread_worker, (gpr_intptr)worker);
     276     4994495 :       pollset->vtable->maybe_work_and_unlock(exec_ctx, pollset, worker, /* the hook is expected to release pollset->mu, hence locked = 0 below */
     277             :                                              deadline, now);
     278     4994115 :       locked = 0;
     279     4994115 :       gpr_tls_set(&g_current_thread_poller, 0);
     280     4994115 :       gpr_tls_set(&g_current_thread_worker, 0);
     281             :     } else {
     282      379592 :       pollset->kicked_without_pollers = 0; /* consume the pending kick instead of polling; mu stays held */
     283             :     }
     284             :   /* Finished execution - start cleaning up.
     285             :      Note that we may arrive here from outside the enclosing while() loop.
     286             :      In that case we won't loop though as we haven't added worker to the
     287             :      worker list, which means nobody could ask us to re-evaluate polling). */
     288             :   done:
     289     5374907 :     if (!locked) {
     290     4999945 :       queued_work |= grpc_exec_ctx_flush(exec_ctx); /* run queued closures while not holding mu */
     291     4999903 :       gpr_mu_lock(&pollset->mu);
     292     5006505 :       locked = 1;
     293             :     }
     294             :     /* If we're forced to re-evaluate polling (via grpc_pollset_kick with
     295             :        GRPC_POLLSET_REEVALUATE_POLLING_ON_WAKEUP) then we land here and force
     296             :        a loop */
     297     5381467 :     if (worker->reevaluate_polling_on_wakeup) {
     298       98476 :       worker->reevaluate_polling_on_wakeup = 0;
     299       98476 :       pollset->kicked_without_pollers = 0; /* cleared so the next iteration really polls */
     300       98476 :       if (queued_work) {
     301             :         /* If there's queued work on the list, then set the deadline to be
     302             :            immediate so we get back out of the polling loop quickly */
     303          43 :         deadline = gpr_inf_past(GPR_CLOCK_MONOTONIC);
     304             :       }
     305       98476 :       keep_polling = 1;
     306             :     }
     307             :   }
     308     5286887 :   if (added_worker) {
     309     4903335 :     remove_worker(pollset, worker);
     310             :   }
     311     5286703 :   grpc_wakeup_fd_destroy(&worker->wakeup_fd);
     312     5276799 :   if (pollset->shutting_down) {
     313        9417 :     if (grpc_pollset_has_workers(pollset)) {
     314           7 :       grpc_pollset_kick(pollset, NULL); /* other workers remain: wake one so it can observe shutting_down */
     315        9410 :     } else if (!pollset->called_shutdown && pollset->in_flight_cbs == 0) { /* last worker out and nothing in flight: finish the shutdown here */
     316        9409 :       pollset->called_shutdown = 1;
     317        9409 :       gpr_mu_unlock(&pollset->mu);
     318        9409 :       finish_shutdown(exec_ctx, pollset);
     319        9409 :       grpc_exec_ctx_flush(exec_ctx);
     320             :       /* Continuing to access pollset here is safe -- it is the caller's
     321             :        * responsibility to not destroy when it has outstanding calls to
     322             :        * grpc_pollset_work.
     323             :        * TODO(dklempner): Can we refactor the shutdown logic to avoid this? */
     324        9409 :       gpr_mu_lock(&pollset->mu); /* re-acquired so the function still returns with mu held */
     325           1 :     } else if (!grpc_closure_list_empty(pollset->idle_jobs)) { /* shutdown cannot finish yet: drain idle jobs now that no workers remain */
     326           1 :       gpr_mu_unlock(&pollset->mu);
     327           1 :       grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs);
     328           1 :       grpc_exec_ctx_flush(exec_ctx);
     329           1 :       gpr_mu_lock(&pollset->mu);
     330             :     }
     331             :   }
     332     5276799 : }
     333             : 
     334      325468 : void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* Marks the pollset as shutting down, wakes all workers, and finishes the shutdown immediately when nothing is outstanding. */
     335             :                            grpc_closure *closure) { /* closure: stored as shutdown_done and enqueued by finish_shutdown */
     336      325468 :   int call_shutdown = 0;
     337      325468 :   gpr_mu_lock(&pollset->mu);
     338      325468 :   GPR_ASSERT(!pollset->shutting_down); /* shutdown may only be requested once */
     339      325468 :   pollset->shutting_down = 1;
     340      650932 :   if (!pollset->called_shutdown && pollset->in_flight_cbs == 0 &&
     341      325465 :       !grpc_pollset_has_workers(pollset)) { /* no callbacks in flight and no workers: this call completes the shutdown itself */
     342      316057 :     pollset->called_shutdown = 1;
     343      316057 :     call_shutdown = 1;
     344             :   }
     345      325467 :   if (!grpc_pollset_has_workers(pollset)) {
     346      316059 :     grpc_exec_ctx_enqueue_list(exec_ctx, &pollset->idle_jobs); /* pollset is idle, so hand its idle jobs to exec_ctx */
     347             :   }
     348      325461 :   pollset->shutdown_done = closure;
     349      325461 :   grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); /* wake every worker so each one sees shutting_down */
     350      325467 :   gpr_mu_unlock(&pollset->mu);
     351             : 
     352      325469 :   if (call_shutdown) {
     353      316058 :     finish_shutdown(exec_ctx, pollset); /* invoked after mu has been released */
     354             :   }
     355      325461 : }
     356             : 
     357      325421 : void grpc_pollset_destroy(grpc_pollset *pollset) { /* Releases the pollset's vtable-owned state and its mutex; asserts the pollset has been shut down and is quiescent. */
     358      325421 :   GPR_ASSERT(pollset->shutting_down); /* grpc_pollset_shutdown must have been called */
     359      325421 :   GPR_ASSERT(pollset->in_flight_cbs == 0);
     360      325421 :   GPR_ASSERT(!grpc_pollset_has_workers(pollset));
     361      325463 :   pollset->vtable->destroy(pollset);
     362      325459 :   gpr_mu_destroy(&pollset->mu);
     363      325460 : }
     364             : 
     365     4999463 : int grpc_poll_deadline_to_millis_timeout(gpr_timespec deadline, /* Converts an absolute deadline into a millisecond timeout relative to now, in the form passed to grpc_poll_function. */
     366             :                                          gpr_timespec now) {
     367             :   gpr_timespec timeout;
     368             :   static const int max_spin_polling_us = 10; /* deadlines no more than this many microseconds past now yield a zero timeout */
     369     4999463 :   if (gpr_time_cmp(deadline, gpr_inf_future(deadline.clock_type)) == 0) {
     370       13883 :     return -1; /* infinite deadline: poll() treats a negative timeout as "block indefinitely" */
     371             :   }
     372     4988745 :   if (gpr_time_cmp(deadline, gpr_time_add(now, gpr_time_from_micros(
     373             :                                                    max_spin_polling_us,
     374             :                                                    GPR_TIMESPAN))) <= 0) {
     375     2358600 :     return 0; /* deadline already passed or within the spin window: do not block */
     376             :   }
     377     2629477 :   timeout = gpr_time_sub(deadline, now);
     378     2629408 :   return gpr_time_to_millis(gpr_time_add( /* adds (1 ms - 1 ns) first; presumably so the conversion rounds up rather than down - confirm gpr_time_to_millis truncates */
     379             :       timeout, gpr_time_from_nanos(GPR_NS_PER_MS - 1, GPR_TIMESPAN)));
     380             : }
     381             : 
     382             : /*
     383             :  * basic_pollset - a vtable that provides polling for zero or one file
     384             :  *                 descriptor via poll()
     385             :  */
     386             : 
     387             : typedef struct grpc_unary_promote_args { /* Heap-allocated argument bundle created by basic_pollset_add_fd and consumed (and freed) by basic_do_promote. */
     388             :   const grpc_pollset_vtable *original_vtable; /* vtable at scheduling time; lets the callback detect that the pollset changed type */
     389             :   grpc_pollset *pollset; /* pollset the fd is being added to */
     390             :   grpc_fd *fd; /* fd to add; a "basicpoll_add" ref is held on it until the callback finishes */
     391             :   grpc_closure promotion_closure; /* closure queued on pollset->idle_jobs; its cb_arg points back at this struct */
     392             : } grpc_unary_promote_args;
     393             : 
     394        5356 : static void basic_do_promote(grpc_exec_ctx *exec_ctx, void *args, int success) { /* Idle-job callback that completes a deferred basic_pollset_add_fd once no worker is polling; args is a grpc_unary_promote_args, success is not examined. */
     395        5356 :   grpc_unary_promote_args *up_args = args;
     396        5356 :   const grpc_pollset_vtable *original_vtable = up_args->original_vtable; /* fields are copied out because up_args is freed below */
     397        5356 :   grpc_pollset *pollset = up_args->pollset;
     398        5356 :   grpc_fd *fd = up_args->fd;
     399             : 
     400             :   /*
     401             :    * This is quite tricky. There are a number of cases to keep in mind here:
     402             :    * 1. fd may have been orphaned
     403             :    * 2. The pollset may no longer be a unary poller (and we can't let case #1
     404             :    * leak to other pollset types!)
     405             :    * 3. pollset's fd (which may have changed) may have been orphaned
     406             :    * 4. The pollset may be shutting down.
     407             :    */
     408             : 
     409        5356 :   gpr_mu_lock(&pollset->mu);
     410             :   /* First we need to ensure that nobody is polling concurrently */
     411        5356 :   GPR_ASSERT(!grpc_pollset_has_workers(pollset));
     412             : 
     413        5356 :   gpr_free(up_args); /* also releases the embedded promotion_closure */
     414             :   /* At this point the pollset may no longer be a unary poller. In that case
     415             :    * we should just call the right add function and be done. */
     416             :   /* TODO(klempner): If we're not careful this could cause infinite recursion.
     417             :    * That's not a problem for now because empty_pollset has a trivial poller
     418             :    * and we don't have any mechanism to unbecome multipoller. */
     419        5356 :   pollset->in_flight_cbs--; /* matches the increment in basic_pollset_add_fd */
     420        5356 :   if (pollset->shutting_down) {
     421             :     /* We don't care about this pollset anymore. */
     422           4 :     if (pollset->in_flight_cbs == 0 && !pollset->called_shutdown) {
     423           3 :       pollset->called_shutdown = 1;
     424           3 :       finish_shutdown(exec_ctx, pollset); /* NOTE(review): called with pollset->mu held here, whereas the other call sites in this file unlock first - confirm this is intended */
     425             :     }
     426        5352 :   } else if (grpc_fd_is_orphaned(fd)) {
     427             :     /* Don't try to add it to anything, we'll drop our ref on it below */
     428        5342 :   } else if (pollset->vtable != original_vtable) {
     429        1060 :     pollset->vtable->add_fd(exec_ctx, pollset, fd, 0); /* pollset changed type meanwhile; 0 = do not unlock, mu is released below */
     430        4282 :   } else if (fd != pollset->data.ptr) {
     431             :     grpc_fd *fds[2];
     432        4236 :     fds[0] = pollset->data.ptr; /* fd currently held by the unary pollset (may be NULL) */
     433        4236 :     fds[1] = fd; /* fd being added */
     434             : 
     435        4236 :     if (fds[0] && !grpc_fd_is_orphaned(fds[0])) {
     436        1049 :       grpc_platform_become_multipoller(exec_ctx, pollset, fds,
     437             :                                        GPR_ARRAY_SIZE(fds));
     438        1049 :       GRPC_FD_UNREF(fds[0], "basicpoll"); /* drop the ref the unary pollset held on the old fd */
     439             :     } else {
     440             :       /* old fd is orphaned and we haven't cleaned it up until now, so remain a
     441             :        * unary poller */
     442             :       /* Note that it is possible that fds[1] is also orphaned at this point.
     443             :        * That's okay, we'll correct it at the next add or poll. */
     444        3187 :       if (fds[0]) GRPC_FD_UNREF(fds[0], "basicpoll");
     445        3187 :       pollset->data.ptr = fd;
     446        3187 :       GRPC_FD_REF(fd, "basicpoll");
     447             :     }
     448             :   }
     449             : 
     450        5356 :   gpr_mu_unlock(&pollset->mu);
     451             : 
     452             :   /* Matching ref in basic_pollset_add_fd */
     453        5356 :   GRPC_FD_UNREF(fd, "basicpoll_add");
     454        5356 : }
     455             : 
     456     1338525 : static void basic_pollset_add_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* add_fd for the basic (zero-or-one fd) pollset: adopts fd, promotes to a multipoller, or defers the change to an idle job while workers are polling. */
     457             :                                  grpc_fd *fd, int and_unlock_pollset) { /* and_unlock_pollset: non-zero means release pollset->mu before returning */
     458             :   grpc_unary_promote_args *up_args;
     459     1338525 :   GPR_ASSERT(fd);
     460     1338525 :   if (fd == pollset->data.ptr) goto exit; /* already the pollset's fd: nothing to do */
     461             : 
     462      327306 :   if (!grpc_pollset_has_workers(pollset)) {
     463             :     /* Fast path -- no in flight cbs */
     464             :     /* TODO(klempner): Comment this out and fix any test failures or establish
     465             :      * they are due to timing issues */
     466             :     grpc_fd *fds[2];
     467      321952 :     fds[0] = pollset->data.ptr; /* fd currently held, if any */
     468      321952 :     fds[1] = fd; /* fd being added */
     469             : 
     470      321952 :     if (fds[0] == NULL) {
     471      321061 :       pollset->data.ptr = fd; /* empty pollset: simply adopt fd */
     472      321061 :       GRPC_FD_REF(fd, "basicpoll");
     473         891 :     } else if (!grpc_fd_is_orphaned(fds[0])) {
     474         839 :       grpc_platform_become_multipoller(exec_ctx, pollset, fds,
     475             :                                        GPR_ARRAY_SIZE(fds));
     476         839 :       GRPC_FD_UNREF(fds[0], "basicpoll"); /* drop the ref the unary pollset held on the old fd */
     477             :     } else {
     478             :       /* old fd is orphaned and we haven't cleaned it up until now, so remain a
     479             :        * unary poller */
     480          52 :       GRPC_FD_UNREF(fds[0], "basicpoll");
     481          52 :       pollset->data.ptr = fd;
     482          52 :       GRPC_FD_REF(fd, "basicpoll");
     483             :     }
     484      321926 :     goto exit;
     485             :   }
     486             : 
     487             :   /* Now we need to promote. This needs to happen when we're not polling. Since
     488             :    * this may be called from poll, the wait needs to happen asynchronously. */
     489        5356 :   GRPC_FD_REF(fd, "basicpoll_add"); /* released at the end of basic_do_promote */
     490        5356 :   pollset->in_flight_cbs++; /* decremented in basic_do_promote */
     491        5356 :   up_args = gpr_malloc(sizeof(*up_args)); /* freed in basic_do_promote */
     492        5356 :   up_args->fd = fd;
     493        5356 :   up_args->original_vtable = pollset->vtable;
     494        5356 :   up_args->pollset = pollset;
     495        5356 :   up_args->promotion_closure.cb = basic_do_promote;
     496        5356 :   up_args->promotion_closure.cb_arg = up_args;
     497             : 
     498        5356 :   grpc_closure_list_add(&pollset->idle_jobs, &up_args->promotion_closure, 1);
     499        5356 :   grpc_pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); /* wake every worker so the pollset can become idle and the idle job can run */
     500             : 
     501             : exit:
     502     1337577 :   if (and_unlock_pollset) {
     503     1337577 :     gpr_mu_unlock(&pollset->mu);
     504             :   }
     505     1338589 : }
     506             : 
     507           0 : static void basic_pollset_del_fd(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, /* del_fd for the basic pollset: forgets fd if it is the one currently held; exec_ctx is not used. */
     508             :                                  grpc_fd *fd, int and_unlock_pollset) { /* and_unlock_pollset: non-zero means release pollset->mu before returning */
     509           0 :   GPR_ASSERT(fd);
     510           0 :   if (fd == pollset->data.ptr) {
     511           0 :     GRPC_FD_UNREF(pollset->data.ptr, "basicpoll"); /* drop the ref taken when the fd was adopted */
     512           0 :     pollset->data.ptr = NULL;
     513             :   }
     514             : 
     515           0 :   if (and_unlock_pollset) {
     516           0 :     gpr_mu_unlock(&pollset->mu);
     517             :   }
     518           0 : }
     519             : /* One poll pass for a basic pollset: polls the global wakeup fd, the worker's wakeup fd and, when present, the single fd stored in pollset->data.ptr. Releases pollset->mu before polling (presumably entered with it held - TODO confirm) and does not re-acquire it. */
     520     1101850 : static void basic_pollset_maybe_work_and_unlock(grpc_exec_ctx *exec_ctx,
     521             :                                                 grpc_pollset *pollset,
     522             :                                                 grpc_pollset_worker *worker,
     523             :                                                 gpr_timespec deadline,
     524             :                                                 gpr_timespec now) { /* deadline and now are only used to derive the poll timeout */
     525             : #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR) /* mask applied to pfd[2].revents for the last grpc_fd_end_poll argument */
     526             : #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR) /* mask applied to wakeup-fd revents and to pfd[2].revents for the third grpc_fd_end_poll argument */
     527             : /* pfd[0] = global wakeup fd, pfd[1] = this worker's wakeup fd, pfd[2] = the pollset's own fd (only polled when it has events to wait for). */
     528             :   struct pollfd pfd[3];
     529             :   grpc_fd *fd;
     530             :   grpc_fd_watcher fd_watcher; /* only passed to grpc_fd_begin_poll / grpc_fd_end_poll, i.e. only used when fd != NULL */
     531             :   int timeout;
     532             :   int r;
     533             :   nfds_t nfds;
     534             : /* Drop the stored fd first if it has been orphaned, so it is neither polled nor kept referenced by this pollset. */
     535     1101850 :   fd = pollset->data.ptr;
     536     1101850 :   if (fd && grpc_fd_is_orphaned(fd)) {
     537          56 :     GRPC_FD_UNREF(fd, "basicpoll");
     538          56 :     fd = pollset->data.ptr = NULL;
     539             :   }
     540     1101852 :   timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
     541     1101886 :   pfd[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
     542     1101886 :   pfd[0].events = POLLIN;
     543     1101886 :   pfd[0].revents = 0;
     544     1101886 :   pfd[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
     545     1101886 :   pfd[1].events = POLLIN;
     546     1101886 :   pfd[1].revents = 0;
     547     1101886 :   nfds = 2; /* the two wakeup fds are always polled */
     548     1101886 :   if (fd) {
     549     1096833 :     pfd[2].fd = fd->fd;
     550     1096833 :     pfd[2].revents = 0;
     551     1096833 :     GRPC_FD_REF(fd, "basicpoll_begin"); /* taken before the unlock; presumably keeps fd alive while polling without the mutex */
     552     1096882 :     gpr_mu_unlock(&pollset->mu);
     553     1096881 :     pfd[2].events = (short)grpc_fd_begin_poll(fd, pollset, worker, POLLIN,
     554             :                                               POLLOUT, &fd_watcher);
     555     1096772 :     if (pfd[2].events != 0) { /* include pfd[2] in the poll set only when begin_poll returned a non-zero event mask */
     556      814266 :       nfds++;
     557             :     }
     558             :   } else {
     559        5053 :     gpr_mu_unlock(&pollset->mu);
     560             :   }
     561             : /* pollset->mu has been released on both branches above; everything below runs without it. */
     562             :   /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
     563             :      even going into the blocking annotation if possible */
     564             :   /* poll fd count (argument 2) is shortened by one if we have no events
     565             :      to poll on - such that it only includes the kicker */
     566             :   GRPC_SCHEDULING_START_BLOCKING_REGION;
     567     1101825 :   r = grpc_poll_function(pfd, nfds, timeout);
     568             :   GRPC_SCHEDULING_END_BLOCKING_REGION;
     569             :   GRPC_TIMER_MARK(GRPC_PTAG_POLL_FINISHED, r);
     570             : /* Whenever fd != NULL, grpc_fd_begin_poll was called above, and each branch below calls grpc_fd_end_poll exactly once to match it. */
     571     1101896 :   if (r < 0) {
     572          48 :     gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno)); /* NOTE(review): this also logs EINTR at ERROR level, and errno is read after the two macros above, which may have changed it - consider saving errno right after the poll call and skipping the log for EINTR */
     573          48 :     if (fd) {
     574          48 :       grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
     575             :     }
     576     1101848 :   } else if (r == 0) { /* poll returned 0: no fd reported events */
     577       89211 :     if (fd) {
     578       87450 :       grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
     579             :     }
     580             :   } else {
     581     1012637 :     if (pfd[0].revents & POLLIN_CHECK) {
     582          71 :       grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
     583             :     }
     584     1012667 :     if (pfd[1].revents & POLLIN_CHECK) {
     585      241454 :       grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
     586             :     }
     587     1012660 :     if (nfds > 2) { /* pfd[2] was actually polled: forward its masked revents */
     588      793644 :       grpc_fd_end_poll(exec_ctx, &fd_watcher, pfd[2].revents & POLLIN_CHECK,
     589      793644 :                        pfd[2].revents & POLLOUT_CHECK);
     590      219016 :     } else if (fd) { /* fd exists but was left out of the poll set (event mask was 0) */
     591      215724 :       grpc_fd_end_poll(exec_ctx, &fd_watcher, 0, 0);
     592             :     }
     593             :   }
     594             : /* Release the "basicpoll_begin" ref taken before the mutex was unlocked. */
     595     1101845 :   if (fd) {
     596     1096792 :     GRPC_FD_UNREF(fd, "basicpoll_begin");
     597             :   }
     598     1101929 : }
     599             : /* Releases the pollset's "basicpoll" ref on its stored fd, if there is one, and clears pollset->data.ptr. No locking is performed in this function. */
     600      646612 : static void basic_pollset_destroy(grpc_pollset *pollset) {
     601      646612 :   if (pollset->data.ptr != NULL) {
     602      322308 :     GRPC_FD_UNREF(pollset->data.ptr, "basicpoll");
     603      322309 :     pollset->data.ptr = NULL; /* cleared, so a second call finds nothing to unref */
     604             :   }
     605      646613 : }
     606             : /* Vtable for the basic pollset (one fd slot in data.ptr). basic_pollset_destroy fills both of the last two slots; the slot names are declared with grpc_pollset_vtable elsewhere - presumably finish_shutdown and destroy, TODO confirm. */
     607             : static const grpc_pollset_vtable basic_pollset = {
     608             :     basic_pollset_add_fd, basic_pollset_del_fd,
     609             :     basic_pollset_maybe_work_and_unlock, basic_pollset_destroy,
     610             :     basic_pollset_destroy};
     611             : /* Switches pollset to the basic vtable and stores fd_or_null in pollset->data.ptr, taking a "basicpoll" ref on it when it is non-NULL. */
     612      325442 : static void become_basic_pollset(grpc_pollset *pollset, grpc_fd *fd_or_null) {
     613      325442 :   pollset->vtable = &basic_pollset;
     614      325442 :   pollset->data.ptr = fd_or_null;
     615      325442 :   if (fd_or_null != NULL) {
     616           0 :     GRPC_FD_REF(fd_or_null, "basicpoll"); /* NOTE(review): 0 hits - this coverage run never passed a non-NULL fd here */
     617             :   }
     618      325442 : }
     619             : 
     620             : #endif /* GPR_POSIX_POLLSET */

Generated by: LCOV version 1.10