Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/surface/completion_queue.h"
35 :
36 : #include <stdio.h>
37 : #include <string.h>
38 :
39 : #include "src/core/iomgr/timer.h"
40 : #include "src/core/iomgr/pollset.h"
41 : #include "src/core/support/string.h"
42 : #include "src/core/surface/api_trace.h"
43 : #include "src/core/surface/call.h"
44 : #include "src/core/surface/event_string.h"
45 : #include "src/core/surface/surface_trace.h"
46 : #include "src/core/profiling/timers.h"
47 : #include <grpc/support/alloc.h>
48 : #include <grpc/support/atm.h>
49 : #include <grpc/support/log.h>
50 : #include <grpc/support/time.h>
51 :
52 : typedef struct {
53 : grpc_pollset_worker *worker;
54 : void *tag;
55 : } plucker;
56 :
57 : /* Completion queue structure */
58 : struct grpc_completion_queue {
59 : /** completed events */
60 : grpc_cq_completion completed_head;
61 : grpc_cq_completion *completed_tail;
62 : /** Number of pending events (+1 if we're not shutdown) */
63 : gpr_refcount pending_events;
64 : /** Once owning_refs drops to zero, we will destroy the cq */
65 : gpr_refcount owning_refs;
66 : /** the set of low level i/o things that concern this cq */
67 : grpc_pollset pollset;
68 : /** 0 initially, 1 once we've begun shutting down */
69 : int shutdown;
70 : int shutdown_called;
71 : int is_server_cq;
72 : int num_pluckers;
73 : plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
74 : grpc_closure pollset_shutdown_done;
75 :
76 : grpc_completion_queue *next_free;
77 : };
78 :
79 : static gpr_mu g_freelist_mu;
80 : grpc_completion_queue *g_freelist;
81 :
82 : static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *cc,
83 : int success);
84 :
85 3452 : void grpc_cq_global_init(void) { gpr_mu_init(&g_freelist_mu); }
86 :
87 3450 : void grpc_cq_global_shutdown(void) {
88 3450 : gpr_mu_destroy(&g_freelist_mu);
89 9950 : while (g_freelist) {
90 3050 : grpc_completion_queue *next = g_freelist->next_free;
91 3050 : grpc_pollset_destroy(&g_freelist->pollset);
92 3050 : gpr_free(g_freelist);
93 3050 : g_freelist = next;
94 : }
95 3450 : }
96 :
97 : struct grpc_cq_alarm {
98 : grpc_timer alarm;
99 : grpc_cq_completion completion;
100 : /** completion queue where events about this alarm will be posted */
101 : grpc_completion_queue *cq;
102 : /** user supplied tag */
103 : void *tag;
104 : };
105 :
106 423738 : grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
107 : grpc_completion_queue *cc;
108 423738 : GPR_ASSERT(!reserved);
109 :
110 : GPR_TIMER_BEGIN("grpc_completion_queue_create", 0);
111 :
112 423738 : GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
113 :
114 423738 : gpr_mu_lock(&g_freelist_mu);
115 423848 : if (g_freelist == NULL) {
116 3097 : gpr_mu_unlock(&g_freelist_mu);
117 :
118 3097 : cc = gpr_malloc(sizeof(grpc_completion_queue));
119 3097 : grpc_pollset_init(&cc->pollset);
120 : } else {
121 420751 : cc = g_freelist;
122 420751 : g_freelist = g_freelist->next_free;
123 420751 : gpr_mu_unlock(&g_freelist_mu);
124 : /* pollset already initialized */
125 : }
126 :
127 : /* Initial ref is dropped by grpc_completion_queue_shutdown */
128 423848 : gpr_ref_init(&cc->pending_events, 1);
129 : /* One for destroy(), one for pollset_shutdown */
130 423848 : gpr_ref_init(&cc->owning_refs, 2);
131 423848 : cc->completed_tail = &cc->completed_head;
132 423848 : cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
133 423848 : cc->shutdown = 0;
134 423848 : cc->shutdown_called = 0;
135 423848 : cc->is_server_cq = 0;
136 423848 : cc->num_pluckers = 0;
137 423848 : grpc_closure_init(&cc->pollset_shutdown_done, on_pollset_shutdown_done, cc);
138 :
139 : GPR_TIMER_END("grpc_completion_queue_create", 0);
140 :
141 423847 : return cc;
142 : }
143 :
144 : #ifdef GRPC_CQ_REF_COUNT_DEBUG
145 : void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
146 : const char *file, int line) {
147 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc,
148 : (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
149 : #else
150 15844648 : void grpc_cq_internal_ref(grpc_completion_queue *cc) {
151 : #endif
152 15844648 : gpr_ref(&cc->owning_refs);
153 15872959 : }
154 :
155 423812 : static void on_pollset_shutdown_done(grpc_exec_ctx *exec_ctx, void *arg,
156 : int success) {
157 423803 : grpc_completion_queue *cc = arg;
158 423812 : GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
159 423815 : }
160 :
161 : #ifdef GRPC_CQ_REF_COUNT_DEBUG
162 : void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
163 : const char *file, int line) {
164 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
165 : (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
166 : #else
167 16703237 : void grpc_cq_internal_unref(grpc_completion_queue *cc) {
168 : #endif
169 16703237 : if (gpr_unref(&cc->owning_refs)) {
170 423808 : GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head);
171 423808 : grpc_pollset_reset(&cc->pollset);
172 423801 : gpr_mu_lock(&g_freelist_mu);
173 423812 : cc->next_free = g_freelist;
174 423812 : g_freelist = cc;
175 423812 : gpr_mu_unlock(&g_freelist_mu);
176 : }
177 16731189 : }
178 :
179 10999932 : void grpc_cq_begin_op(grpc_completion_queue *cc) {
180 : #ifndef NDEBUG
181 10999932 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
182 11031020 : GPR_ASSERT(!cc->shutdown_called);
183 11031020 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
184 : #endif
185 11030558 : gpr_ref(&cc->pending_events);
186 11030768 : }
187 :
188 : /* Signal the end of an operation - if this is the last waiting-to-be-queued
189 : event, then enter shutdown mode */
190 : /* Queue a GRPC_OP_COMPLETED operation */
191 11024739 : void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
192 : void *tag, int success,
193 : void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
194 : grpc_cq_completion *storage),
195 : void *done_arg, grpc_cq_completion *storage) {
196 : int shutdown;
197 : int i;
198 : grpc_pollset_worker *pluck_worker;
199 :
200 : GPR_TIMER_BEGIN("grpc_cq_end_op", 0);
201 :
202 11024739 : storage->tag = tag;
203 11024739 : storage->done = done;
204 11024739 : storage->done_arg = done_arg;
205 11024739 : storage->next =
206 11024739 : ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
207 :
208 11024739 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
209 11031214 : shutdown = gpr_unref(&cc->pending_events);
210 11032646 : if (!shutdown) {
211 22056817 : cc->completed_tail->next =
212 11028826 : ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
213 11028826 : cc->completed_tail = storage;
214 11027991 : pluck_worker = NULL;
215 11061204 : for (i = 0; i < cc->num_pluckers; i++) {
216 452693 : if (cc->pluckers[i].tag == tag) {
217 420315 : pluck_worker = cc->pluckers[i].worker;
218 420315 : break;
219 : }
220 : }
221 11028826 : grpc_pollset_kick(&cc->pollset, pluck_worker);
222 11016812 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
223 : } else {
224 7640 : cc->completed_tail->next =
225 3820 : ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
226 3820 : cc->completed_tail = storage;
227 3820 : GPR_ASSERT(!cc->shutdown);
228 3820 : GPR_ASSERT(cc->shutdown_called);
229 3820 : cc->shutdown = 1;
230 3820 : grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_shutdown_done);
231 3820 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
232 : }
233 :
234 : GPR_TIMER_END("grpc_cq_end_op", 0);
235 11028644 : }
236 :
237 10466298 : grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
238 : gpr_timespec deadline, void *reserved) {
239 : grpc_event ret;
240 : grpc_pollset_worker worker;
241 10465511 : int first_loop = 1;
242 : gpr_timespec now;
243 10466298 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
244 :
245 : GPR_TIMER_BEGIN("grpc_completion_queue_next", 0);
246 :
247 10466298 : GRPC_API_TRACE(
248 : "grpc_completion_queue_next("
249 : "cc=%p, "
250 : "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, "
251 : "reserved=%p)",
252 : 5, (cc, (long)deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
253 : reserved));
254 10478315 : GPR_ASSERT(!reserved);
255 :
256 10478315 : deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
257 :
258 10483895 : GRPC_CQ_INTERNAL_REF(cc, "next");
259 10485765 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
260 : for (;;) {
261 16572668 : if (cc->completed_tail != &cc->completed_head) {
262 10179139 : grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
263 10179139 : cc->completed_head.next = c->next & ~(gpr_uintptr)1;
264 10179139 : if (c == cc->completed_tail) {
265 3229711 : cc->completed_tail = &cc->completed_head;
266 : }
267 10179139 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
268 10181708 : ret.type = GRPC_OP_COMPLETE;
269 10181708 : ret.success = c->next & 1u;
270 10181708 : ret.tag = c->tag;
271 10181708 : c->done(&exec_ctx, c->done_arg, c);
272 10175170 : break;
273 : }
274 6393529 : if (cc->shutdown) {
275 213364 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
276 213364 : memset(&ret, 0, sizeof(ret));
277 213364 : ret.type = GRPC_QUEUE_SHUTDOWN;
278 213364 : break;
279 : }
280 6180165 : now = gpr_now(GPR_CLOCK_MONOTONIC);
281 6180482 : if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
282 95624 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
283 95624 : memset(&ret, 0, sizeof(ret));
284 95624 : ret.type = GRPC_QUEUE_TIMEOUT;
285 95624 : break;
286 : }
287 6054925 : first_loop = 0;
288 6084842 : grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline);
289 6084554 : }
290 10484158 : GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
291 10484158 : GRPC_CQ_INTERNAL_UNREF(cc, "next");
292 10487549 : grpc_exec_ctx_finish(&exec_ctx);
293 :
294 : GPR_TIMER_END("grpc_completion_queue_next", 0);
295 :
296 10478812 : return ret;
297 : }
298 :
299 900212 : static int add_plucker(grpc_completion_queue *cc, void *tag,
300 : grpc_pollset_worker *worker) {
301 900212 : if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
302 1 : return 0;
303 : }
304 900211 : cc->pluckers[cc->num_pluckers].tag = tag;
305 900211 : cc->pluckers[cc->num_pluckers].worker = worker;
306 900211 : cc->num_pluckers++;
307 900211 : return 1;
308 : }
309 :
310 900134 : static void del_plucker(grpc_completion_queue *cc, void *tag,
311 : grpc_pollset_worker *worker) {
312 : int i;
313 903497 : for (i = 0; i < cc->num_pluckers; i++) {
314 903505 : if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
315 900142 : cc->num_pluckers--;
316 900142 : GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
317 1800276 : return;
318 : }
319 : }
320 0 : GPR_UNREACHABLE_CODE(return );
321 : }
322 :
323 956245 : grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
324 : gpr_timespec deadline, void *reserved) {
325 : grpc_event ret;
326 : grpc_cq_completion *c;
327 : grpc_cq_completion *prev;
328 : grpc_pollset_worker worker;
329 : gpr_timespec now;
330 956197 : int first_loop = 1;
331 956245 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
332 :
333 : GPR_TIMER_BEGIN("grpc_completion_queue_pluck", 0);
334 :
335 956245 : GRPC_API_TRACE(
336 : "grpc_completion_queue_pluck("
337 : "cc=%p, tag=%p, "
338 : "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, "
339 : "reserved=%p)",
340 : 6, (cc, tag, (long)deadline.tv_sec, deadline.tv_nsec,
341 : (int)deadline.clock_type, reserved));
342 956243 : GPR_ASSERT(!reserved);
343 :
344 956243 : deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
345 :
346 956225 : GRPC_CQ_INTERNAL_REF(cc, "pluck");
347 956267 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
348 : for (;;) {
349 1750557 : prev = &cc->completed_head;
350 5299519 : while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) !=
351 1774457 : &cc->completed_head) {
352 874593 : if (c->tag == tag) {
353 850645 : prev->next =
354 850645 : (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
355 850645 : if (c == cc->completed_tail) {
356 644852 : cc->completed_tail = prev;
357 : }
358 850645 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
359 850673 : ret.type = GRPC_OP_COMPLETE;
360 850673 : ret.success = c->next & 1u;
361 850673 : ret.tag = c->tag;
362 850673 : c->done(&exec_ctx, c->done_arg, c);
363 850469 : goto done;
364 : }
365 23948 : prev = c;
366 : }
367 899912 : if (cc->shutdown) {
368 1 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
369 1 : memset(&ret, 0, sizeof(ret));
370 1 : ret.type = GRPC_QUEUE_SHUTDOWN;
371 1 : break;
372 : }
373 899911 : if (!add_plucker(cc, tag, &worker)) {
374 1 : gpr_log(GPR_DEBUG,
375 : "Too many outstanding grpc_completion_queue_pluck calls: maximum "
376 : "is %d",
377 : GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
378 1 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
379 1 : memset(&ret, 0, sizeof(ret));
380 : /* TODO(ctiller): should we use a different result here */
381 1 : ret.type = GRPC_QUEUE_TIMEOUT;
382 1 : break;
383 : }
384 900217 : now = gpr_now(GPR_CLOCK_MONOTONIC);
385 900239 : if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
386 105615 : del_plucker(cc, tag, &worker);
387 105615 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
388 105616 : memset(&ret, 0, sizeof(ret));
389 105616 : ret.type = GRPC_QUEUE_TIMEOUT;
390 105616 : break;
391 : }
392 794624 : first_loop = 0;
393 794624 : grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline);
394 794653 : del_plucker(cc, tag, &worker);
395 794650 : }
396 : done:
397 956087 : GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
398 956087 : GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
399 956293 : grpc_exec_ctx_finish(&exec_ctx);
400 :
401 : GPR_TIMER_END("grpc_completion_queue_pluck", 0);
402 :
403 956292 : return ret;
404 : }
405 :
406 : /* Shutdown simply drops a ref that we reserved at creation time; if we drop
407 : to zero here, then enter shutdown mode and wake up any waiters */
408 636507 : void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
409 636507 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
410 : GPR_TIMER_BEGIN("grpc_completion_queue_shutdown", 0);
411 636507 : GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
412 636507 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
413 636712 : if (cc->shutdown_called) {
414 212910 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
415 : GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
416 849631 : return;
417 : }
418 423802 : cc->shutdown_called = 1;
419 423802 : if (gpr_unref(&cc->pending_events)) {
420 419989 : GPR_ASSERT(!cc->shutdown);
421 419989 : cc->shutdown = 1;
422 419989 : grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_shutdown_done);
423 : }
424 423815 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
425 423815 : grpc_exec_ctx_finish(&exec_ctx);
426 : GPR_TIMER_END("grpc_completion_queue_shutdown", 0);
427 : }
428 :
429 423786 : void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
430 423786 : GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
431 : GPR_TIMER_BEGIN("grpc_completion_queue_destroy", 0);
432 423786 : grpc_completion_queue_shutdown(cc);
433 423773 : GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
434 : GPR_TIMER_END("grpc_completion_queue_destroy", 0);
435 423807 : }
436 :
437 4448967 : grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
438 4448967 : return &cc->pollset;
439 : }
440 :
441 3596 : void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
442 :
443 2243883 : int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
|