LCOV - code coverage report
Current view: top level - src/core/surface - completion_queue.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 173 184 94.0 %
Date: 2015-10-10 Functions: 15 15 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/surface/completion_queue.h"
      35             : 
      36             : #include <stdio.h>
      37             : #include <string.h>
      38             : 
      39             : #include "src/core/iomgr/pollset.h"
      40             : #include "src/core/support/string.h"
      41             : #include "src/core/surface/api_trace.h"
      42             : #include "src/core/surface/call.h"
      43             : #include "src/core/surface/event_string.h"
      44             : #include "src/core/surface/surface_trace.h"
      45             : #include <grpc/support/alloc.h>
      46             : #include <grpc/support/atm.h>
      47             : #include <grpc/support/log.h>
      48             : 
      49             : typedef struct {
      50             :   grpc_pollset_worker *worker;
      51             :   void *tag;
      52             : } plucker;
      53             : 
      54             : /* Completion queue structure */
      55             : struct grpc_completion_queue {
      56             :   /** completed events */
      57             :   grpc_cq_completion completed_head;
      58             :   grpc_cq_completion *completed_tail;
      59             :   /** Number of pending events (+1 if we're not shutdown) */
      60             :   gpr_refcount pending_events;
      61             :   /** Once owning_refs drops to zero, we will destroy the cq */
      62             :   gpr_refcount owning_refs;
      63             :   /** the set of low level i/o things that concern this cq */
      64             :   grpc_pollset pollset;
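      65             :   /** 0 until pending_events drops to zero, 1 from then on (distinct from shutdown_called, which is set by the first grpc_completion_queue_shutdown call) */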
      66             :   int shutdown;
      67             :   int shutdown_called;
      68             :   int is_server_cq;
      69             :   int num_pluckers;
      70             :   plucker pluckers[GRPC_MAX_COMPLETION_QUEUE_PLUCKERS];
      71             :   grpc_closure pollset_destroy_done;
      72             : };
      73             : 
      74             : static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *cc,
      75             :                                     int success);
      76             : 
      77      322539 : grpc_completion_queue *grpc_completion_queue_create(void *reserved) {
      78      322539 :   grpc_completion_queue *cc = gpr_malloc(sizeof(grpc_completion_queue));
      79      322537 :   GRPC_API_TRACE("grpc_completion_queue_create(reserved=%p)", 1, (reserved));
      80      322537 :   GPR_ASSERT(!reserved);
      81      322537 :   memset(cc, 0, sizeof(*cc));
      82             :   /* Initial ref is dropped by grpc_completion_queue_shutdown */
      83      322537 :   gpr_ref_init(&cc->pending_events, 1);
      84             :   /* One for destroy(), one for pollset_shutdown */
      85      322522 :   gpr_ref_init(&cc->owning_refs, 2);
      86      322508 :   grpc_pollset_init(&cc->pollset);
      87      322531 :   cc->completed_tail = &cc->completed_head;
      88      322531 :   cc->completed_head.next = (gpr_uintptr)cc->completed_tail;
      89      322531 :   grpc_closure_init(&cc->pollset_destroy_done, on_pollset_destroy_done, cc);
      90      322532 :   return cc;
      91             : }
      92             : 
      93             : #ifdef GRPC_CQ_REF_COUNT_DEBUG
      94             : void grpc_cq_internal_ref(grpc_completion_queue *cc, const char *reason,
      95             :                           const char *file, int line) {
      96             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p   ref %d -> %d %s", cc,
      97             :           (int)cc->owning_refs.count, (int)cc->owning_refs.count + 1, reason);
      98             : #else
      99    12466558 : void grpc_cq_internal_ref(grpc_completion_queue *cc) {
     100             : #endif
     101    12466558 :   gpr_ref(&cc->owning_refs);
     102    12497185 : }
     103             : 
     104      322541 : static void on_pollset_destroy_done(grpc_exec_ctx *exec_ctx, void *arg,
     105             :                                     int success) {
     106      322541 :   grpc_completion_queue *cc = arg;
     107      322541 :   GRPC_CQ_INTERNAL_UNREF(cc, "pollset_destroy");
     108      322559 : }
     109             : 
     110             : #ifdef GRPC_CQ_REF_COUNT_DEBUG
     111             : void grpc_cq_internal_unref(grpc_completion_queue *cc, const char *reason,
     112             :                             const char *file, int line) {
     113             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CQ:%p unref %d -> %d %s", cc,
     114             :           (int)cc->owning_refs.count, (int)cc->owning_refs.count - 1, reason);
     115             : #else
     116    13115041 : void grpc_cq_internal_unref(grpc_completion_queue *cc) {
     117             : #endif
     118    13115041 :   if (gpr_unref(&cc->owning_refs)) {
     119      322554 :     GPR_ASSERT(cc->completed_head.next == (gpr_uintptr)&cc->completed_head);
     120      322554 :     grpc_pollset_destroy(&cc->pollset);
     121      322549 :     gpr_free(cc);
     122             :   }
     123    13151125 : }
     124             : 
     125     7269651 : void grpc_cq_begin_op(grpc_completion_queue *cc) {
     126             : #ifndef NDEBUG
     127     7269651 :   gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
     128     7277050 :   GPR_ASSERT(!cc->shutdown_called);
     129     7277050 :   gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     130             : #endif
     131     7275433 :   gpr_ref(&cc->pending_events);
     132     7272538 : }
     133             : 
     134             : /* Signal the end of an operation - if this is the last waiting-to-be-queued
     135             :    event, then enter shutdown mode */
     136             : /* Queue a GRPC_OP_COMPLETE operation */
     137     7270618 : void grpc_cq_end_op(grpc_exec_ctx *exec_ctx, grpc_completion_queue *cc,
     138             :                     void *tag, int success,
     139             :                     void (*done)(grpc_exec_ctx *exec_ctx, void *done_arg,
     140             :                                  grpc_cq_completion *storage),
     141             :                     void *done_arg, grpc_cq_completion *storage) {
     142             :   int shutdown;
     143             :   int i;
     144             :   grpc_pollset_worker *pluck_worker;
     145             : 
     146     7270618 :   storage->tag = tag;
     147     7270618 :   storage->done = done;
     148     7270618 :   storage->done_arg = done_arg;
     149     7270618 :   storage->next =
     150     7270618 :       ((gpr_uintptr)&cc->completed_head) | ((gpr_uintptr)(success != 0));
     151             : 
     152     7270618 :   gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
     153     7275898 :   shutdown = gpr_unref(&cc->pending_events);
     154     7278961 :   if (!shutdown) {
     155    14539102 :     cc->completed_tail->next =
     156     7269551 :         ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
     157     7269551 :     cc->completed_tail = storage;
     158     7269551 :     pluck_worker = NULL;
     159     7305306 :     for (i = 0; i < cc->num_pluckers; i++) {
     160      350076 :       if (cc->pluckers[i].tag == tag) {
     161      314321 :         pluck_worker = cc->pluckers[i].worker;
     162      314321 :         break;
     163             :       }
     164             :     }
     165     7269551 :     grpc_pollset_kick(&cc->pollset, pluck_worker);
     166     7261292 :     gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     167             :   } else {
     168       18820 :     cc->completed_tail->next =
     169        9410 :         ((gpr_uintptr)storage) | (1u & (gpr_uintptr)cc->completed_tail->next);
     170        9410 :     cc->completed_tail = storage;
     171        9410 :     GPR_ASSERT(!cc->shutdown);
     172        9410 :     GPR_ASSERT(cc->shutdown_called);
     173        9410 :     cc->shutdown = 1;
     174        9410 :     gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     175        9410 :     grpc_pollset_shutdown(exec_ctx, &cc->pollset, &cc->pollset_destroy_done);
     176             :   }
     177     7276387 : }
     178             : 
     179     9058476 : grpc_event grpc_completion_queue_next(grpc_completion_queue *cc,
     180             :                                       gpr_timespec deadline, void *reserved) {
     181             :   grpc_event ret;
     182             :   grpc_pollset_worker worker;
     183     9058476 :   int first_loop = 1;
     184             :   gpr_timespec now;
     185     9058476 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     186             : 
     187     9058476 :   GRPC_API_TRACE(
     188             :       "grpc_completion_queue_next("
     189             :       "cc=%p, "
     190             :       "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, "
     191             :       "reserved=%p)",
     192             :       5, (cc, (long)deadline.tv_sec, deadline.tv_nsec, (int)deadline.clock_type,
     193             :           reserved));
     194     9078048 :   GPR_ASSERT(!reserved);
     195             : 
     196     9078048 :   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
     197             : 
     198     9077229 :   GRPC_CQ_INTERNAL_REF(cc, "next");
     199     9090328 :   gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
     200             :   for (;;) {
     201    13660327 :     if (cc->completed_tail != &cc->completed_head) {
     202     6666769 :       grpc_cq_completion *c = (grpc_cq_completion *)cc->completed_head.next;
     203     6666769 :       cc->completed_head.next = c->next & ~(gpr_uintptr)1;
     204     6666769 :       if (c == cc->completed_tail) {
     205     1246621 :         cc->completed_tail = &cc->completed_head;
     206             :       }
     207     6666769 :       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     208     6658244 :       ret.type = GRPC_OP_COMPLETE;
     209     6658244 :       ret.success = c->next & 1u;
     210     6658244 :       ret.tag = c->tag;
     211     6658244 :       c->done(&exec_ctx, c->done_arg, c);
     212     6661471 :       break;
     213             :     }
     214     6993558 :     if (cc->shutdown) {
     215      162060 :       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     216      162059 :       memset(&ret, 0, sizeof(ret));
     217      162059 :       ret.type = GRPC_QUEUE_SHUTDOWN;
     218      162059 :       break;
     219             :     }
     220     6831498 :     now = gpr_now(GPR_CLOCK_MONOTONIC);
     221     6858523 :     if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
     222     2267347 :       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     223     2256442 :       memset(&ret, 0, sizeof(ret));
     224     2256442 :       ret.type = GRPC_QUEUE_TIMEOUT;
     225     2256442 :       break;
     226             :     }
     227     4591074 :     first_loop = 0;
     228     4591074 :     grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline);
     229     4594402 :   }
     230     9079972 :   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
     231     9079972 :   GRPC_CQ_INTERNAL_UNREF(cc, "next");
     232     9089060 :   grpc_exec_ctx_finish(&exec_ctx);
     233     9087172 :   return ret;
     234             : }
     235             : 
     236      753485 : static int add_plucker(grpc_completion_queue *cc, void *tag,
     237             :                        grpc_pollset_worker *worker) {
     238      753485 :   if (cc->num_pluckers == GRPC_MAX_COMPLETION_QUEUE_PLUCKERS) {
     239           0 :     return 0;
     240             :   }
     241      753485 :   cc->pluckers[cc->num_pluckers].tag = tag;
     242      753485 :   cc->pluckers[cc->num_pluckers].worker = worker;
     243      753485 :   cc->num_pluckers++;
     244      753485 :   return 1;
     245             : }
     246             : 
     247      753907 : static void del_plucker(grpc_completion_queue *cc, void *tag,
     248             :                         grpc_pollset_worker *worker) {
     249             :   int i;
     250      767544 :   for (i = 0; i < cc->num_pluckers; i++) {
     251      767544 :     if (cc->pluckers[i].tag == tag && cc->pluckers[i].worker == worker) {
     252      753907 :       cc->num_pluckers--;
     253      753907 :       GPR_SWAP(plucker, cc->pluckers[i], cc->pluckers[cc->num_pluckers]);
     254     1507814 :       return;
     255             :     }
     256             :   }
     257           0 :   GPR_UNREACHABLE_CODE(return );
     258             : }
     259             : 
     260      711180 : grpc_event grpc_completion_queue_pluck(grpc_completion_queue *cc, void *tag,
     261             :                                        gpr_timespec deadline, void *reserved) {
     262             :   grpc_event ret;
     263             :   grpc_cq_completion *c;
     264             :   grpc_cq_completion *prev;
     265             :   grpc_pollset_worker worker;
     266             :   gpr_timespec now;
     267      711180 :   int first_loop = 1;
     268      711180 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     269             : 
     270      711180 :   GRPC_API_TRACE(
     271             :       "grpc_completion_queue_pluck("
     272             :       "cc=%p, tag=%p, "
     273             :       "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, "
     274             :       "reserved=%p)",
     275             :       6, (cc, tag, (long)deadline.tv_sec, deadline.tv_nsec,
     276             :           (int)deadline.clock_type, reserved));
     277      711460 :   GPR_ASSERT(!reserved);
     278             : 
     279      711460 :   deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
     280             : 
     281      711424 :   GRPC_CQ_INTERNAL_REF(cc, "pluck");
     282      711455 :   gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
     283             :   for (;;) {
     284     1365007 :     prev = &cc->completed_head;
     285     4177079 :     while ((c = (grpc_cq_completion *)(prev->next & ~(gpr_uintptr)1)) !=
     286     1406036 :            &cc->completed_head) {
     287      651479 :       if (c->tag == tag) {
     288      610450 :         prev->next =
     289      610450 :             (prev->next & (gpr_uintptr)1) | (c->next & ~(gpr_uintptr)1);
     290      610450 :         if (c == cc->completed_tail) {
     291      457215 :           cc->completed_tail = prev;
     292             :         }
     293      610450 :         gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     294      610448 :         ret.type = GRPC_OP_COMPLETE;
     295      610448 :         ret.success = c->next & 1u;
     296      610448 :         ret.tag = c->tag;
     297      610448 :         c->done(&exec_ctx, c->done_arg, c);
     298      610465 :         goto done;
     299             :       }
     300       41029 :       prev = c;
     301             :     }
     302      754557 :     if (cc->shutdown) {
     303           0 :       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     304           0 :       memset(&ret, 0, sizeof(ret));
     305           0 :       ret.type = GRPC_QUEUE_SHUTDOWN;
     306           0 :       break;
     307             :     }
     308      754557 :     if (!add_plucker(cc, tag, &worker)) {
     309           0 :       gpr_log(GPR_DEBUG,
     310             :               "Too many outstanding grpc_completion_queue_pluck calls: maximum "
     311             :               "is %d",
     312             :               GRPC_MAX_COMPLETION_QUEUE_PLUCKERS);
     313           0 :       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     314           0 :       memset(&ret, 0, sizeof(ret));
     315             :       /* TODO(ctiller): should we use a different result here? */
     316           0 :       ret.type = GRPC_QUEUE_TIMEOUT;
     317           0 :       break;
     318             :     }
     319      754542 :     now = gpr_now(GPR_CLOCK_MONOTONIC);
     320      754574 :     if (!first_loop && gpr_time_cmp(now, deadline) >= 0) {
     321      101025 :       del_plucker(cc, tag, &worker);
     322      101018 :       gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     323      101025 :       memset(&ret, 0, sizeof(ret));
     324      101025 :       ret.type = GRPC_QUEUE_TIMEOUT;
     325      101025 :       break;
     326             :     }
     327      653592 :     first_loop = 0;
     328      653592 :     grpc_pollset_work(&exec_ctx, &cc->pollset, &worker, now, deadline);
     329      653323 :     del_plucker(cc, tag, &worker);
     330      653355 :   }
     331             : done:
     332      711490 :   GRPC_SURFACE_TRACE_RETURNED_EVENT(cc, &ret);
     333      711490 :   GRPC_CQ_INTERNAL_UNREF(cc, "pluck");
     334      711482 :   grpc_exec_ctx_finish(&exec_ctx);
     335      711482 :   return ret;
     336             : }
     337             : 
     338             : /* Shutdown simply drops a ref that we reserved at creation time; if we drop
     339             :    to zero here, then enter shutdown mode and wake up any waiters */
     340      484198 : void grpc_completion_queue_shutdown(grpc_completion_queue *cc) {
     341      484198 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     342      484198 :   GRPC_API_TRACE("grpc_completion_queue_shutdown(cc=%p)", 1, (cc));
     343      484198 :   gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
     344      484326 :   if (cc->shutdown_called) {
     345      161770 :     gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     346      646099 :     return;
     347             :   }
     348      322556 :   cc->shutdown_called = 1;
     349      322556 :   gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     350             : 
     351      322544 :   if (gpr_unref(&cc->pending_events)) {
     352      313148 :     gpr_mu_lock(GRPC_POLLSET_MU(&cc->pollset));
     353      313149 :     GPR_ASSERT(!cc->shutdown);
     354      313149 :     cc->shutdown = 1;
     355      313149 :     gpr_mu_unlock(GRPC_POLLSET_MU(&cc->pollset));
     356      313148 :     grpc_pollset_shutdown(&exec_ctx, &cc->pollset, &cc->pollset_destroy_done);
     357             :   }
     358      322551 :   grpc_exec_ctx_finish(&exec_ctx);
     359             : }
     360             : 
     361      322510 : void grpc_completion_queue_destroy(grpc_completion_queue *cc) {
     362      322510 :   GRPC_API_TRACE("grpc_completion_queue_destroy(cc=%p)", 1, (cc));
     363      322510 :   grpc_completion_queue_shutdown(cc);
     364      322531 :   GRPC_CQ_INTERNAL_UNREF(cc, "destroy");
     365      322551 : }
     366             : 
     367     2708032 : grpc_pollset *grpc_cq_pollset(grpc_completion_queue *cc) {
     368     2708032 :   return &cc->pollset;
     369             : }
     370             : 
     371        2305 : void grpc_cq_mark_server_cq(grpc_completion_queue *cc) { cc->is_server_cq = 1; }
     372             : 
     373     1409323 : int grpc_cq_is_server_cq(grpc_completion_queue *cc) { return cc->is_server_cq; }

Generated by: LCOV version 1.10