LCOV - code coverage report
Current view: top level - src/core/iomgr - alarm.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 149 154 96.8 %
Date: 2015-10-10 Functions: 17 18 94.4 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/iomgr/alarm.h"
      35             : 
      36             : #include "src/core/iomgr/alarm_heap.h"
      37             : #include "src/core/iomgr/alarm_internal.h"
      38             : #include "src/core/iomgr/time_averaged_stats.h"
      39             : #include <grpc/support/log.h>
      40             : #include <grpc/support/sync.h>
      41             : #include <grpc/support/useful.h>
      42             : 
      43             : #define INVALID_HEAP_INDEX 0xffffffffu
      44             : 
      45             : #define LOG2_NUM_SHARDS 5
      46             : #define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
      47             : #define ADD_DEADLINE_SCALE 0.33
      48             : #define MIN_QUEUE_WINDOW_DURATION 0.01
      49             : #define MAX_QUEUE_WINDOW_DURATION 1
      50             : 
      51             : typedef struct {
      52             :   gpr_mu mu;
      53             :   grpc_time_averaged_stats stats;
      54             :   /* All and only alarms with deadlines <= this will be in the heap. */
      55             :   gpr_timespec queue_deadline_cap;
      56             :   gpr_timespec min_deadline;
      57             :   /* Index in the g_shard_queue */
      58             :   gpr_uint32 shard_queue_index;
      59             :   /* This holds all alarms with deadlines < queue_deadline_cap.  Alarms in this
      60             :      list have the top bit of their deadline set to 0. */
      61             :   grpc_alarm_heap heap;
      62             :   /* This holds alarms whose deadline is >= queue_deadline_cap. */
      63             :   grpc_alarm list;
      64             : } shard_type;
      65             : 
      66             : /* Protects g_shard_queue */
      67             : static gpr_mu g_mu;
      68             : /* Allow only one run_some_expired_alarms at once */
      69             : static gpr_mu g_checker_mu;
      70             : static gpr_clock_type g_clock_type;
      71             : static shard_type g_shards[NUM_SHARDS];
      72             : /* Protected by g_mu */
      73             : static shard_type *g_shard_queue[NUM_SHARDS];
      74             : 
      75             : static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now,
      76             :                                    gpr_timespec *next, int success);
      77             : 
      78      271527 : static gpr_timespec compute_min_deadline(shard_type *shard) {
      79      271955 :   return grpc_alarm_heap_is_empty(&shard->heap)
      80             :              ? shard->queue_deadline_cap
      81         428 :              : grpc_alarm_heap_top(&shard->heap)->deadline;
      82             : }
      83             : 
      84        2508 : void grpc_alarm_list_init(gpr_timespec now) {
      85             :   gpr_uint32 i;
      86             : 
      87        2508 :   gpr_mu_init(&g_mu);
      88        2508 :   gpr_mu_init(&g_checker_mu);
      89        2508 :   g_clock_type = now.clock_type;
      90             : 
      91       82764 :   for (i = 0; i < NUM_SHARDS; i++) {
      92       80256 :     shard_type *shard = &g_shards[i];
      93       80256 :     gpr_mu_init(&shard->mu);
      94       80256 :     grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
      95             :                                   0.5);
      96       80256 :     shard->queue_deadline_cap = now;
      97       80256 :     shard->shard_queue_index = i;
      98       80256 :     grpc_alarm_heap_init(&shard->heap);
      99       80256 :     shard->list.next = shard->list.prev = &shard->list;
     100       80256 :     shard->min_deadline = compute_min_deadline(shard);
     101       80256 :     g_shard_queue[i] = shard;
     102             :   }
     103        2508 : }
     104             : 
     105        2508 : void grpc_alarm_list_shutdown(grpc_exec_ctx *exec_ctx) {
     106             :   int i;
     107        2508 :   run_some_expired_alarms(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0);
     108       82764 :   for (i = 0; i < NUM_SHARDS; i++) {
     109       80256 :     shard_type *shard = &g_shards[i];
     110       80256 :     gpr_mu_destroy(&shard->mu);
     111       80256 :     grpc_alarm_heap_destroy(&shard->heap);
     112             :   }
     113        2508 :   gpr_mu_destroy(&g_mu);
     114        2508 :   gpr_mu_destroy(&g_checker_mu);
     115        2508 : }
     116             : 
     117             : /* This is a cheap, but good enough, pointer hash for sharding the tasks: */
     118       21176 : static size_t shard_idx(const grpc_alarm *info) {
     119       21176 :   size_t x = (size_t)info;
     120       21176 :   return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
     121             : }
     122             : 
     123       10846 : static double ts_to_dbl(gpr_timespec ts) {
     124       10846 :   return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
     125             : }
     126             : 
     127      190963 : static gpr_timespec dbl_to_ts(double d) {
     128             :   gpr_timespec ts;
     129      190963 :   ts.tv_sec = (time_t)d;
     130      190963 :   ts.tv_nsec = (int)(1e9 * (d - (double)ts.tv_sec));
     131      190963 :   ts.clock_type = GPR_TIMESPAN;
     132      190963 :   return ts;
     133             : }
     134             : 
     135       10479 : static void list_join(grpc_alarm *head, grpc_alarm *alarm) {
     136       10479 :   alarm->next = head;
     137       10479 :   alarm->prev = head->prev;
     138       10479 :   alarm->next->prev = alarm->prev->next = alarm;
     139       10479 : }
     140             : 
     141       10479 : static void list_remove(grpc_alarm *alarm) {
     142       10479 :   alarm->next->prev = alarm->prev;
     143       10479 :   alarm->prev->next = alarm->next;
     144       10479 : }
     145             : 
     146     3020492 : static void swap_adjacent_shards_in_queue(gpr_uint32 first_shard_queue_index) {
     147             :   shard_type *temp;
     148     3020492 :   temp = g_shard_queue[first_shard_queue_index];
     149     3020492 :   g_shard_queue[first_shard_queue_index] =
     150     3020492 :       g_shard_queue[first_shard_queue_index + 1];
     151     3020492 :   g_shard_queue[first_shard_queue_index + 1] = temp;
     152     3020492 :   g_shard_queue[first_shard_queue_index]->shard_queue_index =
     153             :       first_shard_queue_index;
     154     6040984 :   g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
     155     3020492 :       first_shard_queue_index + 1;
     156     3020492 : }
     157             : 
     158      191625 : static void note_deadline_change(shard_type *shard) {
     159      395226 :   while (shard->shard_queue_index > 0 &&
     160        6047 :          gpr_time_cmp(
     161             :              shard->min_deadline,
     162        6047 :              g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
     163        5929 :     swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
     164             :   }
     165     6597483 :   while (shard->shard_queue_index < NUM_SHARDS - 1 &&
     166     3199670 :          gpr_time_cmp(
     167             :              shard->min_deadline,
     168     3199670 :              g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
     169     3014563 :     swap_adjacent_shards_in_queue(shard->shard_queue_index);
     170             :   }
     171      191625 : }
     172             : 
     173       10846 : void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm,
     174             :                      gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb,
     175             :                      void *alarm_cb_arg, gpr_timespec now) {
     176       10846 :   int is_first_alarm = 0;
     177       10846 :   shard_type *shard = &g_shards[shard_idx(alarm)];
     178       10846 :   GPR_ASSERT(deadline.clock_type == g_clock_type);
     179       10846 :   GPR_ASSERT(now.clock_type == g_clock_type);
     180       10846 :   grpc_closure_init(&alarm->closure, alarm_cb, alarm_cb_arg);
     181       10846 :   alarm->deadline = deadline;
     182       10846 :   alarm->triggered = 0;
     183             : 
     184             :   /* TODO(ctiller): check deadline expired */
     185             : 
     186       10846 :   gpr_mu_lock(&shard->mu);
     187       10846 :   grpc_time_averaged_stats_add_sample(&shard->stats,
     188             :                                       ts_to_dbl(gpr_time_sub(deadline, now)));
     189       10846 :   if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
     190         367 :     is_first_alarm = grpc_alarm_heap_add(&shard->heap, alarm);
     191             :   } else {
     192       10478 :     alarm->heap_index = INVALID_HEAP_INDEX;
     193       10478 :     list_join(&shard->list, alarm);
     194             :   }
     195       10845 :   gpr_mu_unlock(&shard->mu);
     196             : 
     197             :   /* Deadline may have decreased, we need to adjust the master queue.  Note
     198             :      that there is a potential racy unlocked region here.  There could be a
     199             :      reordering of multiple grpc_alarm_init calls, at this point, but the < test
     200             :      below should ensure that we err on the side of caution.  There could
     201             :      also be a race with grpc_alarm_check, which might beat us to the lock.  In
     202             :      that case, it is possible that the alarm that we added will have already
     203             :      run by the time we hold the lock, but that too is a safe error.
     204             :      Finally, it's possible that the grpc_alarm_check that intervened failed to
     205             :      trigger the new alarm because the min_deadline hadn't yet been reduced.
     206             :      In that case, the alarm will simply have to wait for the next
     207             :      grpc_alarm_check. */
     208       10846 :   if (is_first_alarm) {
     209         363 :     gpr_mu_lock(&g_mu);
     210         363 :     if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
     211         354 :       gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
     212         354 :       shard->min_deadline = deadline;
     213         354 :       note_deadline_change(shard);
     214         590 :       if (shard->shard_queue_index == 0 &&
     215         236 :           gpr_time_cmp(deadline, old_min_deadline) < 0) {
     216         236 :         grpc_kick_poller();
     217             :       }
     218             :     }
     219         363 :     gpr_mu_unlock(&g_mu);
     220             :   }
     221       10846 : }
     222             : 
     223       10330 : void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm) {
     224       10330 :   shard_type *shard = &g_shards[shard_idx(alarm)];
     225       10330 :   gpr_mu_lock(&shard->mu);
     226       10330 :   if (!alarm->triggered) {
     227        9922 :     grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, 0);
     228        9922 :     alarm->triggered = 1;
     229        9922 :     if (alarm->heap_index == INVALID_HEAP_INDEX) {
     230        9781 :       list_remove(alarm);
     231             :     } else {
     232         141 :       grpc_alarm_heap_remove(&shard->heap, alarm);
     233             :     }
     234             :   }
     235       10330 :   gpr_mu_unlock(&shard->mu);
     236       10330 : }
     237             : 
     238             : /* This is called when the queue is empty and "now" has reached the
     239             :    queue_deadline_cap.  We compute a new queue deadline and then scan the map
     240             :    for alarms that fall at or under it.  Returns true if the queue is no
     241             :    longer empty.
     242             :    REQUIRES: shard->mu locked */
     243      190963 : static int refill_queue(shard_type *shard, gpr_timespec now) {
     244             :   /* Compute the new queue window width and bound by the limits: */
     245      190963 :   double computed_deadline_delta =
     246      190963 :       grpc_time_averaged_stats_update_average(&shard->stats) *
     247             :       ADD_DEADLINE_SCALE;
     248      190963 :   double deadline_delta =
     249      190963 :       GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
     250             :                 MAX_QUEUE_WINDOW_DURATION);
     251             :   grpc_alarm *alarm, *next;
     252             : 
     253             :   /* Compute the new cap and put all alarms under it into the queue: */
     254      190963 :   shard->queue_deadline_cap = gpr_time_add(
     255             :       gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
     256      193909 :   for (alarm = shard->list.next; alarm != &shard->list; alarm = next) {
     257        2946 :     next = alarm->next;
     258             : 
     259        2946 :     if (gpr_time_cmp(alarm->deadline, shard->queue_deadline_cap) < 0) {
     260         698 :       list_remove(alarm);
     261         698 :       grpc_alarm_heap_add(&shard->heap, alarm);
     262             :     }
     263             :   }
     264      190963 :   return !grpc_alarm_heap_is_empty(&shard->heap);
     265             : }
     266             : 
     267             : /* This pops the next non-cancelled alarm with deadline <= now from the queue,
     268             :    or returns NULL if there isn't one.
     269             :    REQUIRES: shard->mu locked */
     270      192195 : static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) {
     271             :   grpc_alarm *alarm;
     272             :   for (;;) {
     273      192195 :     if (grpc_alarm_heap_is_empty(&shard->heap)) {
     274      191516 :       if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
     275      190963 :       if (!refill_queue(shard, now)) return NULL;
     276             :     }
     277        1352 :     alarm = grpc_alarm_heap_top(&shard->heap);
     278        1352 :     if (gpr_time_cmp(alarm->deadline, now) > 0) return NULL;
     279         924 :     alarm->triggered = 1;
     280         924 :     grpc_alarm_heap_pop(&shard->heap);
     281         924 :     return alarm;
     282             :   }
     283             : }
     284             : 
     285             : /* REQUIRES: shard->mu unlocked */
     286      191271 : static size_t pop_alarms(grpc_exec_ctx *exec_ctx, shard_type *shard,
     287             :                          gpr_timespec now, gpr_timespec *new_min_deadline,
     288             :                          int success) {
     289      191271 :   size_t n = 0;
     290             :   grpc_alarm *alarm;
     291      191271 :   gpr_mu_lock(&shard->mu);
     292      383466 :   while ((alarm = pop_one(shard, now))) {
     293         924 :     grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, success);
     294         924 :     n++;
     295             :   }
     296      191271 :   *new_min_deadline = compute_min_deadline(shard);
     297      191271 :   gpr_mu_unlock(&shard->mu);
     298      191271 :   return n;
     299             : }
     300             : 
     301     5291642 : static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now,
     302             :                                    gpr_timespec *next, int success) {
     303     5291642 :   size_t n = 0;
     304             : 
     305             :   /* TODO(ctiller): verify that there are any alarms (atomically) here */
     306             : 
     307     5291642 :   if (gpr_mu_trylock(&g_checker_mu)) {
     308     5023126 :     gpr_mu_lock(&g_mu);
     309             : 
     310    10237523 :     while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
     311             :       gpr_timespec new_min_deadline;
     312             : 
     313             :       /* For efficiency, we pop as many available alarms as we can from the
     314             :          shard.  This may violate perfect alarm deadline ordering, but that
     315             :          shouldn't be a big deal because we don't make ordering guarantees. */
     316      191271 :       n += pop_alarms(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
     317             :                       success);
     318             : 
     319             :       /* An grpc_alarm_init() on the shard could intervene here, adding a new
     320             :          alarm that is earlier than new_min_deadline.  However,
     321             :          grpc_alarm_init() will block on the master_lock before it can call
     322             :          set_min_deadline, so this one will complete first and then the AddAlarm
     323             :          will reduce the min_deadline (perhaps unnecessarily). */
     324      191271 :       g_shard_queue[0]->min_deadline = new_min_deadline;
     325      191271 :       note_deadline_change(g_shard_queue[0]);
     326             :     }
     327             : 
     328     5023126 :     if (next) {
     329     5020036 :       *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
     330             :     }
     331             : 
     332     5023126 :     gpr_mu_unlock(&g_mu);
     333     5023126 :     gpr_mu_unlock(&g_checker_mu);
     334             :   }
     335             : 
     336     5282559 :   return (int)n;
     337             : }
     338             : 
     339     5287825 : int grpc_alarm_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
     340             :                      gpr_timespec *next) {
     341     5287825 :   GPR_ASSERT(now.clock_type == g_clock_type);
     342     5289570 :   return run_some_expired_alarms(
     343             :       exec_ctx, now, next,
     344     5287825 :       gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
     345             : }
     346             : 
     347           0 : gpr_timespec grpc_alarm_list_next_timeout(void) {
     348             :   gpr_timespec out;
     349           0 :   gpr_mu_lock(&g_mu);
     350           0 :   out = g_shard_queue[0]->min_deadline;
     351           0 :   gpr_mu_unlock(&g_mu);
     352           0 :   return out;
     353             : }

Generated by: LCOV version 1.10