Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/iomgr/timer.h"
35 :
36 : #include "src/core/iomgr/timer_heap.h"
37 : #include "src/core/iomgr/timer_internal.h"
38 : #include "src/core/iomgr/time_averaged_stats.h"
39 : #include <grpc/support/log.h>
40 : #include <grpc/support/sync.h>
41 : #include <grpc/support/useful.h>
42 :
43 : #define INVALID_HEAP_INDEX 0xffffffffu
44 :
45 : #define LOG2_NUM_SHARDS 5
46 : #define NUM_SHARDS (1 << LOG2_NUM_SHARDS)
47 : #define ADD_DEADLINE_SCALE 0.33
48 : #define MIN_QUEUE_WINDOW_DURATION 0.01
49 : #define MAX_QUEUE_WINDOW_DURATION 1
50 :
51 : typedef struct {
52 : gpr_mu mu;
53 : grpc_time_averaged_stats stats;
54 : /* All and only timers with deadlines <= this will be in the heap. */
55 : gpr_timespec queue_deadline_cap;
56 : gpr_timespec min_deadline;
57 : /* Index in the g_shard_queue */
58 : gpr_uint32 shard_queue_index;
59 : /* This holds all timers with deadlines < queue_deadline_cap. Timers in this
60 : list have the top bit of their deadline set to 0. */
61 : grpc_timer_heap heap;
62 : /* This holds timers whose deadline is >= queue_deadline_cap. */
63 : grpc_timer list;
64 : } shard_type;
65 :
66 : /* Protects g_shard_queue */
67 : static gpr_mu g_mu;
68 : /* Allow only one run_some_expired_timers at once */
69 : static gpr_mu g_checker_mu;
70 : static gpr_clock_type g_clock_type;
71 : static shard_type g_shards[NUM_SHARDS];
72 : /* Protected by g_mu */
73 : static shard_type *g_shard_queue[NUM_SHARDS];
74 :
75 : static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
76 : gpr_timespec *next, int success);
77 :
78 375434 : static gpr_timespec compute_min_deadline(shard_type *shard) {
79 376071 : return grpc_timer_heap_is_empty(&shard->heap)
80 : ? shard->queue_deadline_cap
81 637 : : grpc_timer_heap_top(&shard->heap)->deadline;
82 : }
83 :
84 3459 : void grpc_timer_list_init(gpr_timespec now) {
85 : gpr_uint32 i;
86 :
87 3459 : gpr_mu_init(&g_mu);
88 3459 : gpr_mu_init(&g_checker_mu);
89 3459 : g_clock_type = now.clock_type;
90 :
91 114147 : for (i = 0; i < NUM_SHARDS; i++) {
92 110688 : shard_type *shard = &g_shards[i];
93 110688 : gpr_mu_init(&shard->mu);
94 110688 : grpc_time_averaged_stats_init(&shard->stats, 1.0 / ADD_DEADLINE_SCALE, 0.1,
95 : 0.5);
96 110688 : shard->queue_deadline_cap = now;
97 110688 : shard->shard_queue_index = i;
98 110688 : grpc_timer_heap_init(&shard->heap);
99 110688 : shard->list.next = shard->list.prev = &shard->list;
100 110688 : shard->min_deadline = compute_min_deadline(shard);
101 110688 : g_shard_queue[i] = shard;
102 : }
103 3459 : }
104 :
105 3457 : void grpc_timer_list_shutdown(grpc_exec_ctx *exec_ctx) {
106 : int i;
107 3457 : run_some_expired_timers(exec_ctx, gpr_inf_future(g_clock_type), NULL, 0);
108 114081 : for (i = 0; i < NUM_SHARDS; i++) {
109 110624 : shard_type *shard = &g_shards[i];
110 110624 : gpr_mu_destroy(&shard->mu);
111 110624 : grpc_timer_heap_destroy(&shard->heap);
112 : }
113 3457 : gpr_mu_destroy(&g_mu);
114 3457 : gpr_mu_destroy(&g_checker_mu);
115 3457 : }
116 :
117 : /* This is a cheap, but good enough, pointer hash for sharding the tasks: */
118 2121540 : static size_t shard_idx(const grpc_timer *info) {
119 2121728 : size_t x = (size_t)info;
120 2121728 : return ((x >> 4) ^ (x >> 9) ^ (x >> 14)) & (NUM_SHARDS - 1);
121 : }
122 :
123 1061011 : static double ts_to_dbl(gpr_timespec ts) {
124 1061139 : return (double)ts.tv_sec + 1e-9 * ts.tv_nsec;
125 : }
126 :
127 263785 : static gpr_timespec dbl_to_ts(double d) {
128 : gpr_timespec ts;
129 264050 : ts.tv_sec = (time_t)d;
130 264050 : ts.tv_nsec = (int)(1e9 * (d - (double)ts.tv_sec));
131 263785 : ts.clock_type = GPR_TIMESPAN;
132 263785 : return ts;
133 : }
134 :
135 1060335 : static void list_join(grpc_timer *head, grpc_timer *timer) {
136 1060457 : timer->next = head;
137 1060457 : timer->prev = head->prev;
138 1060457 : timer->next->prev = timer->prev->next = timer;
139 1060335 : }
140 :
141 1060335 : static void list_remove(grpc_timer *timer) {
142 1060415 : timer->next->prev = timer->prev;
143 1060415 : timer->prev->next = timer->next;
144 1060335 : }
145 :
146 4179258 : static void swap_adjacent_shards_in_queue(gpr_uint32 first_shard_queue_index) {
147 : shard_type *temp;
148 4183449 : temp = g_shard_queue[first_shard_queue_index];
149 4183449 : g_shard_queue[first_shard_queue_index] =
150 4183449 : g_shard_queue[first_shard_queue_index + 1];
151 4183449 : g_shard_queue[first_shard_queue_index + 1] = temp;
152 4183449 : g_shard_queue[first_shard_queue_index]->shard_queue_index =
153 : first_shard_queue_index;
154 8362707 : g_shard_queue[first_shard_queue_index + 1]->shard_queue_index =
155 4179258 : first_shard_queue_index + 1;
156 4179258 : }
157 :
158 265400 : static void note_deadline_change(shard_type *shard) {
159 547151 : while (shard->shard_queue_index > 0 &&
160 8439 : gpr_time_cmp(
161 : shard->min_deadline,
162 8439 : g_shard_queue[shard->shard_queue_index - 1]->min_deadline) < 0) {
163 8209 : swap_adjacent_shards_in_queue(shard->shard_queue_index - 1);
164 : }
165 9136988 : while (shard->shard_queue_index < NUM_SHARDS - 1 &&
166 4431245 : gpr_time_cmp(
167 : shard->min_deadline,
168 4431245 : g_shard_queue[shard->shard_queue_index + 1]->min_deadline) > 0) {
169 4175240 : swap_adjacent_shards_in_queue(shard->shard_queue_index);
170 : }
171 265400 : }
172 :
173 1061139 : void grpc_timer_init(grpc_exec_ctx *exec_ctx, grpc_timer *timer,
174 : gpr_timespec deadline, grpc_iomgr_cb_func timer_cb,
175 : void *timer_cb_arg, gpr_timespec now) {
176 1061011 : int is_first_timer = 0;
177 1061139 : shard_type *shard = &g_shards[shard_idx(timer)];
178 1061139 : GPR_ASSERT(deadline.clock_type == g_clock_type);
179 1061139 : GPR_ASSERT(now.clock_type == g_clock_type);
180 1061139 : grpc_closure_init(&timer->closure, timer_cb, timer_cb_arg);
181 1061139 : timer->deadline = deadline;
182 1061139 : timer->triggered = 0;
183 :
184 : /* TODO(ctiller): check deadline expired */
185 :
186 1061139 : gpr_mu_lock(&shard->mu);
187 1061267 : grpc_time_averaged_stats_add_sample(&shard->stats,
188 : ts_to_dbl(gpr_time_sub(deadline, now)));
189 1061139 : if (gpr_time_cmp(deadline, shard->queue_deadline_cap) < 0) {
190 682 : is_first_timer = grpc_timer_heap_add(&shard->heap, timer);
191 : } else {
192 1060457 : timer->heap_index = INVALID_HEAP_INDEX;
193 1060457 : list_join(&shard->list, timer);
194 : }
195 1061139 : gpr_mu_unlock(&shard->mu);
196 :
197 : /* Deadline may have decreased, we need to adjust the master queue. Note
198 : that there is a potential racy unlocked region here. There could be a
199 : reordering of multiple grpc_timer_init calls, at this point, but the < test
200 : below should ensure that we err on the side of caution. There could
201 : also be a race with grpc_timer_check, which might beat us to the lock. In
202 : that case, it is possible that the timer that we added will have already
203 : run by the time we hold the lock, but that too is a safe error.
204 : Finally, it's possible that the grpc_timer_check that intervened failed to
205 : trigger the new timer because the min_deadline hadn't yet been reduced.
206 : In that case, the timer will simply have to wait for the next
207 : grpc_timer_check. */
208 1061139 : if (is_first_timer) {
209 677 : gpr_mu_lock(&g_mu);
210 677 : if (gpr_time_cmp(deadline, shard->min_deadline) < 0) {
211 654 : gpr_timespec old_min_deadline = g_shard_queue[0]->min_deadline;
212 654 : shard->min_deadline = deadline;
213 654 : note_deadline_change(shard);
214 1078 : if (shard->shard_queue_index == 0 &&
215 424 : gpr_time_cmp(deadline, old_min_deadline) < 0) {
216 424 : grpc_kick_poller();
217 : }
218 : }
219 677 : gpr_mu_unlock(&g_mu);
220 : }
221 1061139 : }
222 :
223 1060589 : void grpc_timer_cancel(grpc_exec_ctx *exec_ctx, grpc_timer *timer) {
224 1060529 : shard_type *shard = &g_shards[shard_idx(timer)];
225 1060589 : gpr_mu_lock(&shard->mu);
226 1060589 : if (!timer->triggered) {
227 1059582 : grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, 0);
228 1059582 : timer->triggered = 1;
229 1059582 : if (timer->heap_index == INVALID_HEAP_INDEX) {
230 1059372 : list_remove(timer);
231 : } else {
232 152 : grpc_timer_heap_remove(&shard->heap, timer);
233 : }
234 : }
235 1060589 : gpr_mu_unlock(&shard->mu);
236 1060589 : }
237 :
238 : /* This is called when the queue is empty and "now" has reached the
239 : queue_deadline_cap. We compute a new queue deadline and then scan the map
240 : for timers that fall at or under it. Returns true if the queue is no
241 : longer empty.
242 : REQUIRES: shard->mu locked */
243 264050 : static int refill_queue(shard_type *shard, gpr_timespec now) {
244 : /* Compute the new queue window width and bound by the limits: */
245 264050 : double computed_deadline_delta =
246 264050 : grpc_time_averaged_stats_update_average(&shard->stats) *
247 : ADD_DEADLINE_SCALE;
248 263785 : double deadline_delta =
249 264050 : GPR_CLAMP(computed_deadline_delta, MIN_QUEUE_WINDOW_DURATION,
250 : MAX_QUEUE_WINDOW_DURATION);
251 : grpc_timer *timer, *next;
252 :
253 : /* Compute the new cap and put all timers under it into the queue: */
254 264050 : shard->queue_deadline_cap = gpr_time_add(
255 : gpr_time_max(now, shard->queue_deadline_cap), dbl_to_ts(deadline_delta));
256 268376 : for (timer = shard->list.next; timer != &shard->list; timer = next) {
257 4061 : next = timer->next;
258 :
259 4061 : if (gpr_time_cmp(timer->deadline, shard->queue_deadline_cap) < 0) {
260 963 : list_remove(timer);
261 985 : grpc_timer_heap_add(&shard->heap, timer);
262 : }
263 : }
264 264050 : return !grpc_timer_heap_is_empty(&shard->heap);
265 : }
266 :
267 : /* This pops the next non-cancelled timer with deadline <= now from the queue,
268 : or returns NULL if there isn't one.
269 : REQUIRES: shard->mu locked */
270 266261 : static grpc_timer *pop_one(shard_type *shard, gpr_timespec now) {
271 : grpc_timer *timer;
272 : for (;;) {
273 266261 : if (grpc_timer_heap_is_empty(&shard->heap)) {
274 265072 : if (gpr_time_cmp(now, shard->queue_deadline_cap) < 0) return NULL;
275 264050 : if (!refill_queue(shard, now)) return NULL;
276 : }
277 2152 : timer = grpc_timer_heap_top(&shard->heap);
278 2152 : if (gpr_time_cmp(timer->deadline, now) > 0) return NULL;
279 1515 : timer->triggered = 1;
280 1515 : grpc_timer_heap_pop(&shard->heap);
281 1515 : return timer;
282 : }
283 : }
284 :
285 : /* REQUIRES: shard->mu unlocked */
286 264746 : static size_t pop_timers(grpc_exec_ctx *exec_ctx, shard_type *shard,
287 : gpr_timespec now, gpr_timespec *new_min_deadline,
288 : int success) {
289 264455 : size_t n = 0;
290 : grpc_timer *timer;
291 264746 : gpr_mu_lock(&shard->mu);
292 531007 : while ((timer = pop_one(shard, now))) {
293 1515 : grpc_exec_ctx_enqueue(exec_ctx, &timer->closure, success);
294 1515 : n++;
295 : }
296 264746 : *new_min_deadline = compute_min_deadline(shard);
297 264746 : gpr_mu_unlock(&shard->mu);
298 264746 : return n;
299 : }
300 :
301 6940654 : static int run_some_expired_timers(grpc_exec_ctx *exec_ctx, gpr_timespec now,
302 : gpr_timespec *next, int success) {
303 6910738 : size_t n = 0;
304 :
305 : /* TODO(ctiller): verify that there are any timers (atomically) here */
306 :
307 6940654 : if (gpr_mu_trylock(&g_checker_mu)) {
308 6933307 : gpr_mu_lock(&g_mu);
309 :
310 14131360 : while (gpr_time_cmp(g_shard_queue[0]->min_deadline, now) < 0) {
311 : gpr_timespec new_min_deadline;
312 :
313 : /* For efficiency, we pop as many available timers as we can from the
314 : shard. This may violate perfect timer deadline ordering, but that
315 : shouldn't be a big deal because we don't make ordering guarantees. */
316 264746 : n += pop_timers(exec_ctx, g_shard_queue[0], now, &new_min_deadline,
317 : success);
318 :
319 : /* An grpc_timer_init() on the shard could intervene here, adding a new
320 : timer that is earlier than new_min_deadline. However,
321 : grpc_timer_init() will block on the master_lock before it can call
322 : set_min_deadline, so this one will complete first and then the Addtimer
323 : will reduce the min_deadline (perhaps unnecessarily). */
324 264746 : g_shard_queue[0]->min_deadline = new_min_deadline;
325 264746 : note_deadline_change(g_shard_queue[0]);
326 : }
327 :
328 6933307 : if (next) {
329 6929471 : *next = gpr_time_min(*next, g_shard_queue[0]->min_deadline);
330 : }
331 :
332 6933307 : gpr_mu_unlock(&g_mu);
333 6933307 : gpr_mu_unlock(&g_checker_mu);
334 : }
335 :
336 6940485 : return (int)n;
337 : }
338 :
339 6937013 : int grpc_timer_check(grpc_exec_ctx *exec_ctx, gpr_timespec now,
340 : gpr_timespec *next) {
341 6937013 : GPR_ASSERT(now.clock_type == g_clock_type);
342 6937295 : return run_some_expired_timers(
343 : exec_ctx, now, next,
344 6937013 : gpr_time_cmp(now, gpr_inf_future(now.clock_type)) != 0);
345 : }
346 :
347 0 : gpr_timespec grpc_timer_list_next_timeout(void) {
348 : gpr_timespec out;
349 0 : gpr_mu_lock(&g_mu);
350 0 : out = g_shard_queue[0]->min_deadline;
351 0 : gpr_mu_unlock(&g_mu);
352 0 : return out;
353 : }
|