Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/surface/completion_queue.h"
35 :
36 : #include <stdio.h>
37 : #include <string.h>
38 :
39 : #include "src/core/iomgr/pollset.h"
40 : #include "src/core/support/string.h"
41 : #include "src/core/surface/api_trace.h"
42 : #include "src/core/surface/call.h"
43 : #include "src/core/surface/event_string.h"
44 : #include "src/core/surface/surface_trace.h"
45 : #include <grpc/support/alloc.h>
46 : #include <grpc/support/atm.h>
47 : #include <grpc/support/log.h>
48 :
49 : typedef struct { /* one registered grpc_completion_queue_pluck call */
50 : grpc_pollset_worker *worker; /* pollset worker of the plucking call; kicked by grpc_cq_end_op when a completion with a matching tag is queued */
51 : void *tag; /* tag that the plucking call is waiting for */
52 : } plucker;
53 :
54 : /* Completion queue structure */
55 : struct grpc_completion_queue {
56 : /** completed events: a circular singly-linked list that uses completed_head as its sentinel; the low bit of a queued completion's `next` word holds that completion's success flag */
57 : grpc_cq_completion completed_head;
58 : grpc_cq_completion *completed_tail; /* last queued completion, or &completed_head when the list is empty */
59 : /** Number of pending events (+1 if we're not shutdown) */
60 : gpr_refcount pending_events;
61 : /** Once owning_refs drops to zero, we will destroy the cq */
62 : gpr_refcount owning_refs;
63 : /** the set of low level i/o things that concern this cq */
64 : grpc_pollset pollset;
65 : /** 0 initially, 1 once pending_events has dropped to zero and pollset shutdown has been started */
66 : int shutdown;
67 : int shutdown_called; /* set to 1 by the first grpc_completion_queue_shutdown call */
68 : int is_server_cq; /* set by grpc_cq_mark_server_cq, read by grpc_cq_is_server_cq */
69 : int num_pluckers; /* number of entries of pluckers[] currently in use */
70 : plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS]; /* grpc_completion_queue_pluck calls currently registered while they poll */
71 : grpc_closure pollset_destroy_done; /* closure handed to grpc_pollset_shutdown; it runs on_pollset_destroy_done */
72 : };
73 :
74 : static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *cc,
75 : int success); /* forward declaration: grpc_completion_queue_create references this callback before its definition */
76 :
77 322539 : grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
78 322539 : grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
79 322537 : GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
80 322537 : GPR_ASSERT(!reserved);
81 322537 : memset(cc, 0, sizeof(*cc));
82 : /* Initial ref is dropped by grpc_completion_queue_shutdown */
83 322537 : gpr_ref_init(&cc->pending_events, 1);
84 : /* One for destroy(), one for pollset_shutdown */
85 322522 : gpr_ref_init(&cc->owning_refs, 2);
86 322508 : grpc_pollset_init(&cc->pollset);
87 322531 : cc->completed_tail = &cc->completed_head;
88 322531 : cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
89 322531 : grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc);
90 322532 : return cc;
91 : }
92 :
93 : #ifdef GRPC_CQ_REF_COUNT_DEBUG
94 : void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
95 : const char *file, int line) {
96 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p ref %d -> %d %s", cc,
97 : (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
98 : #else
99 12466558 : void grpc_cq_internal_ref(grpc_completion_queue *cc) {
100 : #endif
101 12466558 : gpr_ref(&cc->owning_refs);
102 12497185 : }
103 :
104 322541 : static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *arg,
105 : int success) {
106 322541 : grpc_completion_queue *cc = arg;
107 322541 : GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
108 322559 : }
109 :
110 : #ifdef GRPC_CQ_REF_COUNT_DEBUG
111 : void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
112 : const char *file, int line) {
113 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
114 : (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
115 : #else
116 13115041 : void grpc_cq_internal_unref(grpc_completion_queue *cc) {
117 : #endif
118 13115041 : if (gpr_unref(&cc->owning_refs)) {
119 322554 : GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head);
120 322554 : grpc_pollset_destroy(&cc->pollset);
121 322549 : gpr_free(cc);
122 : }
123 13151125 : }
124 :
125 7269651 : void grpc_cq_begin_op(grpc_completion_queue *cc) {
126 : #ifndef NDEBUG
127 7269651 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
128 7277050 : GPR_ASSERT(!cc->shutdown_called);
129 7277050 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
130 : #endif
131 7275433 : gpr_ref(&cc->pending_events);
132 7272538 : }
133 :
134 : /* Signal the end of an operation - if this is the last waiting-to-be-queued
135 : event, then enter shutdown mode */
136 : /* Queue a GRPC_OP_COMPLETED operation */
137 7270618 : void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
138 : void *tag, int success,
139 : void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
140 : grpc_cq_completion *storage),
141 : void *done_arg, grpc_cq_completion *storage) {
142 : int shutdown;
143 : int i;
144 : grpc_pollset_worker *pluck_worker;
145 :
146 7270618 : storage->tag = tag;
147 7270618 : storage->done = done;
148 7270618 : storage->done_arg = done_arg;
149 7270618 : storage->next =
150 7270618 : ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
151 :
152 7270618 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
153 7275898 : shutdown = gpr_unref(&cc->pending_events);
154 7278961 : if (!shutdown) {
155 14539102 : cc->completed_tail->next =
156 7269551 : ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
157 7269551 : cc->completed_tail = storage;
158 7269551 : pluck_worker = NULL;
159 7305306 : for (i = 0; i < cc->num_pluckers; i++) {
160 350076 : if (cc->pluckers[i].tag == tag) {
161 314321 : pluck_worker = cc->pluckers[i].worker;
162 314321 : break;
163 : }
164 : }
165 7269551 : grpc_pollset_kick(&cc->pollset, pluck_worker);
166 7261292 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
167 : } else {
168 18820 : cc->completed_tail->next =
169 9410 : ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
170 9410 : cc->completed_tail = storage;
171 9410 : GPR_ASSERT(!cc->shutdown);
172 9410 : GPR_ASSERT(cc->shutdown_called);
173 9410 : cc->shutdown = 1;
174 9410 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
175 9410 : grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_destroy_done);
176 : }
177 7276387 : }
178 :
179 9058476 : grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
180 : gpr_timespec deadline, void *reserved) {
181 : grpc_event ret;
182 : grpc_pollset_worker worker;
183 9058476 : int first_loop = 1;
184 : gpr_timespec now;
185 9058476 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
186 :
187 9058476 : GRPC_API_TRACE(
188 : "grpc_completion_queue_next("
189 : "cc=%p, "
190 : "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, "
191 : "reserved=%p)",
192 : 5, (cc, (long)deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
193 : reserved));
194 9078048 : GPR_ASSERT(!reserved);
195 :
196 9078048 : deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
197 :
198 9077229 : GRPC_CQ_INTERNAL_REF(cc, "next");
199 9090328 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
200 : for (;;) {
201 13660327 : if (cc->completed_tail != &cc->completed_head) {
202 6666769 : grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
203 6666769 : cc->completed_head.next = c->next & ~(gpr_uintptr)1;
204 6666769 : if (c == cc->completed_tail) {
205 1246621 : cc->completed_tail = &cc->completed_head;
206 : }
207 6666769 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
208 6658244 : ret.type = GRPC_OP_COMPLETE;
209 6658244 : ret.success = c->next & 1u;
210 6658244 : ret.tag = c->tag;
211 6658244 : c->done(&exec_ctx, c->done_arg, c);
212 6661471 : break;
213 : }
214 6993558 : if (cc->shutdown) {
215 162060 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
216 162059 : memset(&ret, 0, sizeof(ret));
217 162059 : ret.type = GRPC_QUEUE_SHUTDOWN;
218 162059 : break;
219 : }
220 6831498 : now = gpr_now(GPR_CLOCK_MONOTONIC);
221 6858523 : if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
222 2267347 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
223 2256442 : memset(&ret, 0, sizeof(ret));
224 2256442 : ret.type = GRPC_QUEUE_TIMEOUT;
225 2256442 : break;
226 : }
227 4591074 : first_loop = 0;
228 4591074 : grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline);
229 4594402 : }
230 9079972 : GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
231 9079972 : GRPC_CQ_INTERNAL_UNREF(cc, "next");
232 9089060 : grpc_exec_ctx_finish(&exec_ctx);
233 9087172 : return ret;
234 : }
235 :
236 753485 : static int add_plucker(grpc_completion_queue *cc, void *tag,
237 : grpc_pollset_worker *worker) {
238 753485 : if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
239 0 : return 0;
240 : }
241 753485 : cc->pluckers[cc->num_pluckers].tag = tag;
242 753485 : cc->pluckers[cc->num_pluckers].worker = worker;
243 753485 : cc->num_pluckers++;
244 753485 : return 1;
245 : }
246 :
247 753907 : static void del_plucker(grpc_completion_queue *cc, void *tag,
248 : grpc_pollset_worker *worker) {
249 : int i;
250 767544 : for (i = 0; i < cc->num_pluckers; i++) {
251 767544 : if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
252 753907 : cc->num_pluckers--;
253 753907 : GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
254 1507814 : return;
255 : }
256 : }
257 0 : GPR_UNREACHABLE_CODE(return );
258 : }
259 :
260 711180 : grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
261 : gpr_timespec deadline, void *reserved) {
262 : grpc_event ret;
263 : grpc_cq_completion *c;
264 : grpc_cq_completion *prev;
265 : grpc_pollset_worker worker;
266 : gpr_timespec now;
267 711180 : int first_loop = 1;
268 711180 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
269 :
270 711180 : GRPC_API_TRACE(
271 : "grpc_completion_queue_pluck("
272 : "cc=%p, tag=%p, "
273 : "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, "
274 : "reserved=%p)",
275 : 6, (cc, tag, (long)deadline.tv_sec, deadline.tv_nsec,
276 : (int)deadline.clock_type, reserved));
277 711460 : GPR_ASSERT(!reserved);
278 :
279 711460 : deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
280 :
281 711424 : GRPC_CQ_INTERNAL_REF(cc, "pluck");
282 711455 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
283 : for (;;) {
284 1365007 : prev = &cc->completed_head;
285 4177079 : while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) !=
286 1406036 : &cc->completed_head) {
287 651479 : if (c->tag == tag) {
288 610450 : prev->next =
289 610450 : (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
290 610450 : if (c == cc->completed_tail) {
291 457215 : cc->completed_tail = prev;
292 : }
293 610450 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
294 610448 : ret.type = GRPC_OP_COMPLETE;
295 610448 : ret.success = c->next & 1u;
296 610448 : ret.tag = c->tag;
297 610448 : c->done(&exec_ctx, c->done_arg, c);
298 610465 : goto done;
299 : }
300 41029 : prev = c;
301 : }
302 754557 : if (cc->shutdown) {
303 0 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
304 0 : memset(&ret, 0, sizeof(ret));
305 0 : ret.type = GRPC_QUEUE_SHUTDOWN;
306 0 : break;
307 : }
308 754557 : if (!add_plucker(cc, tag, &worker)) {
309 0 : gpr_log(GPR_DEBUG,
310 : "Too many outstanding grpc_completion_queue_pluck calls: maximum "
311 : "is %d",
312 : GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
313 0 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
314 0 : memset(&ret, 0, sizeof(ret));
315 : /* TODO(ctiller): should we use a different result here */
316 0 : ret.type = GRPC_QUEUE_TIMEOUT;
317 0 : break;
318 : }
319 754542 : now = gpr_now(GPR_CLOCK_MONOTONIC);
320 754574 : if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
321 101025 : del_plucker(cc, tag, &worker);
322 101018 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
323 101025 : memset(&ret, 0, sizeof(ret));
324 101025 : ret.type = GRPC_QUEUE_TIMEOUT;
325 101025 : break;
326 : }
327 653592 : first_loop = 0;
328 653592 : grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline);
329 653323 : del_plucker(cc, tag, &worker);
330 653355 : }
331 : done:
332 711490 : GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
333 711490 : GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
334 711482 : grpc_exec_ctx_finish(&exec_ctx);
335 711482 : return ret;
336 : }
337 :
338 : /* Shutdown simply drops a ref that we reserved at creation time; if we drop
339 : to zero here, then enter shutdown mode and wake up any waiters */
340 484198 : void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
341 484198 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
342 484198 : GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
343 484198 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
344 484326 : if (cc->shutdown_called) {
345 161770 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
346 646099 : return;
347 : }
348 322556 : cc->shutdown_called = 1;
349 322556 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
350 :
351 322544 : if (gpr_unref(&cc->pending_events)) {
352 313148 : gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
353 313149 : GPR_ASSERT(!cc->shutdown);
354 313149 : cc->shutdown = 1;
355 313149 : gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
356 313148 : grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_destroy_done);
357 : }
358 322551 : grpc_exec_ctx_finish(&exec_ctx);
359 : }
360 :
361 322510 : void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
362 322510 : GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
363 322510 : grpc_completion_queue_shutdown(cc);
364 322531 : GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
365 322551 : }
366 :
367 2708032 : grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
368 2708032 : return &cc->pollset;
369 : }
370 :
371 2305 : void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
372 :
373 1409323 : int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }
|