Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
#include "src/core/iomgr/alarm.h"

#include <stdint.h>

#include <grpc/support/log.h>
#include <grpc/support/sync.h>
#include <grpc/support/useful.h>

#include "src/core/iomgr/alarm_heap.h"
#include "src/core/iomgr/alarm_internal.h"
#include "src/core/iomgr/time_averaged_stats.h"
42 :
43 : #define INVALID_HEAP_INDEX 0xffffffffu
44 :
45 : #define LOG2_NUM_SHARDS 5
46 : #define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
47 : #define ADD_DEADLINE_SCALE 0.33
48 : #define MIN_QUEUE_WINDOW_DURATION 0.01
49 : #define MAX_QUEUE_WINDOW_DURATION 1
50 :
51 : typedef struct {
52 : gpr_mu mu;
53 : grpc_time_averaged_stats stats;
54 : /* All and only alarms with deadlines <= this will be in the heap. */
55 : gpr_timespec queue_deadline_cap;
56 : gpr_timespec min_deadline;
57 : /* Index in the g_shard_queue */
58 : gpr_uint32 shard_queue_index;
59 : /* This holds all alarms with deadlines < queue_deadline_cap. Alarms in this
60 : list have the top bit of their deadline set to 0. */
61 : grpc_alarm_heap heap;
62 : /* This holds alarms whose deadline is >= queue_deadline_cap. */
63 : grpc_alarm list;
64 : } shard_type;
65 :
66 : /* Protects g_shard_queue */
67 : static gpr_mu g_mu;
68 : /* Allow only one run_some_expired_alarms at once */
69 : static gpr_mu g_checker_mu;
70 : static gpr_clock_type g_clock_type;
71 : static shard_type g_shards[NUM_SHARDS];
72 : /* Protected by g_mu */
73 : static shard_type *g_shard_queue[NUM_SHARDS];
74 :
75 : static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now,
76 : gpr_timespec *next, int success);
77 :
78 271527 : static gpr_timespec compute_min_deadline(shard_type *shard) {
79 271955 : return grpc_alarm_heap_is_empty(&shard->heap)
80 : ? shard->queue_deadline_cap
81 428 : : grpc_alarm_heap_top(&shard->heap)->deadline;
82 : }
83 :
84 2508 : void grpc_alarm_list_init(gpr_timespec now) {
85 : gpr_uint32 i;
86 :
87 2508 : gpr_mu_init(&g_mu);
88 2508 : gpr_mu_init(&g_checker_mu);
89 2508 : g_clock_type = now.clock_type;
90 :
91 82764 : for (i = 0; i < NUM_SHARDS; i++) {
92 80256 : shard_type *shard = &g_shards[i];
93 80256 : gpr_mu_init(&shard->mu);
94 80256 : grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
95 : 0.5);
96 80256 : shard->queue_deadline_cap = now;
97 80256 : shard->shard_queue_index = i;
98 80256 : grpc_alarm_heap_init(&shard->heap);
99 80256 : shard->list.next = shard->list.prev = &shard->list;
100 80256 : shard->min_deadline = compute_min_deadline(shard);
101 80256 : g_shard_queue[i] = shard;
102 : }
103 2508 : }
104 :
105 2508 : void grpc_alarm_list_shutdown(grpc_exec_ctx *exec_ctx) {
106 : int i;
107 2508 : run_some_expired_alarms(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0);
108 82764 : for (i = 0; i < NUM_SHARDS; i++) {
109 80256 : shard_type *shard = &g_shards[i];
110 80256 : gpr_mu_destroy(&shard->mu);
111 80256 : grpc_alarm_heap_destroy(&shard->heap);
112 : }
113 2508 : gpr_mu_destroy(&g_mu);
114 2508 : gpr_mu_destroy(&g_checker_mu);
115 2508 : }
116 :
117 : /* This is a cheap, but good enough, pointer hash for sharding the tasks: */
118 21176 : static size_t shard_idx(const grpc_alarm *info) {
119 21176 : size_t x = (size_t)info;
120 21176 : return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
121 : }
122 :
123 10846 : static double ts_to_dbl(gpr_timespec ts) {
124 10846 : return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
125 : }
126 :
127 190963 : static gpr_timespec dbl_to_ts(double d) {
128 : gpr_timespec ts;
129 190963 : ts.tv_sec = (time_t)d;
130 190963 : ts.tv_nsec = (int)(1e9 * (d - (double)ts.tv_sec));
131 190963 : ts.clock_type = GPR_TIMESPAN;
132 190963 : return ts;
133 : }
134 :
135 10479 : static void list_join(grpc_alarm *head, grpc_alarm *alarm) {
136 10479 : alarm->next = head;
137 10479 : alarm->prev = head->prev;
138 10479 : alarm->next->prev = alarm->prev->next = alarm;
139 10479 : }
140 :
141 10479 : static void list_remove(grpc_alarm *alarm) {
142 10479 : alarm->next->prev = alarm->prev;
143 10479 : alarm->prev->next = alarm->next;
144 10479 : }
145 :
146 3020492 : static void swap_adjacent_shards_in_queue(gpr_uint32 first_shard_queue_index) {
147 : shard_type *temp;
148 3020492 : temp = g_shard_queue[first_shard_queue_index];
149 3020492 : g_shard_queue[first_shard_queue_index] =
150 3020492 : g_shard_queue[first_shard_queue_index + 1];
151 3020492 : g_shard_queue[first_shard_queue_index + 1] = temp;
152 3020492 : g_shard_queue[first_shard_queue_index]->shard_queue_index =
153 : first_shard_queue_index;
154 6040984 : g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
155 3020492 : first_shard_queue_index + 1;
156 3020492 : }
157 :
158 191625 : static void note_deadline_change(shard_type *shard) {
159 395226 : while (shard->shard_queue_index > 0 &&
160 6047 : gpr_time_cmp(
161 : shard->min_deadline,
162 6047 : g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
163 5929 : swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
164 : }
165 6597483 : while (shard->shard_queue_index < NUM_SHARDS - 1 &&
166 3199670 : gpr_time_cmp(
167 : shard->min_deadline,
168 3199670 : g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
169 3014563 : swap_adjacent_shards_in_queue(shard->shard_queue_index);
170 : }
171 191625 : }
172 :
173 10846 : void grpc_alarm_init(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm,
174 : gpr_timespec deadline, grpc_iomgr_cb_func alarm_cb,
175 : void *alarm_cb_arg, gpr_timespec now) {
176 10846 : int is_first_alarm = 0;
177 10846 : shard_type *shard = &g_shards[shard_idx(alarm)];
178 10846 : GPR_ASSERT(deadline.clock_type == g_clock_type);
179 10846 : GPR_ASSERT(now.clock_type == g_clock_type);
180 10846 : grpc_closure_init(&alarm->closure, alarm_cb, alarm_cb_arg);
181 10846 : alarm->deadline = deadline;
182 10846 : alarm->triggered = 0;
183 :
184 : /* TODO(ctiller): check deadline expired */
185 :
186 10846 : gpr_mu_lock(&shard->mu);
187 10846 : grpc_time_averaged_stats_add_sample(&shard->stats,
188 : ts_to_dbl(gpr_time_sub(deadline, now)));
189 10846 : if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
190 367 : is_first_alarm = grpc_alarm_heap_add(&shard->heap, alarm);
191 : } else {
192 10478 : alarm->heap_index = INVALID_HEAP_INDEX;
193 10478 : list_join(&shard->list, alarm);
194 : }
195 10845 : gpr_mu_unlock(&shard->mu);
196 :
197 : /* Deadline may have decreased, we need to adjust the master queue. Note
198 : that there is a potential racy unlocked region here. There could be a
199 : reordering of multiple grpc_alarm_init calls, at this point, but the < test
200 : below should ensure that we err on the side of caution. There could
201 : also be a race with grpc_alarm_check, which might beat us to the lock. In
202 : that case, it is possible that the alarm that we added will have already
203 : run by the time we hold the lock, but that too is a safe error.
204 : Finally, it's possible that the grpc_alarm_check that intervened failed to
205 : trigger the new alarm because the min_deadline hadn't yet been reduced.
206 : In that case, the alarm will simply have to wait for the next
207 : grpc_alarm_check. */
208 10846 : if (is_first_alarm) {
209 363 : gpr_mu_lock(&g_mu);
210 363 : if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
211 354 : gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
212 354 : shard->min_deadline = deadline;
213 354 : note_deadline_change(shard);
214 590 : if (shard->shard_queue_index == 0 &&
215 236 : gpr_time_cmp(deadline, old_min_deadline) < 0) {
216 236 : grpc_kick_poller();
217 : }
218 : }
219 363 : gpr_mu_unlock(&g_mu);
220 : }
221 10846 : }
222 :
223 10330 : void grpc_alarm_cancel(grpc_exec_ctx *exec_ctx, grpc_alarm *alarm) {
224 10330 : shard_type *shard = &g_shards[shard_idx(alarm)];
225 10330 : gpr_mu_lock(&shard->mu);
226 10330 : if (!alarm->triggered) {
227 9922 : grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, 0);
228 9922 : alarm->triggered = 1;
229 9922 : if (alarm->heap_index == INVALID_HEAP_INDEX) {
230 9781 : list_remove(alarm);
231 : } else {
232 141 : grpc_alarm_heap_remove(&shard->heap, alarm);
233 : }
234 : }
235 10330 : gpr_mu_unlock(&shard->mu);
236 10330 : }
237 :
238 : /* This is called when the queue is empty and "now" has reached the
239 : queue_deadline_cap. We compute a new queue deadline and then scan the map
240 : for alarms that fall at or under it. Returns true if the queue is no
241 : longer empty.
242 : REQUIRES: shard->mu locked */
243 190963 : static int refill_queue(shard_type *shard, gpr_timespec now) {
244 : /* Compute the new queue window width and bound by the limits: */
245 190963 : double computed_deadline_delta =
246 190963 : grpc_time_averaged_stats_update_average(&shard->stats) *
247 : ADD_DEADLINE_SCALE;
248 190963 : double deadline_delta =
249 190963 : GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
250 : MAX_QUEUE_WINDOW_DURATION);
251 : grpc_alarm *alarm, *next;
252 :
253 : /* Compute the new cap and put all alarms under it into the queue: */
254 190963 : shard->queue_deadline_cap = gpr_time_add(
255 : gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
256 193909 : for (alarm = shard->list.next; alarm != &shard->list; alarm = next) {
257 2946 : next = alarm->next;
258 :
259 2946 : if (gpr_time_cmp(alarm->deadline, shard->queue_deadline_cap) < 0) {
260 698 : list_remove(alarm);
261 698 : grpc_alarm_heap_add(&shard->heap, alarm);
262 : }
263 : }
264 190963 : return !grpc_alarm_heap_is_empty(&shard->heap);
265 : }
266 :
267 : /* This pops the next non-cancelled alarm with deadline <= now from the queue,
268 : or returns NULL if there isn't one.
269 : REQUIRES: shard->mu locked */
270 192195 : static grpc_alarm *pop_one(shard_type *shard, gpr_timespec now) {
271 : grpc_alarm *alarm;
272 : for (;;) {
273 192195 : if (grpc_alarm_heap_is_empty(&shard->heap)) {
274 191516 : if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
275 190963 : if (!refill_queue(shard, now)) return NULL;
276 : }
277 1352 : alarm = grpc_alarm_heap_top(&shard->heap);
278 1352 : if (gpr_time_cmp(alarm->deadline, now) > 0) return NULL;
279 924 : alarm->triggered = 1;
280 924 : grpc_alarm_heap_pop(&shard->heap);
281 924 : return alarm;
282 : }
283 : }
284 :
285 : /* REQUIRES: shard->mu unlocked */
286 191271 : static size_t pop_alarms(grpc_exec_ctx *exec_ctx, shard_type *shard,
287 : gpr_timespec now, gpr_timespec *new_min_deadline,
288 : int success) {
289 191271 : size_t n = 0;
290 : grpc_alarm *alarm;
291 191271 : gpr_mu_lock(&shard->mu);
292 383466 : while ((alarm = pop_one(shard, now))) {
293 924 : grpc_exec_ctx_enqueue(exec_ctx, &alarm->closure, success);
294 924 : n++;
295 : }
296 191271 : *new_min_deadline = compute_min_deadline(shard);
297 191271 : gpr_mu_unlock(&shard->mu);
298 191271 : return n;
299 : }
300 :
301 5291642 : static int run_some_expired_alarms(grpc_exec_ctx *exec_ctx, gpr_timespec now,
302 : gpr_timespec *next, int success) {
303 5291642 : size_t n = 0;
304 :
305 : /* TODO(ctiller): verify that there are any alarms (atomically) here */
306 :
307 5291642 : if (gpr_mu_trylock(&g_checker_mu)) {
308 5023126 : gpr_mu_lock(&g_mu);
309 :
310 10237523 : while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
311 : gpr_timespec new_min_deadline;
312 :
313 : /* For efficiency, we pop as many available alarms as we can from the
314 : shard. This may violate perfect alarm deadline ordering, but that
315 : shouldn't be a big deal because we don't make ordering guarantees. */
316 191271 : n += pop_alarms(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
317 : success);
318 :
319 : /* An grpc_alarm_init() on the shard could intervene here, adding a new
320 : alarm that is earlier than new_min_deadline. However,
321 : grpc_alarm_init() will block on the master_lock before it can call
322 : set_min_deadline, so this one will complete first and then the AddAlarm
323 : will reduce the min_deadline (perhaps unnecessarily). */
324 191271 : g_shard_queue[0]->min_deadline = new_min_deadline;
325 191271 : note_deadline_change(g_shard_queue[0]);
326 : }
327 :
328 5023126 : if (next) {
329 5020036 : *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
330 : }
331 :
332 5023126 : gpr_mu_unlock(&g_mu);
333 5023126 : gpr_mu_unlock(&g_checker_mu);
334 : }
335 :
336 5282559 : return (int)n;
337 : }
338 :
339 5287825 : int grpc_alarm_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
340 : gpr_timespec *next) {
341 5287825 : GPR_ASSERT(now.clock_type == g_clock_type);
342 5289570 : return run_some_expired_alarms(
343 : exec_ctx, now, next,
344 5287825 : gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
345 : }
346 :
347 0 : gpr_timespec grpc_alarm_list_next_timeout(void) {
348 : gpr_timespec out;
349 0 : gpr_mu_lock(&g_mu);
350 0 : out = g_shard_queue[0]->min_deadline;
351 0 : gpr_mu_unlock(&g_mu);
352 0 : return out;
353 : }
|