LCOV - code coverage report
Current view: top level - core/iomgr - timer.c (source / functions)
Test:         tmp.CaZ6RjdVn2
Date:         2015-12-10 22:15:08
                                        Hit      Total    Coverage
Lines:                                  149        154      96.8 %
Functions:                               18         19      94.7 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/iomgr/timer.h"
      35             : 
      36             : #include "src/core/iomgr/timer_heap.h"
      37             : #include "src/core/iomgr/timer_internal.h"
      38             : #include "src/core/iomgr/time_averaged_stats.h"
      39             : #include <grpc/support/log.h>
      40             : #include <grpc/support/sync.h>
      41             : #include <grpc/support/useful.h>
      42             : 
      43             : #define INVALID_HEAP_INDEX 0xffffffffu
      44             : 
      45             : #define LOG2_NUM_SHARDS 5
      46             : #define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
      47             : #define ADD_DEADLINE_SCALE 0.33
      48             : #define MIN_QUEUE_WINDOW_DURATION 0.01
      49             : #define MAX_QUEUE_WINDOW_DURATION 1
      50             : 
      51             : typedef struct {
      52             :   gpr_mu mu;
      53             :   grpc_time_averaged_stats stats;
       54             :   /* All and only timers with deadlines < this will be in the heap. */
      55             :   gpr_timespec queue_deadline_cap;
      56             :   gpr_timespec min_deadline;
      57             :   /* Index in the g_shard_queue */
      58             :   gpr_uint32 shard_queue_index;
       59             :   /* This holds all timers with deadlines < queue_deadline_cap,
       60             :      kept as a heap ordered by deadline. */
      61             :   grpc_timer_heap heap;
      62             :   /* This holds timers whose deadline is >= queue_deadline_cap. */
      63             :   grpc_timer list;
      64             : } shard_type;
      65             : 
      66             : /* Protects g_shard_queue */
      67             : static gpr_mu g_mu;
       68             : /* Allow only one run_some_expired_timers at a time */
      69             : static gpr_mu g_checker_mu;
      70             : static gpr_clock_type g_clock_type;
      71             : static shard_type g_shards[NUM_SHARDS];
      72             : /* Protected by g_mu */
      73             : static shard_type *g_shard_queue[NUM_SHARDS];
      74             : 
      75             : static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
      76             :                                    gpr_timespec *next, int success);
      77             : 
      78      375434 : static gpr_timespec compute_min_deadline(shard_type *shard) {
      79      376071 :   return grpc_timer_heap_is_empty(&shard->heap)
      80             :              ? shard->queue_deadline_cap
      81         637 :              : grpc_timer_heap_top(&shard->heap)->deadline;
      82             : }
      83             : 
      84        3459 : void grpc_timer_list_init(gpr_timespec now) {
      85             :   gpr_uint32 i;
      86             : 
      87        3459 :   gpr_mu_init(&g_mu);
      88        3459 :   gpr_mu_init(&g_checker_mu);
      89        3459 :   g_clock_type = now.clock_type;
      90             : 
      91      114147 :   for (i = 0; i < NUM_SHARDS; i++) {
      92      110688 :     shard_type *shard = &g_shards[i];
      93      110688 :     gpr_mu_init(&shard->mu);
      94      110688 :     grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
      95             :                                   0.5);
      96      110688 :     shard->queue_deadline_cap = now;
      97      110688 :     shard->shard_queue_index = i;
      98      110688 :     grpc_timer_heap_init(&shard->heap);
      99      110688 :     shard->list.next = shard->list.prev = &shard->list;
     100      110688 :     shard->min_deadline = compute_min_deadline(shard);
     101      110688 :     g_shard_queue[i] = shard;
     102             :   }
     103        3459 : }
     104             : 
     105        3457 : void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
     106             :   int i;
     107        3457 :   run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0);
     108      114081 :   for (i = 0; i < NUM_SHARDS; i++) {
     109      110624 :     shard_type *shard = &g_shards[i];
     110      110624 :     gpr_mu_destroy(&shard->mu);
     111      110624 :     grpc_timer_heap_destroy(&shard->heap);
     112             :   }
     113        3457 :   gpr_mu_destroy(&g_mu);
     114        3457 :   gpr_mu_destroy(&g_checker_mu);
     115        3457 : }
     116             : 
      117             : /* This is a cheap, but good enough, pointer hash for sharding the timers: */
     118     2121540 : static size_t shard_idx(const grpc_timer *info) {
     119     2121728 :   size_t x = (size_t)info;
     120     2121728 :   return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
     121             : }
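
A note on the hash: shard_idx() only needs to spread timer addresses roughly
evenly over the 32 shards; bits 4-8, 9-13 and 14-18 of the pointer value are
XORed together and masked down to a shard index, so allocator alignment bits
are ignored and nearby allocations still tend to land on different shards.
A standalone sketch of the same computation (the malloc'd blocks merely stand
in for grpc_timer objects; nothing below is gRPC API):

    #include <stdio.h>
    #include <stdlib.h>

    #define LOG2_NUM_SHARDS 5
    #define NUM_SHARDS (1 << LOG2_NUM_SHARDS)

    /* Same bit mixing as shard_idx(), applied to an arbitrary pointer. */
    static size_t shard_of(const void *p) {
      size_t x = (size_t)p;
      return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
    }

    int main(void) {
      void *blocks[4];
      int i;
      for (i = 0; i < 4; i++) {
        blocks[i] = malloc(64); /* stand-ins for grpc_timer allocations */
        printf("%p -> shard %zu\n", blocks[i], shard_of(blocks[i]));
      }
      for (i = 0; i < 4; i++) free(blocks[i]);
      return 0;
    }
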
     122             : 
     123     1061011 : static double ts_to_dbl(gpr_timespec ts) {
     124     1061139 :   return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
     125             : }
     126             : 
     127      263785 : static gpr_timespec dbl_to_ts(double d) {
     128             :   gpr_timespec ts;
     129      264050 :   ts.tv_sec = (time_t)d;
     130      264050 :   ts.tv_nsec = (int)(1e9 * (d - (double)ts.tv_sec));
     131      263785 :   ts.clock_type = GPR_TIMESPAN;
     132      263785 :   return ts;
     133             : }
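
ts_to_dbl() and dbl_to_ts() shuttle deadlines between gpr_timespec and plain
seconds-as-a-double so that the averaged-stats code can work on scalars. A
minimal round-trip sketch, assuming only that the local struct below mirrors
the tv_sec/tv_nsec fields used above (the real gpr_timespec comes from
<grpc/support/time.h>):

    #include <stdio.h>
    #include <time.h>

    struct demo_ts { time_t tv_sec; int tv_nsec; }; /* stand-in for gpr_timespec */

    static double to_dbl(struct demo_ts t) {
      return (double)t.tv_sec + 1e-9 * t.tv_nsec;
    }

    static struct demo_ts to_ts(double d) {
      struct demo_ts t;
      t.tv_sec = (time_t)d;                            /* truncate toward zero */
      t.tv_nsec = (int)(1e9 * (d - (double)t.tv_sec)); /* leftover fraction */
      return t;
    }

    int main(void) {
      struct demo_ts in = {2, 500000000}; /* 2.5 seconds */
      struct demo_ts out = to_ts(to_dbl(in));
      printf("%ld s, %d ns\n", (long)out.tv_sec, out.tv_nsec); /* 2 s, 500000000 ns */
      return 0;
    }
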
     134             : 
     135     1060335 : static void list_join(grpc_timer *head, grpc_timer *timer) {
     136     1060457 :   timer->next = head;
     137     1060457 :   timer->prev = head->prev;
     138     1060457 :   timer->next->prev = timer->prev->next = timer;
     139     1060335 : }
     140             : 
     141     1060335 : static void list_remove(grpc_timer *timer) {
     142     1060415 :   timer->next->prev = timer->prev;
     143     1060415 :   timer->prev->next = timer->next;
     144     1060335 : }
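
list_join() and list_remove() are the usual intrusive, circular doubly-linked
list operations: shard->list acts as a sentinel whose next/prev point at
itself when the list is empty, so neither operation needs a NULL check, and
list_join() always inserts just before the sentinel (at the tail). A
self-contained sketch of the same idiom (the node type and function names are
illustrative, not gRPC code):

    #include <stdio.h>

    typedef struct node {
      struct node *next;
      struct node *prev;
      int id;
    } node;

    /* An empty sentinel-based ring points at itself. */
    static void ring_init(node *head) { head->next = head->prev = head; }

    /* Insert n just before the sentinel, i.e. at the tail of the ring. */
    static void ring_join(node *head, node *n) {
      n->next = head;
      n->prev = head->prev;
      n->next->prev = n->prev->next = n;
    }

    /* Unlink n; the ring stays consistent with no special empty-case code. */
    static void ring_remove(node *n) {
      n->next->prev = n->prev;
      n->prev->next = n->next;
    }

    int main(void) {
      node head, a = {0, 0, 1}, b = {0, 0, 2};
      node *it;
      ring_init(&head);
      ring_join(&head, &a);
      ring_join(&head, &b);
      ring_remove(&a);
      for (it = head.next; it != &head; it = it->next) printf("%d\n", it->id);
      return 0; /* prints only 2 */
    }
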
     145             : 
     146     4179258 : static void swap_adjacent_shards_in_queue(gpr_uint32 first_shard_queue_index) {
     147             :   shard_type *temp;
     148     4183449 :   temp = g_shard_queue[first_shard_queue_index];
     149     4183449 :   g_shard_queue[first_shard_queue_index] =
     150     4183449 :       g_shard_queue[first_shard_queue_index + 1];
     151     4183449 :   g_shard_queue[first_shard_queue_index + 1] = temp;
     152     4183449 :   g_shard_queue[first_shard_queue_index]->shard_queue_index =
     153             :       first_shard_queue_index;
     154     8362707 :   g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
     155     4179258 :       first_shard_queue_index + 1;
     156     4179258 : }
     157             : 
     158      265400 : static void note_deadline_change(shard_type *shard) {
     159      547151 :   while (shard->shard_queue_index > 0 &&
     160        8439 :          gpr_time_cmp(
     161             :              shard->min_deadline,
     162        8439 :              g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
     163        8209 :     swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
     164             :   }
     165     9136988 :   while (shard->shard_queue_index < NUM_SHARDS - 1 &&
     166     4431245 :          gpr_time_cmp(
     167             :              shard->min_deadline,
     168     4431245 :              g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
     169     4175240 :     swap_adjacent_shards_in_queue(shard->shard_queue_index);
     170             :   }
     171      265400 : }
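
note_deadline_change() keeps g_shard_queue sorted by min_deadline with a
single bubbling pass: a shard whose minimum shrank is swapped toward the
front, and one whose minimum grew (because its due timers were popped) is
swapped toward the back. The sketch below shows the same repositioning on a
plain array of doubles; the real code additionally keeps each shard's
shard_queue_index in sync, which swap_adjacent_shards_in_queue() handles:

    #include <stdio.h>

    #define N 8

    /* Re-position a single element of an otherwise sorted array by bubbling
       it toward the front or the back, mirroring note_deadline_change(). */
    static void bubble_one(double *a, int idx) {
      double tmp;
      while (idx > 0 && a[idx] < a[idx - 1]) {
        tmp = a[idx]; a[idx] = a[idx - 1]; a[idx - 1] = tmp;
        idx--;
      }
      while (idx < N - 1 && a[idx] > a[idx + 1]) {
        tmp = a[idx]; a[idx] = a[idx + 1]; a[idx + 1] = tmp;
        idx++;
      }
    }

    int main(void) {
      double deadlines[N] = {1, 2, 3, 9, 5, 6, 7, 8}; /* element 3 just grew */
      int i;
      bubble_one(deadlines, 3);
      for (i = 0; i < N; i++) printf("%.0f ", deadlines[i]);
      printf("\n"); /* prints: 1 2 3 5 6 7 8 9 */
      return 0;
    }
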
     172             : 
     173     1061139 : void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
     174             :                      gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
     175             :                      void *timer_cb_arg, gpr_timespec now) {
     176     1061011 :   int is_first_timer = 0;
     177     1061139 :   shard_type *shard = &g_shards[shard_idx(timer)];
     178     1061139 :   GPR_ASSERT(deadline.clock_type == g_clock_type);
     179     1061139 :   GPR_ASSERT(now.clock_type == g_clock_type);
     180     1061139 :   grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
     181     1061139 :   timer->deadline = deadline;
     182     1061139 :   timer->triggered = 0;
     183             : 
     184             :   /* TODO(ctiller): check deadline expired */
     185             : 
     186     1061139 :   gpr_mu_lock(&shard->mu);
     187     1061267 :   grpc_time_averaged_stats_add_sample(&shard->stats,
     188             :                                       ts_to_dbl(gpr_time_sub(deadline, now)));
     189     1061139 :   if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
     190         682 :     is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
     191             :   } else {
     192     1060457 :     timer->heap_index = INVALID_HEAP_INDEX;
     193     1060457 :     list_join(&shard->list, timer);
     194             :   }
     195     1061139 :   gpr_mu_unlock(&shard->mu);
     196             : 
      197             :   /* The deadline may have decreased, so we need to adjust the master queue.
      198             :      Note that there is a potential racy unlocked region here.  There could be
      199             :      a reordering of multiple grpc_timer_init calls at this point, but the < test
     200             :      below should ensure that we err on the side of caution.  There could
     201             :      also be a race with grpc_timer_check, which might beat us to the lock.  In
     202             :      that case, it is possible that the timer that we added will have already
     203             :      run by the time we hold the lock, but that too is a safe error.
     204             :      Finally, it's possible that the grpc_timer_check that intervened failed to
     205             :      trigger the new timer because the min_deadline hadn't yet been reduced.
     206             :      In that case, the timer will simply have to wait for the next
     207             :      grpc_timer_check. */
     208     1061139 :   if (is_first_timer) {
     209         677 :     gpr_mu_lock(&g_mu);
     210         677 :     if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
     211         654 :       gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
     212         654 :       shard->min_deadline = deadline;
     213         654 :       note_deadline_change(shard);
     214        1078 :       if (shard->shard_queue_index == 0 &&
     215         424 :           gpr_time_cmp(deadline, old_min_deadline) < 0) {
     216         424 :         grpc_kick_poller();
     217             :       }
     218             :     }
     219         677 :     gpr_mu_unlock(&g_mu);
     220             :   }
     221     1061139 : }
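
A toy model of the fast/slow-path split in grpc_timer_init() above, using
plain doubles instead of gpr_timespec: only a timer that lands in the heap
and becomes the shard's new minimum can lower the global minimum deadline,
and only a lowered global minimum requires kicking the poller. The explicit
global_min parameter below is a simplification of the real
shard_queue_index == 0 check; none of this is actual gRPC API.

    #include <stdio.h>

    typedef struct {
      double queue_deadline_cap; /* timers below this go into the heap */
      double min_deadline;       /* earliest deadline currently in this shard */
    } toy_shard;

    /* Returns 1 if the poller would have to be woken up for this timer. */
    static int toy_timer_init(toy_shard *shard, double global_min,
                              double deadline) {
      if (deadline >= shard->queue_deadline_cap) return 0; /* parked in list */
      if (deadline < shard->min_deadline) {
        shard->min_deadline = deadline;
        return deadline < global_min; /* poller sleeps until global_min */
      }
      return 0;
    }

    int main(void) {
      toy_shard s = {10.0, 8.0};
      printf("%d\n", toy_timer_init(&s, 8.0, 9.5)); /* 0: not a new minimum */
      printf("%d\n", toy_timer_init(&s, 8.0, 5.0)); /* 1: poller must wake */
      return 0;
    }
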
     222             : 
     223     1060589 : void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
     224     1060529 :   shard_type *shard = &g_shards[shard_idx(timer)];
     225     1060589 :   gpr_mu_lock(&shard->mu);
     226     1060589 :   if (!timer->triggered) {
     227     1059582 :     grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, 0);
     228     1059582 :     timer->triggered = 1;
     229     1059582 :     if (timer->heap_index == INVALID_HEAP_INDEX) {
     230     1059372 :       list_remove(timer);
     231             :     } else {
     232         152 :       grpc_timer_heap_remove(&shard->heap, timer);
     233             :     }
     234             :   }
     235     1060589 :   gpr_mu_unlock(&shard->mu);
     236     1060589 : }
     237             : 
     238             : /* This is called when the queue is empty and "now" has reached the
      239             :    queue_deadline_cap.  We compute a new queue deadline and then scan the
      240             :    shard's list for timers that fall under it.  Returns true if the queue is no
     241             :    longer empty.
     242             :    REQUIRES: shard->mu locked */
     243      264050 : static int refill_queue(shard_type *shard, gpr_timespec now) {
     244             :   /* Compute the new queue window width and bound by the limits: */
     245      264050 :   double computed_deadline_delta =
     246      264050 :       grpc_time_averaged_stats_update_average(&shard->stats) *
     247             :       ADD_DEADLINE_SCALE;
     248      263785 :   double deadline_delta =
     249      264050 :       GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
     250             :                 MAX_QUEUE_WINDOW_DURATION);
     251             :   grpc_timer *timer, *next;
     252             : 
     253             :   /* Compute the new cap and put all timers under it into the queue: */
     254      264050 :   shard->queue_deadline_cap = gpr_time_add(
     255             :       gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
     256      268376 :   for (timer = shard->list.next; timer != &shard->list; timer = next) {
     257        4061 :     next = timer->next;
     258             : 
     259        4061 :     if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) {
     260         963 :       list_remove(timer);
     261         985 :       grpc_timer_heap_add(&shard->heap, timer);
     262             :     }
     263             :   }
     264      264050 :   return !grpc_timer_heap_is_empty(&shard->heap);
     265             : }
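
A worked example of the window arithmetic above, with made-up numbers: if the
averaged distance to newly added deadlines is 0.8 s, the cap advances by
0.8 * 0.33 = 0.264 s past max(now, old cap); the clamp only bites when that
product falls outside [10 ms, 1 s]. CLAMP below is a local stand-in for
gRPC's GPR_CLAMP:

    #include <stdio.h>

    #define ADD_DEADLINE_SCALE 0.33
    #define MIN_QUEUE_WINDOW_DURATION 0.01
    #define MAX_QUEUE_WINDOW_DURATION 1
    #define CLAMP(x, lo, hi) ((x) < (lo) ? (lo) : (x) > (hi) ? (hi) : (x))

    int main(void) {
      double now = 100.0;              /* seconds, made up */
      double old_cap = 99.7;           /* previous queue_deadline_cap */
      double avg_timer_distance = 0.8; /* grpc_time_averaged_stats estimate */

      double computed = avg_timer_distance * ADD_DEADLINE_SCALE;     /* 0.264 */
      double delta = CLAMP(computed, MIN_QUEUE_WINDOW_DURATION,
                           MAX_QUEUE_WINDOW_DURATION);
      double new_cap = (now > old_cap ? now : old_cap) + delta;

      printf("new queue_deadline_cap = %.3f s\n", new_cap);          /* 100.264 */
      return 0;
    }
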
     266             : 
     267             : /* This pops the next non-cancelled timer with deadline <= now from the queue,
     268             :    or returns NULL if there isn't one.
     269             :    REQUIRES: shard->mu locked */
     270      266261 : static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
     271             :   grpc_timer *timer;
     272             :   for (;;) {
     273      266261 :     if (grpc_timer_heap_is_empty(&shard->heap)) {
     274      265072 :       if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
     275      264050 :       if (!refill_queue(shard, now)) return NULL;
     276             :     }
     277        2152 :     timer = grpc_timer_heap_top(&shard->heap);
     278        2152 :     if (gpr_time_cmp(timer->deadline, now) > 0) return NULL;
     279        1515 :     timer->triggered = 1;
     280        1515 :     grpc_timer_heap_pop(&shard->heap);
     281        1515 :     return timer;
     282             :   }
     283             : }
     284             : 
     285             : /* REQUIRES: shard->mu unlocked */
     286      264746 : static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
     287             :                          gpr_timespec now, gpr_timespec *new_min_deadline,
     288             :                          int success) {
     289      264455 :   size_t n = 0;
     290             :   grpc_timer *timer;
     291      264746 :   gpr_mu_lock(&shard->mu);
     292      531007 :   while ((timer = pop_one(shard, now))) {
     293        1515 :     grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success);
     294        1515 :     n++;
     295             :   }
     296      264746 :   *new_min_deadline = compute_min_deadline(shard);
     297      264746 :   gpr_mu_unlock(&shard->mu);
     298      264746 :   return n;
     299             : }
     300             : 
     301     6940654 : static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
     302             :                                    gpr_timespec *next, int success) {
     303     6910738 :   size_t n = 0;
     304             : 
     305             :   /* TODO(ctiller): verify that there are any timers (atomically) here */
     306             : 
     307     6940654 :   if (gpr_mu_trylock(&g_checker_mu)) {
     308     6933307 :     gpr_mu_lock(&g_mu);
     309             : 
     310    14131360 :     while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
     311             :       gpr_timespec new_min_deadline;
     312             : 
     313             :       /* For efficiency, we pop as many available timers as we can from the
     314             :          shard.  This may violate perfect timer deadline ordering, but that
     315             :          shouldn't be a big deal because we don't make ordering guarantees. */
     316      264746 :       n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
     317             :                       success);
     318             : 
      319             :       /* A grpc_timer_init() call on the shard could intervene here, adding a
      320             :          new timer that is earlier than new_min_deadline.  However,
      321             :          grpc_timer_init() must acquire g_mu before it can update the shard's
      322             :          min_deadline, so this loop iteration will complete first and the new
      323             :          timer will then reduce the min_deadline (perhaps unnecessarily). */
     324      264746 :       g_shard_queue[0]->min_deadline = new_min_deadline;
     325      264746 :       note_deadline_change(g_shard_queue[0]);
     326             :     }
     327             : 
     328     6933307 :     if (next) {
     329     6929471 :       *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
     330             :     }
     331             : 
     332     6933307 :     gpr_mu_unlock(&g_mu);
     333     6933307 :     gpr_mu_unlock(&g_checker_mu);
     334             :   }
     335             : 
     336     6940485 :   return (int)n;
     337             : }
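
The gpr_mu_trylock(&g_checker_mu) guard above means that when several threads
call grpc_timer_check() at the same time, exactly one of them drains the
expired timers while the rest return immediately instead of queueing up
behind it. A sketch of the same pattern with POSIX threads (note the inverted
convention: pthread_mutex_trylock() returns 0 on success, whereas
gpr_mu_trylock() returns non-zero):

    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t checker_mu = PTHREAD_MUTEX_INITIALIZER;

    /* Only the caller that wins the trylock does the (serialised) work;
       everyone else skips it rather than blocking. */
    static int check_timers(void) {
      int ran = 0;
      if (pthread_mutex_trylock(&checker_mu) == 0) {
        /* ... pop expired timers from the front shard here ... */
        ran = 1;
        pthread_mutex_unlock(&checker_mu);
      }
      return ran; /* 0 means another thread is already checking */
    }

    int main(void) {
      printf("%d\n", check_timers()); /* 1: uncontended, this caller checked */
      return 0;
    }
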
     338             : 
     339     6937013 : int grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
     340             :                      gpr_timespec *next) {
     341     6937013 :   GPR_ASSERT(now.clock_type == g_clock_type);
     342     6937295 :   return run_some_expired_timers(
     343             :       exec_ctx, now, next,
     344     6937013 :       gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
     345             : }
     346             : 
     347           0 : gpr_timespec grpc_timer_list_next_timeout(void) {
     348             :   gpr_timespec out;
     349           0 :   gpr_mu_lock(&g_mu);
     350           0 :   out = g_shard_queue[0]->min_deadline;
     351           0 :   gpr_mu_unlock(&g_mu);
     352           0 :   return out;
     353             : }

Generated by: LCOV version 1.11