LCOV - code coverage report
Current view: top level - core/client_config - subchannel.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 376 405 92.8 %
Date: 2015-12-10 22:15:08 Functions: 38 39 97.4 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/client_config/subchannel.h"
      35             : 
      36             : #include <string.h>
      37             : 
      38             : #include <grpc/support/alloc.h>
      39             : 
      40             : #include "src/core/channel/channel_args.h"
      41             : #include "src/core/channel/client_channel.h"
      42             : #include "src/core/channel/connected_channel.h"
      43             : #include "src/core/client_config/initial_connect_string.h"
      44             : #include "src/core/iomgr/timer.h"
      45             : #include "src/core/profiling/timers.h"
      46             : #include "src/core/surface/channel.h"
      47             : #include "src/core/transport/connectivity_state.h"
      48             : #include "src/core/transport/connectivity_state.h"
      49             : 
      50             : #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
      51             : #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
      52             : #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
      53             : #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
      54             : #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
      55             : 
      56             : typedef struct {
      57             :   /* all fields protected by subchannel->mu */
      58             :   /** refcount */
      59             :   int refs;
      60             :   /** parent subchannel */
      61             :   grpc_subchannel *subchannel;
      62             : } connection;
      63             : 
      64             : typedef struct {
      65             :   grpc_closure closure;
      66             :   size_t version;
      67             :   grpc_subchannel *subchannel;
      68             :   grpc_connectivity_state connectivity_state;
      69             : } state_watcher;
      70             : 
      71             : typedef struct waiting_for_connect {
      72             :   struct waiting_for_connect *next;
      73             :   grpc_closure *notify;
      74             :   grpc_pollset *pollset;
      75             :   gpr_atm *target;
      76             :   grpc_subchannel *subchannel;
      77             :   grpc_closure continuation;
      78             : } waiting_for_connect;
      79             : 
      80             : struct grpc_subchannel {
      81             :   grpc_connector *connector;
      82             : 
      83             :   /** non-transport related channel filters */
      84             :   const grpc_channel_filter **filters;
      85             :   size_t num_filters;
      86             :   /** channel arguments */
      87             :   grpc_channel_args *args;
      88             :   /** address to connect to */
      89             :   struct sockaddr *addr;
      90             :   size_t addr_len;
      91             :   /** initial string to send to peer */
      92             :   gpr_slice initial_connect_string;
      93             :   /** master channel - the grpc_channel instance that ultimately owns
      94             :       this channel_data via its channel stack.
      95             :       We occasionally use this to bump the refcount on the master channel
      96             :       to keep ourselves alive through an asynchronous operation. */
      97             :   grpc_channel *master;
      98             :   /** have we seen a disconnection? */
      99             :   int disconnected;
     100             : 
     101             :   /** set during connection */
     102             :   grpc_connect_out_args connecting_result;
     103             : 
     104             :   /** callback for connection finishing */
     105             :   grpc_closure connected;
     106             : 
     107             :   /** pollset_set tracking who's interested in a connection
     108             :       being setup - owned by the master channel (in particular the
     109             :      client_channel
     110             :       filter there-in) */
     111             :   grpc_pollset_set *pollset_set;
     112             : 
     113             :   /** mutex protecting remaining elements */
     114             :   gpr_mu mu;
     115             : 
     116             :   /** active connection */
     117             :   connection *active;
     118             :   /** version number for the active connection */
     119             :   size_t active_version;
     120             :   /** refcount */
     121             :   int refs;
     122             :   /** are we connecting */
     123             :   int connecting;
     124             :   /** things waiting for a connection */
     125             :   waiting_for_connect *waiting;
     126             :   /** connectivity state tracking */
     127             :   grpc_connectivity_state_tracker state_tracker;
     128             : 
     129             :   /** next connect attempt time */
     130             :   gpr_timespec next_attempt;
     131             :   /** amount to backoff each failure */
     132             :   gpr_timespec backoff_delta;
     133             :   /** do we have an active alarm? */
     134             :   int have_alarm;
     135             :   /** our alarm */
     136             :   grpc_timer alarm;
     137             :   /** current random value */
     138             :   gpr_uint32 random;
     139             : };
     140             : 
     141             : struct grpc_subchannel_call {
     142             :   connection *connection;
     143             : };
     144             : 
     145             : #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
     146             : #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
     147             : #define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
     148             :   (((grpc_subchannel_call *)(callstack)) - 1)
     149             : 
     150             : static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
     151             :                                          connection *con,
     152             :                                          grpc_pollset *pollset);
     153             : static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
     154             :                                               grpc_subchannel *c,
     155             :                                               const char *reason);
     156             : static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
     157             : static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
     158             : static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
     159             :                                  int iomgr_success);
     160             : 
     161             : static void subchannel_ref_locked(grpc_subchannel *c
     162             :                                       GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
     163             : static int subchannel_unref_locked(
     164             :     grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
     165             : static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
     166             : static grpc_subchannel *connection_unref_locked(
     167             :     grpc_exec_ctx *exec_ctx,
     168             :     connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
     169             : static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
     170             : 
     171             : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
     172             : #define SUBCHANNEL_REF_LOCKED(p, r) \
     173             :   subchannel_ref_locked((p), __FILE__, __LINE__, (r))
     174             : #define SUBCHANNEL_UNREF_LOCKED(p, r) \
     175             :   subchannel_unref_locked((p), __FILE__, __LINE__, (r))
     176             : #define CONNECTION_REF_LOCKED(p, r) \
     177             :   connection_ref_locked((p), __FILE__, __LINE__, (r))
     178             : #define CONNECTION_UNREF_LOCKED(cl, p, r) \
     179             :   connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
     180             : #define REF_PASS_ARGS , file, line, reason
     181             : #define REF_PASS_REASON , reason
     182             : #define REF_LOG(name, p)                                                  \
     183             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p   ref %d -> %d %s", \
     184             :           (name), (p), (p)->refs, (p)->refs + 1, reason)
     185             : #define UNREF_LOG(name, p)                                                \
     186             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
     187             :           (name), (p), (p)->refs, (p)->refs - 1, reason)
     188             : #else
     189             : #define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
     190             : #define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
     191             : #define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
     192             : #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
     193             : #define REF_PASS_ARGS
     194             : #define REF_PASS_REASON
     195             : #define REF_LOG(name, p) \
     196             :   do {                   \
     197             :   } while (0)
     198             : #define UNREF_LOG(name, p) \
     199             :   do {                     \
     200             :   } while (0)
     201             : #endif
     202             : 
     203             : /*
     204             :  * connection implementation
     205             :  */
     206             : 
     207        2303 : static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) {
     208        2303 :   GPR_ASSERT(c->refs == 0);
     209        2303 :   grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
     210        2304 :   gpr_free(c);
     211        2304 : }
     212             : 
     213     2168123 : static void connection_ref_locked(connection *c
     214             :                                       GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     215             :   REF_LOG("CONNECTION", c);
     216     2168213 :   subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
     217     2169204 :   ++c->refs;
     218     2169114 : }
     219             : 
     220     2169130 : static grpc_subchannel *connection_unref_locked(
     221             :     grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     222     2169111 :   grpc_subchannel *destroy = NULL;
     223             :   UNREF_LOG("CONNECTION", c);
     224     2169149 :   if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
     225         157 :     destroy = c->subchannel;
     226             :   }
     227     2169135 :   if (--c->refs == 0 && c->subchannel->active != c) {
     228         419 :     connection_destroy(exec_ctx, c);
     229             :   }
     230     2169135 :   return destroy;
     231             : }
     232             : 
     233             : /*
     234             :  * grpc_subchannel implementation
     235             :  */
     236             : 
     237     2178676 : static void subchannel_ref_locked(grpc_subchannel *c
     238             :                                       GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     239             :   REF_LOG("SUBCHANNEL", c);
     240     2178941 :   ++c->refs;
     241     2178676 : }
     242             : 
     243     2182496 : static int subchannel_unref_locked(grpc_subchannel *c
     244             :                                        GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     245             :   UNREF_LOG("SUBCHANNEL", c);
     246     2182696 :   return --c->refs == 0;
     247             : }
     248             : 
     249        4340 : void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     250        4340 :   gpr_mu_lock(&c->mu);
     251        4291 :   subchannel_ref_locked(c REF_PASS_ARGS);
     252        4340 :   gpr_mu_unlock(&c->mu);
     253        4340 : }
     254             : 
     255        8869 : void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
     256             :                            grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     257             :   int destroy;
     258        8869 :   gpr_mu_lock(&c->mu);
     259        8769 :   destroy = subchannel_unref_locked(c REF_PASS_ARGS);
     260        8869 :   gpr_mu_unlock(&c->mu);
     261        8869 :   if (destroy) subchannel_destroy(exec_ctx, c);
     262        8869 : }
     263             : 
     264        3834 : static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
     265        3834 :   if (c->active != NULL) {
     266           0 :     connection_destroy(exec_ctx, c->active);
     267             :   }
     268        3834 :   gpr_free((void *)c->filters);
     269        3834 :   grpc_channel_args_destroy(c->args);
     270        3834 :   gpr_free(c->addr);
     271        3834 :   gpr_slice_unref(c->initial_connect_string);
     272        3834 :   grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
     273        3834 :   grpc_connector_unref(exec_ctx, c->connector);
     274        3834 :   gpr_free(c);
     275        3834 : }
     276             : 
     277       10614 : void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
     278             :                                           grpc_subchannel *c,
     279             :                                           grpc_pollset *pollset) {
     280       10614 :   grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset);
     281       10614 : }
     282             : 
     283       10029 : void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
     284             :                                           grpc_subchannel *c,
     285             :                                           grpc_pollset *pollset) {
     286       10029 :   grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset);
     287       10488 : }
     288             : 
     289        3889 : static gpr_uint32 random_seed() {
     290        3889 :   return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
     291             : }
     292             : 
     293        3889 : grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
     294             :                                         grpc_subchannel_args *args) {
     295        3889 :   grpc_subchannel *c = gpr_malloc(sizeof(*c));
     296        3889 :   grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
     297             :       grpc_channel_get_channel_stack(args->master));
     298        3889 :   memset(c, 0, sizeof(*c));
     299        3889 :   c->refs = 1;
     300        3889 :   c->connector = connector;
     301        3889 :   grpc_connector_ref(c->connector);
     302        3889 :   c->num_filters = args->filter_count;
     303        3889 :   c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
     304        3889 :   memcpy((void *)c->filters, args->filters,
     305        3889 :          sizeof(grpc_channel_filter *) * c->num_filters);
     306        3889 :   c->addr = gpr_malloc(args->addr_len);
     307        3889 :   memcpy(c->addr, args->addr, args->addr_len);
     308        3889 :   c->addr_len = args->addr_len;
     309        3889 :   grpc_set_initial_connect_string(&c->addr, &c->addr_len,
     310             :                                   &c->initial_connect_string);
     311        3889 :   c->args = grpc_channel_args_copy(args->args);
     312        3889 :   c->master = args->master;
     313        3889 :   c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
     314        3889 :   c->random = random_seed();
     315        3889 :   grpc_closure_init(&c->connected, subchannel_connected, c);
     316        3889 :   grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
     317             :                                "subchannel");
     318        3889 :   gpr_mu_init(&c->mu);
     319        3889 :   return c;
     320             : }
     321             : 
     322         331 : static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx,
     323             :                                  grpc_subchannel *subchannel,
     324             :                                  int iomgr_success) {
     325             :   waiting_for_connect *w4c;
     326         331 :   gpr_mu_lock(&subchannel->mu);
     327         331 :   w4c = subchannel->waiting;
     328         331 :   subchannel->waiting = NULL;
     329         331 :   gpr_mu_unlock(&subchannel->mu);
     330         662 :   while (w4c != NULL) {
     331           0 :     waiting_for_connect *next = w4c->next;
     332           0 :     grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
     333             :                                          w4c->pollset);
     334           0 :     if (w4c->notify) {
     335           0 :       w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
     336             :     }
     337             : 
     338           0 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
     339           0 :     gpr_free(w4c);
     340             : 
     341           0 :     w4c = next;
     342             :   }
     343         331 : }
     344             : 
     345          22 : void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
     346             :                                         grpc_subchannel *subchannel,
     347             :                                         gpr_atm *target) {
     348             :   waiting_for_connect *w4c;
     349          22 :   int unref_count = 0;
     350          22 :   gpr_mu_lock(&subchannel->mu);
     351          22 :   w4c = subchannel->waiting;
     352          22 :   subchannel->waiting = NULL;
     353          66 :   while (w4c != NULL) {
     354          22 :     waiting_for_connect *next = w4c->next;
     355          22 :     if (w4c->target == target) {
     356          22 :       grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
     357             :                                            w4c->pollset);
     358          22 :       grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0);
     359             : 
     360          22 :       unref_count++;
     361          22 :       gpr_free(w4c);
     362             :     } else {
     363           0 :       w4c->next = subchannel->waiting;
     364           0 :       subchannel->waiting = w4c;
     365             :     }
     366             : 
     367          22 :     w4c = next;
     368             :   }
     369          22 :   gpr_mu_unlock(&subchannel->mu);
     370             : 
     371          66 :   while (unref_count-- > 0) {
     372          22 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect");
     373             :   }
     374          22 : }
     375             : 
     376        3114 : static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
     377             :   grpc_connect_in_args args;
     378             : 
     379        3114 :   args.interested_parties = c->pollset_set;
     380        3114 :   args.addr = c->addr;
     381        3114 :   args.addr_len = c->addr_len;
     382        3114 :   args.deadline = compute_connect_deadline(c);
     383        3114 :   args.channel_args = c->args;
     384        3114 :   args.initial_connect_string = c->initial_connect_string;
     385             : 
     386        3114 :   grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
     387             :                          &c->connected);
     388        3114 : }
     389             : 
     390        2720 : static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
     391        2720 :   c->backoff_delta = gpr_time_from_seconds(
     392             :       GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
     393        2720 :   c->next_attempt =
     394        2720 :       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
     395        2720 :   continue_connect(exec_ctx, c);
     396        2720 : }
     397             : 
     398         326 : static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
     399             :                                    int iomgr_success) {
     400             :   int call_creation_finished_ok;
     401         326 :   waiting_for_connect *w4c = arg;
     402         326 :   grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
     403         326 :   call_creation_finished_ok = grpc_subchannel_create_call(
     404             :       exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
     405         326 :   GPR_ASSERT(call_creation_finished_ok == 1);
     406         326 :   w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
     407         326 :   GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
     408         326 :   gpr_free(w4c);
     409         326 : }
     410             : 
     411     2167184 : int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
     412             :                                 grpc_pollset *pollset, gpr_atm *target,
     413             :                                 grpc_closure *notify) {
     414             :   connection *con;
     415             :   grpc_subchannel_call *call;
     416             :   GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0);
     417     2167184 :   gpr_mu_lock(&c->mu);
     418     2167681 :   if (c->active != NULL) {
     419     2167243 :     con = c->active;
     420     2167243 :     CONNECTION_REF_LOCKED(con, "call");
     421     2167302 :     gpr_mu_unlock(&c->mu);
     422             : 
     423     2167282 :     call = create_call(exec_ctx, con, pollset);
     424     2167328 :     if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) {
     425           1 :       GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set");
     426             :     }
     427             :     GPR_TIMER_END("grpc_subchannel_create_call", 0);
     428     2166889 :     return 1;
     429             :   } else {
     430         348 :     waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
     431         348 :     w4c->next = c->waiting;
     432         348 :     w4c->notify = notify;
     433         348 :     w4c->pollset = pollset;
     434         348 :     w4c->target = target;
     435         348 :     w4c->subchannel = c;
     436             :     /* released when clearing w4c */
     437         348 :     SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
     438         348 :     grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
     439         348 :     c->waiting = w4c;
     440         348 :     grpc_subchannel_add_interested_party(exec_ctx, c, pollset);
     441         348 :     if (!c->connecting) {
     442          39 :       c->connecting = 1;
     443          39 :       connectivity_state_changed_locked(exec_ctx, c, "create_call");
     444             :       /* released by connection */
     445          39 :       SUBCHANNEL_REF_LOCKED(c, "connecting");
     446          39 :       GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
     447          39 :       gpr_mu_unlock(&c->mu);
     448             : 
     449          39 :       start_connect(exec_ctx, c);
     450             :     } else {
     451         309 :       gpr_mu_unlock(&c->mu);
     452             :     }
     453             :     GPR_TIMER_END("grpc_subchannel_create_call", 0);
     454         348 :     return 0;
     455             :   }
     456             : }
     457             : 
     458         271 : grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
     459             :   grpc_connectivity_state state;
     460         271 :   gpr_mu_lock(&c->mu);
     461         271 :   state = grpc_connectivity_state_check(&c->state_tracker);
     462         271 :   gpr_mu_unlock(&c->mu);
     463         271 :   return state;
     464             : }
     465             : 
     466        8874 : void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
     467             :                                             grpc_subchannel *c,
     468             :                                             grpc_connectivity_state *state,
     469             :                                             grpc_closure *notify) {
     470        8623 :   int do_connect = 0;
     471        8874 :   gpr_mu_lock(&c->mu);
     472        8874 :   if (grpc_connectivity_state_notify_on_state_change(
     473             :           exec_ctx, &c->state_tracker, state, notify)) {
     474        2596 :     do_connect = 1;
     475        2681 :     c->connecting = 1;
     476             :     /* released by connection */
     477        2596 :     SUBCHANNEL_REF_LOCKED(c, "connecting");
     478        2681 :     GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
     479        2681 :     connectivity_state_changed_locked(exec_ctx, c, "state_change");
     480             :   }
     481        8874 :   gpr_mu_unlock(&c->mu);
     482             : 
     483        8874 :   if (do_connect) {
     484        2681 :     start_connect(exec_ctx, c);
     485             :   }
     486        8874 : }
     487             : 
     488         246 : int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
     489             :                                              grpc_subchannel *c,
     490             :                                              grpc_closure *subscribed_notify) {
     491             :   int success;
     492         246 :   gpr_mu_lock(&c->mu);
     493         246 :   success = grpc_connectivity_state_change_unsubscribe(
     494             :       exec_ctx, &c->state_tracker, subscribed_notify);
     495         246 :   gpr_mu_unlock(&c->mu);
     496         246 :   return success;
     497             : }
     498             : 
     499        3614 : void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
     500             :                                           grpc_subchannel *c,
     501             :                                           grpc_transport_op *op) {
     502        3565 :   connection *con = NULL;
     503             :   grpc_subchannel *destroy;
     504        3565 :   int cancel_alarm = 0;
     505        3614 :   gpr_mu_lock(&c->mu);
     506        3614 :   if (c->active != NULL) {
     507        1882 :     con = c->active;
     508        1882 :     CONNECTION_REF_LOCKED(con, "transport-op");
     509             :   }
     510        3614 :   if (op->disconnect) {
     511        3614 :     c->disconnected = 1;
     512        3614 :     connectivity_state_changed_locked(exec_ctx, c, "disconnect");
     513        3614 :     if (c->have_alarm) {
     514          45 :       cancel_alarm = 1;
     515             :     }
     516             :   }
     517        3614 :   gpr_mu_unlock(&c->mu);
     518             : 
     519        3614 :   if (con != NULL) {
     520        1882 :     grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
     521        1882 :     grpc_channel_element *top_elem =
     522             :         grpc_channel_stack_element(channel_stack, 0);
     523        1882 :     top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
     524             : 
     525        1882 :     gpr_mu_lock(&c->mu);
     526        1882 :     destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op");
     527        1882 :     gpr_mu_unlock(&c->mu);
     528        1882 :     if (destroy) {
     529           0 :       subchannel_destroy(exec_ctx, destroy);
     530             :     }
     531             :   }
     532             : 
     533        3614 :   if (cancel_alarm) {
     534          47 :     grpc_timer_cancel(exec_ctx, &c->alarm);
     535             :   }
     536             : 
     537        3614 :   if (op->disconnect) {
     538        3614 :     grpc_connector_shutdown(exec_ctx, c->connector);
     539             :   }
     540        3614 : }
     541             : 
     542        2346 : static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p,
     543             :                              int iomgr_success) {
     544        2306 :   state_watcher *sw = p;
     545        2346 :   grpc_subchannel *c = sw->subchannel;
     546        2346 :   gpr_mu *mu = &c->mu;
     547             :   int destroy;
     548             :   grpc_transport_op op;
     549             :   grpc_channel_element *elem;
     550        2306 :   connection *destroy_connection = NULL;
     551             : 
     552        2346 :   gpr_mu_lock(mu);
     553             : 
     554             :   /* if we failed or there is a version number mismatch, just leave
     555             :      this closure */
     556        2346 :   if (!iomgr_success || sw->subchannel->active_version != sw->version) {
     557             :     goto done;
     558             :   }
     559             : 
     560        2346 :   switch (sw->connectivity_state) {
     561             :     case GRPC_CHANNEL_CONNECTING:
     562             :     case GRPC_CHANNEL_READY:
     563             :     case GRPC_CHANNEL_IDLE:
     564             :       /* all is still good: keep watching */
     565           0 :       memset(&op, 0, sizeof(op));
     566           0 :       op.connectivity_state = &sw->connectivity_state;
     567           0 :       op.on_connectivity_state_change = &sw->closure;
     568           0 :       elem = grpc_channel_stack_element(
     569           0 :           CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
     570           0 :       elem->filter->start_transport_op(exec_ctx, elem, &op);
     571             :       /* early out */
     572           0 :       gpr_mu_unlock(mu);
     573        2346 :       return;
     574             :     case GRPC_CHANNEL_FATAL_FAILURE:
     575             :     case GRPC_CHANNEL_TRANSIENT_FAILURE:
     576             :       /* things have gone wrong, deactivate and enter idle */
     577        2346 :       if (sw->subchannel->active->refs == 0) {
     578        1881 :         destroy_connection = sw->subchannel->active;
     579             :       }
     580        2346 :       sw->subchannel->active = NULL;
     581        2346 :       grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
     582        2346 :                                   c->disconnected
     583             :                                       ? GRPC_CHANNEL_FATAL_FAILURE
     584             :                                       : GRPC_CHANNEL_TRANSIENT_FAILURE,
     585             :                                   "connection_failed");
     586        2346 :       break;
     587             :   }
     588             : 
     589             : done:
     590        2346 :   connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed");
     591        2306 :   destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
     592        2346 :   gpr_free(sw);
     593        2346 :   gpr_mu_unlock(mu);
     594        2346 :   if (destroy) {
     595        1718 :     subchannel_destroy(exec_ctx, c);
     596             :   }
     597        2345 :   if (destroy_connection != NULL) {
     598        1884 :     connection_destroy(exec_ctx, destroy_connection);
     599             :   }
     600             : }
     601             : 
     602        2347 : static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
     603             :   size_t channel_stack_size;
     604             :   connection *con;
     605             :   grpc_channel_stack *stk;
     606             :   size_t num_filters;
     607             :   const grpc_channel_filter **filters;
     608             :   waiting_for_connect *w4c;
     609             :   grpc_transport_op op;
     610             :   state_watcher *sw;
     611        2306 :   connection *destroy_connection = NULL;
     612             :   grpc_channel_element *elem;
     613             : 
     614             :   /* build final filter list */
     615        2347 :   num_filters = c->num_filters + c->connecting_result.num_filters + 1;
     616        2347 :   filters = gpr_malloc(sizeof(*filters) * num_filters);
     617        2347 :   memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
     618        2347 :   memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
     619        2347 :          sizeof(*filters) * c->connecting_result.num_filters);
     620        2347 :   filters[num_filters - 1] = &grpc_connected_channel_filter;
     621             : 
     622             :   /* construct channel stack */
     623        2347 :   channel_stack_size = grpc_channel_stack_size(filters, num_filters);
     624        2346 :   con = gpr_malloc(sizeof(connection) + channel_stack_size);
     625        2347 :   stk = (grpc_channel_stack *)(con + 1);
     626        2347 :   con->refs = 0;
     627        2347 :   con->subchannel = c;
     628        2347 :   grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
     629             :                           stk);
     630        2347 :   grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
     631        2347 :   gpr_free((void *)c->connecting_result.filters);
     632        2347 :   memset(&c->connecting_result, 0, sizeof(c->connecting_result));
     633             : 
     634             :   /* initialize state watcher */
     635        2347 :   sw = gpr_malloc(sizeof(*sw));
     636        2347 :   grpc_closure_init(&sw->closure, on_state_changed, sw);
     637        2347 :   sw->subchannel = c;
     638        2347 :   sw->connectivity_state = GRPC_CHANNEL_READY;
     639             : 
     640        2347 :   gpr_mu_lock(&c->mu);
     641             : 
     642        2347 :   if (c->disconnected) {
     643           0 :     gpr_mu_unlock(&c->mu);
     644           0 :     gpr_free(sw);
     645           0 :     gpr_free((void *)filters);
     646           0 :     grpc_channel_stack_destroy(exec_ctx, stk);
     647           0 :     GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
     648           0 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
     649        2347 :     return;
     650             :   }
     651             : 
     652             :   /* publish */
     653        2347 :   if (c->active != NULL && c->active->refs == 0) {
     654           0 :     destroy_connection = c->active;
     655             :   }
     656        2347 :   c->active = con;
     657        2347 :   c->active_version++;
     658        2347 :   sw->version = c->active_version;
     659        2347 :   c->connecting = 0;
     660             : 
     661             :   /* watch for changes; subchannel ref for connecting is donated
     662             :      to the state watcher */
     663        2347 :   memset(&op, 0, sizeof(op));
     664        2347 :   op.connectivity_state = &sw->connectivity_state;
     665        2347 :   op.on_connectivity_state_change = &sw->closure;
     666        2347 :   op.bind_pollset_set = c->pollset_set;
     667        2306 :   SUBCHANNEL_REF_LOCKED(c, "state_watcher");
     668        2347 :   GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
     669        2347 :   GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
     670        2347 :   elem =
     671        2347 :       grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
     672        2347 :   elem->filter->start_transport_op(exec_ctx, elem, &op);
     673             : 
     674             :   /* signal completion */
     675        2347 :   connectivity_state_changed_locked(exec_ctx, c, "connected");
     676        2347 :   w4c = c->waiting;
     677        2347 :   c->waiting = NULL;
     678             : 
     679        2347 :   gpr_mu_unlock(&c->mu);
     680             : 
     681        5020 :   while (w4c != NULL) {
     682         326 :     waiting_for_connect *next = w4c->next;
     683         326 :     grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1);
     684         326 :     w4c = next;
     685             :   }
     686             : 
     687        2347 :   gpr_free((void *)filters);
     688             : 
     689        2347 :   if (destroy_connection != NULL) {
     690           0 :     connection_destroy(exec_ctx, destroy_connection);
     691             :   }
     692             : }
     693             : 
     694             : /* Generate a random number between 0 and 1. */
     695         264 : static double generate_uniform_random_number(grpc_subchannel *c) {
     696         264 :   c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31);
     697         264 :   return c->random / (double)((gpr_uint32)1 << 31);
     698             : }
     699             : 
     700             : /* Update backoff_delta and next_attempt in subchannel */
     701         394 : static void update_reconnect_parameters(grpc_subchannel *c) {
     702             :   size_t i;
     703             :   gpr_int32 backoff_delta_millis, jitter;
     704         394 :   gpr_int32 max_backoff_millis =
     705             :       GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
     706             :   double jitter_range;
     707             : 
     708         394 :   if (c->args) {
     709         525 :     for (i = 0; i < c->args->num_args; i++) {
     710         261 :       if (0 == strcmp(c->args->args[i].key,
     711             :                       "grpc.testing.fixed_reconnect_backoff")) {
     712         130 :         GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
     713         130 :         c->next_attempt = gpr_time_add(
     714             :             gpr_now(GPR_CLOCK_MONOTONIC),
     715         130 :             gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN));
     716         524 :         return;
     717             :       }
     718             :     }
     719             :   }
     720             : 
     721         264 :   backoff_delta_millis =
     722         264 :       (gpr_int32)(gpr_time_to_millis(c->backoff_delta) *
     723             :                   GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
     724         264 :   if (backoff_delta_millis > max_backoff_millis) {
     725           0 :     backoff_delta_millis = max_backoff_millis;
     726             :   }
     727         264 :   c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
     728         264 :   c->next_attempt =
     729         264 :       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
     730             : 
     731         264 :   jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
     732         264 :   jitter =
     733         264 :       (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
     734         264 :   c->next_attempt =
     735         264 :       gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
     736             : }
     737             : 
     738         725 : static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
     739         723 :   grpc_subchannel *c = arg;
     740         725 :   gpr_mu_lock(&c->mu);
     741         725 :   c->have_alarm = 0;
     742         725 :   if (c->disconnected) {
     743         329 :     iomgr_success = 0;
     744             :   }
     745         725 :   connectivity_state_changed_locked(exec_ctx, c, "alarm");
     746         725 :   gpr_mu_unlock(&c->mu);
     747         725 :   if (iomgr_success) {
     748         394 :     update_reconnect_parameters(c);
     749         394 :     continue_connect(exec_ctx, c);
     750             :   } else {
     751         331 :     cancel_waiting_calls(exec_ctx, c, iomgr_success);
     752         331 :     GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
     753         331 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
     754             :   }
     755         725 : }
     756             : 
     757        3072 : static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
     758             :                                  int iomgr_success) {
     759        3029 :   grpc_subchannel *c = arg;
     760        3072 :   if (c->connecting_result.transport != NULL) {
     761        2347 :     publish_transport(exec_ctx, c);
     762             :   } else {
     763         725 :     gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
     764         725 :     gpr_mu_lock(&c->mu);
     765         725 :     GPR_ASSERT(!c->have_alarm);
     766         725 :     c->have_alarm = 1;
     767         725 :     connectivity_state_changed_locked(exec_ctx, c, "connect_failed");
     768         725 :     grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
     769         725 :     gpr_mu_unlock(&c->mu);
     770             :   }
     771        3072 : }
     772             : 
     773        3114 : static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
     774        3114 :   gpr_timespec current_deadline =
     775             :       gpr_time_add(c->next_attempt, c->backoff_delta);
     776        3114 :   gpr_timespec min_deadline = gpr_time_add(
     777             :       gpr_now(GPR_CLOCK_MONOTONIC),
     778             :       gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
     779             :                             GPR_TIMESPAN));
     780        3114 :   return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
     781             :                                                           : min_deadline;
     782             : }
     783             : 
     784       12258 : static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
     785       12477 :   if (c->disconnected) {
     786        6060 :     return GRPC_CHANNEL_FATAL_FAILURE;
     787             :   }
     788        6366 :   if (c->connecting) {
     789        3555 :     if (c->have_alarm) {
     790         439 :       return GRPC_CHANNEL_TRANSIENT_FAILURE;
     791             :     }
     792        3029 :     return GRPC_CHANNEL_CONNECTING;
     793             :   }
     794        2811 :   if (c->active) {
     795        2306 :     return GRPC_CHANNEL_READY;
     796             :   }
     797         424 :   return GRPC_CHANNEL_IDLE;
     798             : }
     799             : 
     800       12477 : static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
     801             :                                               grpc_subchannel *c,
     802             :                                               const char *reason) {
     803       12258 :   grpc_connectivity_state current = compute_connectivity_locked(c);
     804       12477 :   grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason);
     805       12477 : }
     806             : 
     807             : /*
     808             :  * grpc_subchannel_call implementation
     809             :  */
     810             : 
     811     2166392 : static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
     812             :                                     int success) {
     813     2166373 :   grpc_subchannel_call *c = call;
     814     2166392 :   gpr_mu *mu = &c->connection->subchannel->mu;
     815             :   grpc_subchannel *destroy;
     816             :   GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
     817     2166392 :   grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
     818     2167222 :   gpr_mu_lock(mu);
     819     2167251 :   destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
     820     2167251 :   gpr_mu_unlock(mu);
     821     2167249 :   gpr_free(c);
     822     2167206 :   if (destroy != NULL) {
     823         157 :     subchannel_destroy(exec_ctx, destroy);
     824             :   }
     825             :   GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
     826     2167206 : }
     827             : 
     828     2167227 : void grpc_subchannel_call_ref(grpc_subchannel_call *c
     829             :                                   GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     830             : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
     831             :   grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
     832             : #else
     833     2167227 :   grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c));
     834             : #endif
     835     2167304 : }
     836             : 
     837     4332322 : void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
     838             :                                 grpc_subchannel_call *c
     839             :                                     GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     840             : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
     841             :   grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
     842             : #else
     843     4332322 :   grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
     844             : #endif
     845     4333313 : }
     846             : 
     847         434 : char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
     848             :                                     grpc_subchannel_call *call) {
     849         434 :   grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
     850         434 :   grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
     851         434 :   return top_elem->filter->get_peer(exec_ctx, top_elem);
     852             : }
     853             : 
     854     4171414 : void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
     855             :                                      grpc_subchannel_call *call,
     856             :                                      grpc_transport_stream_op *op) {
     857     4171414 :   grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
     858     4171414 :   grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
     859     4171467 :   top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
     860     4173196 : }
     861             : 
     862     2165267 : static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
     863             :                                          connection *con,
     864             :                                          grpc_pollset *pollset) {
     865     2165267 :   grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
     866     2165267 :   grpc_subchannel_call *call =
     867     2165267 :       gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
     868     2166955 :   grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
     869     2166955 :   call->connection = con;
     870     2166955 :   grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
     871             :                        NULL, NULL, callstk);
     872     2167307 :   grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
     873     2167328 :   return call;
     874             : }
     875             : 
     876         270 : grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) {
     877         270 :   return subchannel->master;
     878             : }
     879             : 
     880           0 : grpc_call_stack *grpc_subchannel_call_get_call_stack(
     881             :     grpc_subchannel_call *subchannel_call) {
     882           0 :   return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
     883             : }

Generated by: LCOV version 1.11