LCOV - code coverage report
Current view: top level - core/channel - subchannel_call_holder.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 127 132 96.2 %
Date: 2015-12-10 22:15:08 Functions: 11 11 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/channel/subchannel_call_holder.h"
      35             : 
      36             : #include <grpc/support/alloc.h>
      37             : 
      38             : #include "src/core/profiling/timers.h"
      39             : 
      40             : #define GET_CALL(holder) \
      41             :   ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call)))
      42             : 
      43             : #define CANCELLED_CALL ((grpc_subchannel_call *)1)
      44             : 
      45             : static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
      46             :                              int success);
      47             : static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success);
      48             : static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
      49             :                       int success);
      50             : 
      51             : static void add_waiting_locked(grpc_subchannel_call_holder *holder,
      52             :                                grpc_transport_stream_op *op);
      53             : static void fail_locked(grpc_exec_ctx *exec_ctx,
      54             :                         grpc_subchannel_call_holder *holder);
      55             : static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
      56             :                                  grpc_subchannel_call_holder *holder);
      57             : 
      58     2166623 : void grpc_subchannel_call_holder_init(
      59             :     grpc_subchannel_call_holder *holder,
      60             :     grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
      61             :     void *pick_subchannel_arg) {
      62     2166623 :   gpr_atm_rel_store(&holder->subchannel_call, 0);
      63     2166623 :   holder->pick_subchannel = pick_subchannel;
      64     2166623 :   holder->pick_subchannel_arg = pick_subchannel_arg;
      65     2166623 :   gpr_mu_init(&holder->mu);
      66     2167689 :   holder->subchannel = NULL;
      67     2167689 :   holder->waiting_ops = NULL;
      68     2167689 :   holder->waiting_ops_count = 0;
      69     2167689 :   holder->waiting_ops_capacity = 0;
      70     2167689 :   holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
      71     2167689 : }
      72             : 
      73     2167547 : void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
      74             :                                          grpc_subchannel_call_holder *holder) {
      75     2167547 :   grpc_subchannel_call *call = GET_CALL(holder);
      76     2167547 :   if (call != NULL && call != CANCELLED_CALL) {
      77     2167052 :     GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder");
      78             :   }
      79     2167724 :   GPR_ASSERT(holder->creation_phase ==
      80             :              GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
      81     2167724 :   gpr_mu_destroy(&holder->mu);
      82     2167719 :   GPR_ASSERT(holder->waiting_ops_count == 0);
      83     2167719 :   gpr_free(holder->waiting_ops);
      84     2167720 : }
      85             : 
      86     4173230 : void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
      87             :                                             grpc_subchannel_call_holder *holder,
      88             :                                             grpc_transport_stream_op *op) {
      89             :   /* try to (atomically) get the call */
      90     4173230 :   grpc_subchannel_call *call = GET_CALL(holder);
      91             :   GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
      92     4173230 :   if (call == CANCELLED_CALL) {
      93         156 :     grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
      94             :     GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
      95         156 :     return;
      96             :   }
      97     4173074 :   if (call != NULL) {
      98     1997030 :     grpc_subchannel_call_process_op(exec_ctx, call, op);
      99             :     GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
     100     1997311 :     return;
     101             :   }
     102             :   /* we failed; lock and figure out what to do */
     103     2176044 :   gpr_mu_lock(&holder->mu);
     104             : retry:
     105             :   /* need to recheck that another thread hasn't set the call */
     106     4333500 :   call = GET_CALL(holder);
     107     4333500 :   if (call == CANCELLED_CALL) {
     108           0 :     gpr_mu_unlock(&holder->mu);
     109           0 :     grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
     110             :     GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
     111           0 :     return;
     112             :   }
     113     4333500 :   if (call != NULL) {
     114     2156930 :     gpr_mu_unlock(&holder->mu);
     115     2156950 :     grpc_subchannel_call_process_op(exec_ctx, call, op);
     116             :     GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
     117     2156944 :     return;
     118             :   }
     119             :   /* if this is a cancellation, then we can raise our cancelled flag */
     120     2176570 :   if (op->cancel_with_status != GRPC_STATUS_OK) {
     121         546 :     if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
     122           0 :       goto retry;
     123             :     } else {
     124         546 :       switch (holder->creation_phase) {
     125             :         case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
     126         359 :           fail_locked(exec_ctx, holder);
     127         359 :           break;
     128             :         case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL:
     129          22 :           grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel,
     130             :                                              &holder->subchannel_call);
     131          22 :           break;
     132             :         case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
     133         165 :           holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
     134             :                                   &holder->subchannel, NULL);
     135         165 :           break;
     136             :       }
     137         546 :       gpr_mu_unlock(&holder->mu);
     138         546 :       grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
     139             :       GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
     140         546 :       return;
     141             :     }
     142             :   }
     143             :   /* if we don't have a subchannel, try to get one */
     144     4343843 :   if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
     145     4335236 :       holder->subchannel == NULL && op->send_initial_metadata != NULL) {
     146     2167517 :     holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
     147     2167517 :     grpc_closure_init(&holder->next_step, subchannel_ready, holder);
     148     2167520 :     if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg,
     149             :                                 op->send_initial_metadata, &holder->subchannel,
     150             :                                 &holder->next_step)) {
     151     2157279 :       holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
     152             :     }
     153             :   }
     154             :   /* if we've got a subchannel, then let's ask it to create a call */
     155     4333634 :   if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
     156     2157629 :       holder->subchannel != NULL) {
     157     2157265 :     holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL;
     158     2157265 :     grpc_closure_init(&holder->next_step, call_ready, holder);
     159     2157263 :     if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
     160             :                                     holder->pollset, &holder->subchannel_call,
     161             :                                     &holder->next_step)) {
     162             :       /* got one immediately - continue the op (and any waiting ops) */
     163     2156955 :       holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
     164     2156955 :       retry_waiting_locked(exec_ctx, holder);
     165     2156932 :       goto retry;
     166             :     }
     167             :   }
     168             :   /* nothing to be done but wait */
     169       19080 :   add_waiting_locked(holder, op);
     170       19361 :   gpr_mu_unlock(&holder->mu);
     171             :   GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
     172             : }
     173             : 
     174       10214 : static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
     175       10172 :   grpc_subchannel_call_holder *holder = arg;
     176             :   grpc_subchannel_call *call;
     177       10214 :   gpr_mu_lock(&holder->mu);
     178       10219 :   GPR_ASSERT(holder->creation_phase ==
     179             :              GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
     180       10219 :   call = GET_CALL(holder);
     181       10219 :   GPR_ASSERT(call == NULL || call == CANCELLED_CALL);
     182       10219 :   if (holder->subchannel == NULL) {
     183         164 :     holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
     184         164 :     fail_locked(exec_ctx, holder);
     185             :   } else {
     186       10055 :     grpc_closure_init(&holder->next_step, call_ready, holder);
     187       10052 :     if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
     188             :                                     holder->pollset, &holder->subchannel_call,
     189             :                                     &holder->next_step)) {
     190       10059 :       holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
     191             :       /* got one immediately - continue the op (and any waiting ops) */
     192       10059 :       retry_waiting_locked(exec_ctx, holder);
     193             :     }
     194             :   }
     195       10200 :   gpr_mu_unlock(&holder->mu);
     196       10218 : }
     197             : 
     198         348 : static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
     199         348 :   grpc_subchannel_call_holder *holder = arg;
     200             :   GPR_TIMER_BEGIN("call_ready", 0);
     201         348 :   gpr_mu_lock(&holder->mu);
     202         348 :   GPR_ASSERT(holder->creation_phase ==
     203             :              GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL);
     204         348 :   holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
     205         348 :   if (GET_CALL(holder) != NULL) {
     206         348 :     retry_waiting_locked(exec_ctx, holder);
     207             :   } else {
     208           0 :     fail_locked(exec_ctx, holder);
     209             :   }
     210         348 :   gpr_mu_unlock(&holder->mu);
     211             :   GPR_TIMER_END("call_ready", 0);
     212         348 : }
     213             : 
     214             : typedef struct {
     215             :   grpc_transport_stream_op *ops;
     216             :   size_t nops;
     217             :   grpc_subchannel_call *call;
     218             : } retry_ops_args;
     219             : 
     220     2167187 : static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
     221             :                                  grpc_subchannel_call_holder *holder) {
     222     2167187 :   retry_ops_args *a = gpr_malloc(sizeof(*a));
     223     2167307 :   a->ops = holder->waiting_ops;
     224     2167307 :   a->nops = holder->waiting_ops_count;
     225     2167307 :   a->call = GET_CALL(holder);
     226     2167307 :   if (a->call == CANCELLED_CALL) {
     227          23 :     gpr_free(a);
     228          23 :     fail_locked(exec_ctx, holder);
     229     2167290 :     return;
     230             :   }
     231     2167284 :   holder->waiting_ops = NULL;
     232     2167284 :   holder->waiting_ops_count = 0;
     233     2167284 :   holder->waiting_ops_capacity = 0;
     234     2167284 :   GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
     235     2167304 :   grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1);
     236             : }
     237             : 
     238     2167243 : static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) {
     239     2167154 :   retry_ops_args *a = args;
     240             :   size_t i;
     241     2186315 :   for (i = 0; i < a->nops; i++) {
     242       19016 :     grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
     243             :   }
     244     2167299 :   GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
     245     2167336 :   gpr_free(a->ops);
     246     2167212 :   gpr_free(a);
     247     2167326 : }
     248             : 
     249       19361 : static void add_waiting_locked(grpc_subchannel_call_holder *holder,
     250             :                                grpc_transport_stream_op *op) {
     251             :   GPR_TIMER_BEGIN("add_waiting_locked", 0);
     252       19361 :   if (holder->waiting_ops_count == holder->waiting_ops_capacity) {
     253       10924 :     holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity);
     254       10924 :     holder->waiting_ops =
     255       10924 :         gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity *
     256             :                                              sizeof(*holder->waiting_ops));
     257             :   }
     258       19361 :   holder->waiting_ops[holder->waiting_ops_count++] = *op;
     259             :   GPR_TIMER_END("add_waiting_locked", 0);
     260       19361 : }
     261             : 
     262         546 : static void fail_locked(grpc_exec_ctx *exec_ctx,
     263             :                         grpc_subchannel_call_holder *holder) {
     264             :   size_t i;
     265         795 :   for (i = 0; i < holder->waiting_ops_count; i++) {
     266         271 :     grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0);
     267         271 :     grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
     268             :                           0);
     269             :   }
     270         546 :   holder->waiting_ops_count = 0;
     271         546 : }
     272             : 
     273         816 : char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
     274             :                                            grpc_subchannel_call_holder *holder,
     275             :                                            grpc_channel *master) {
     276         816 :   grpc_subchannel_call *subchannel_call = GET_CALL(holder);
     277             : 
     278         816 :   if (subchannel_call) {
     279         434 :     return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
     280             :   } else {
     281         382 :     return grpc_channel_get_target(master);
     282             :   }
     283             : }

Generated by: LCOV version 1.11