LCOV - code coverage report
Current view: top level - src/core/client_config - subchannel.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 329 364 90.4 %
Date: 2015-10-10 Functions: 33 34 97.1 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/client_config/subchannel.h"
      35             : 
      36             : #include <string.h>
      37             : 
      38             : #include <grpc/support/alloc.h>
      39             : 
      40             : #include "src/core/channel/channel_args.h"
      41             : #include "src/core/channel/client_channel.h"
      42             : #include "src/core/channel/connected_channel.h"
      43             : #include "src/core/iomgr/alarm.h"
      44             : #include "src/core/transport/connectivity_state.h"
      45             : #include "src/core/surface/channel.h"
      46             : 
      47             : #define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
      48             : #define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
      49             : #define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
      50             : #define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
      51             : #define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
      52             : 
      53             : typedef struct {
      54             :   /* all fields protected by subchannel->mu */
      55             :   /** refcount */
      56             :   int refs;
      57             :   /** parent subchannel */
      58             :   grpc_subchannel *subchannel;
      59             : } connection;
      60             : 
      61             : typedef struct {
      62             :   grpc_closure closure;
      63             :   size_t version;
      64             :   grpc_subchannel *subchannel;
      65             :   grpc_connectivity_state connectivity_state;
      66             : } state_watcher;
      67             : 
      68             : typedef struct waiting_for_connect {
      69             :   struct waiting_for_connect *next;
      70             :   grpc_closure *notify;
      71             :   grpc_pollset *pollset;
      72             :   grpc_subchannel_call **target;
      73             :   grpc_subchannel *subchannel;
      74             :   grpc_closure continuation;
      75             : } waiting_for_connect;
      76             : 
      77             : struct grpc_subchannel {
      78             :   grpc_connector *connector;
      79             : 
      80             :   /** non-transport related channel filters */
      81             :   const grpc_channel_filter **filters;
      82             :   size_t num_filters;
      83             :   /** channel arguments */
      84             :   grpc_channel_args *args;
      85             :   /** address to connect to */
      86             :   struct sockaddr *addr;
      87             :   size_t addr_len;
      88             :   /** metadata context */
      89             :   grpc_mdctx *mdctx;
      90             :   /** master channel - the grpc_channel instance that ultimately owns
      91             :       this channel_data via its channel stack.
      92             :       We occasionally use this to bump the refcount on the master channel
      93             :       to keep ourselves alive through an asynchronous operation. */
      94             :   grpc_channel *master;
      95             :   /** have we seen a disconnection? */
      96             :   int disconnected;
      97             : 
      98             :   /** set during connection */
      99             :   grpc_connect_out_args connecting_result;
     100             : 
     101             :   /** callback for connection finishing */
     102             :   grpc_closure connected;
     103             : 
     104             :   /** pollset_set tracking who's interested in a connection
     105             :       being setup - owned by the master channel (in particular the
     106             :      client_channel
     107             :       filter there-in) */
     108             :   grpc_pollset_set *pollset_set;
     109             : 
     110             :   /** mutex protecting remaining elements */
     111             :   gpr_mu mu;
     112             : 
     113             :   /** active connection */
     114             :   connection *active;
     115             :   /** version number for the active connection */
     116             :   size_t active_version;
     117             :   /** refcount */
     118             :   int refs;
     119             :   /** are we connecting */
     120             :   int connecting;
     121             :   /** things waiting for a connection */
     122             :   waiting_for_connect *waiting;
     123             :   /** connectivity state tracking */
     124             :   grpc_connectivity_state_tracker state_tracker;
     125             : 
     126             :   /** next connect attempt time */
     127             :   gpr_timespec next_attempt;
     128             :   /** amount to backoff each failure */
     129             :   gpr_timespec backoff_delta;
     130             :   /** do we have an active alarm? */
     131             :   int have_alarm;
     132             :   /** our alarm */
     133             :   grpc_alarm alarm;
     134             :   /** current random value */
     135             :   gpr_uint32 random;
     136             : };
     137             : 
     138             : struct grpc_subchannel_call {
     139             :   connection *connection;
     140             :   gpr_refcount refs;
     141             : };
     142             : 
     143             : #define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
     144             : #define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
     145             : 
     146             : static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
     147             :                                          connection *con);
     148             : static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
     149             :                                               grpc_subchannel *c,
     150             :                                               const char *reason);
     151             : static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
     152             : static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
     153             : static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
     154             :                                  int iomgr_success);
     155             : 
     156             : static void subchannel_ref_locked(grpc_subchannel *c
     157             :                                       GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
     158             : static int subchannel_unref_locked(
     159             :     grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
     160             : static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
     161             : static grpc_subchannel *connection_unref_locked(
     162             :     grpc_exec_ctx *exec_ctx,
     163             :     connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
     164             : static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
     165             : 
     166             : #ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
     167             : #define SUBCHANNEL_REF_LOCKED(p, r) \
     168             :   subchannel_ref_locked((p), __FILE__, __LINE__, (r))
     169             : #define SUBCHANNEL_UNREF_LOCKED(p, r) \
     170             :   subchannel_unref_locked((p), __FILE__, __LINE__, (r))
     171             : #define CONNECTION_REF_LOCKED(p, r) \
     172             :   connection_ref_locked((p), __FILE__, __LINE__, (r))
     173             : #define CONNECTION_UNREF_LOCKED(cl, p, r) \
     174             :   connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
     175             : #define REF_PASS_ARGS , file, line, reason
     176             : #define REF_LOG(name, p)                                                  \
     177             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p   ref %d -> %d %s", \
     178             :           (name), (p), (p)->refs, (p)->refs + 1, reason)
     179             : #define UNREF_LOG(name, p)                                                \
     180             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
     181             :           (name), (p), (p)->refs, (p)->refs - 1, reason)
     182             : #else
     183             : #define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
     184             : #define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
     185             : #define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
     186             : #define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
     187             : #define REF_PASS_ARGS
     188             : #define REF_LOG(name, p) \
     189             :   do {                   \
     190             :   } while (0)
     191             : #define UNREF_LOG(name, p) \
     192             :   do {                     \
     193             :   } while (0)
     194             : #endif
     195             : 
     196             : /*
     197             :  * connection implementation
     198             :  */
     199             : 
     200        1548 : static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) {
     201        1548 :   GPR_ASSERT(c->refs == 0);
     202        1548 :   grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
     203        1548 :   gpr_free(c);
     204        1548 : }
     205             : 
     206     1403492 : static void connection_ref_locked(connection *c
     207             :                                       GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     208             :   REF_LOG("CONNECTION", c);
     209     1403492 :   subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
     210     1403490 :   ++c->refs;
     211     1403490 : }
     212             : 
     213     1402975 : static grpc_subchannel *connection_unref_locked(
     214             :     grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     215     1402975 :   grpc_subchannel *destroy = NULL;
     216             :   UNREF_LOG("CONNECTION", c);
     217     1402975 :   if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
     218          45 :     destroy = c->subchannel;
     219             :   }
     220     1403506 :   if (--c->refs == 0 && c->subchannel->active != c) {
     221         292 :     connection_destroy(exec_ctx, c);
     222             :   }
     223     1403134 :   return destroy;
     224             : }
     225             : 
     226             : /*
     227             :  * grpc_subchannel implementation
     228             :  */
     229             : 
     230     1409725 : static void subchannel_ref_locked(grpc_subchannel *c
     231             :                                       GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     232             :   REF_LOG("SUBCHANNEL", c);
     233     1409725 :   ++c->refs;
     234     1409725 : }
     235             : 
     236     1410935 : static int subchannel_unref_locked(grpc_subchannel *c
     237             :                                        GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     238             :   UNREF_LOG("SUBCHANNEL", c);
     239     1410935 :   return --c->refs == 0;
     240             : }
     241             : 
     242        2755 : void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     243        2755 :   gpr_mu_lock(&c->mu);
     244        2755 :   subchannel_ref_locked(c REF_PASS_ARGS);
     245        2755 :   gpr_mu_unlock(&c->mu);
     246        2755 : }
     247             : 
     248        4611 : void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
     249             :                            grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     250             :   int destroy;
     251        4611 :   gpr_mu_lock(&c->mu);
     252        4611 :   destroy = subchannel_unref_locked(c REF_PASS_ARGS);
     253        4611 :   gpr_mu_unlock(&c->mu);
     254        4611 :   if (destroy) subchannel_destroy(exec_ctx, c);
     255        4611 : }
     256             : 
     257        1470 : static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
     258        1470 :   if (c->active != NULL) {
     259           0 :     connection_destroy(exec_ctx, c->active);
     260             :   }
     261        1470 :   gpr_free((void *)c->filters);
     262        1470 :   grpc_channel_args_destroy(c->args);
     263        1470 :   gpr_free(c->addr);
     264        1470 :   grpc_mdctx_unref(c->mdctx);
     265        1470 :   grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
     266        1470 :   grpc_connector_unref(exec_ctx, c->connector);
     267        1470 :   gpr_free(c);
     268        1470 : }
     269             : 
     270       10976 : void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
     271             :                                           grpc_subchannel *c,
     272             :                                           grpc_pollset *pollset) {
     273       10976 :   grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset);
     274       10976 : }
     275             : 
     276       10648 : void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
     277             :                                           grpc_subchannel *c,
     278             :                                           grpc_pollset *pollset) {
     279       10648 :   grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset);
     280       10920 : }
     281             : 
     282        1470 : static gpr_uint32 random_seed() {
     283        1470 :   return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
     284             : }
     285             : 
     286        1470 : grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
     287             :                                         grpc_subchannel_args *args) {
     288        1470 :   grpc_subchannel *c = gpr_malloc(sizeof(*c));
     289        1470 :   grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
     290             :       grpc_channel_get_channel_stack(args->master));
     291        1470 :   memset(c, 0, sizeof(*c));
     292        1470 :   c->refs = 1;
     293        1470 :   c->connector = connector;
     294        1470 :   grpc_connector_ref(c->connector);
     295        1470 :   c->num_filters = args->filter_count;
     296        1470 :   c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
     297        1470 :   memcpy((void *)c->filters, args->filters,
     298        1470 :          sizeof(grpc_channel_filter *) * c->num_filters);
     299        1470 :   c->addr = gpr_malloc(args->addr_len);
     300        1470 :   memcpy(c->addr, args->addr, args->addr_len);
     301        1470 :   c->addr_len = args->addr_len;
     302        1470 :   c->args = grpc_channel_args_copy(args->args);
     303        1470 :   c->mdctx = args->mdctx;
     304        1470 :   c->master = args->master;
     305        1470 :   c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
     306        1470 :   c->random = random_seed();
     307        1470 :   grpc_mdctx_ref(c->mdctx);
     308        1470 :   grpc_closure_init(&c->connected, subchannel_connected, c);
     309        1470 :   grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
     310             :                                "subchannel");
     311        1470 :   gpr_mu_init(&c->mu);
     312        1470 :   return c;
     313             : }
     314             : 
     315        1921 : static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
     316             :   grpc_connect_in_args args;
     317             : 
     318        1921 :   args.interested_parties = c->pollset_set;
     319        1921 :   args.addr = c->addr;
     320        1921 :   args.addr_len = c->addr_len;
     321        1921 :   args.deadline = compute_connect_deadline(c);
     322        1921 :   args.channel_args = c->args;
     323             : 
     324        1921 :   grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
     325             :                          &c->connected);
     326        1921 : }
     327             : 
     328        1800 : static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
     329        1800 :   c->backoff_delta = gpr_time_from_seconds(
     330             :       GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
     331        1800 :   c->next_attempt =
     332        1800 :       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
     333        1800 :   continue_connect(exec_ctx, c);
     334        1800 : }
     335             : 
     336         134 : static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
     337             :                                    int iomgr_success) {
     338         134 :   waiting_for_connect *w4c = arg;
     339         134 :   grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
     340         134 :   grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
     341             :                               w4c->target, w4c->notify);
     342         134 :   GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
     343         134 :   gpr_free(w4c);
     344         134 : }
     345             : 
     346     1402184 : void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
     347             :                                  grpc_pollset *pollset,
     348             :                                  grpc_subchannel_call **target,
     349             :                                  grpc_closure *notify) {
     350             :   connection *con;
     351     1402184 :   gpr_mu_lock(&c->mu);
     352     1402479 :   if (c->active != NULL) {
     353     1402345 :     con = c->active;
     354     1402345 :     CONNECTION_REF_LOCKED(con, "call");
     355     1402343 :     gpr_mu_unlock(&c->mu);
     356             : 
     357     1402336 :     *target = create_call(exec_ctx, con);
     358     1402338 :     notify->cb(exec_ctx, notify->cb_arg, 1);
     359             :   } else {
     360         134 :     waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
     361         134 :     w4c->next = c->waiting;
     362         134 :     w4c->notify = notify;
     363         134 :     w4c->pollset = pollset;
     364         134 :     w4c->target = target;
     365         134 :     w4c->subchannel = c;
     366             :     /* released when clearing w4c */
     367         134 :     SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
     368         134 :     grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
     369         134 :     c->waiting = w4c;
     370         134 :     grpc_subchannel_add_interested_party(exec_ctx, c, pollset);
     371         134 :     if (!c->connecting) {
     372           0 :       c->connecting = 1;
     373           0 :       connectivity_state_changed_locked(exec_ctx, c, "create_call");
     374             :       /* released by connection */
     375           0 :       SUBCHANNEL_REF_LOCKED(c, "connecting");
     376           0 :       GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
     377           0 :       gpr_mu_unlock(&c->mu);
     378             : 
     379           0 :       start_connect(exec_ctx, c);
     380             :     } else {
     381         134 :       gpr_mu_unlock(&c->mu);
     382             :     }
     383             :   }
     384     1402444 : }
     385             : 
     386          70 : grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
     387             :   grpc_connectivity_state state;
     388          70 :   gpr_mu_lock(&c->mu);
     389          70 :   state = grpc_connectivity_state_check(&c->state_tracker);
     390          70 :   gpr_mu_unlock(&c->mu);
     391          70 :   return state;
     392             : }
     393             : 
     394        5828 : void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
     395             :                                             grpc_subchannel *c,
     396             :                                             grpc_connectivity_state *state,
     397             :                                             grpc_closure *notify) {
     398        5828 :   int do_connect = 0;
     399        5828 :   gpr_mu_lock(&c->mu);
     400        5828 :   if (grpc_connectivity_state_notify_on_state_change(
     401             :           exec_ctx, &c->state_tracker, state, notify)) {
     402        1800 :     do_connect = 1;
     403        1800 :     c->connecting = 1;
     404             :     /* released by connection */
     405        1800 :     SUBCHANNEL_REF_LOCKED(c, "connecting");
     406        1800 :     GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
     407        1800 :     connectivity_state_changed_locked(exec_ctx, c, "state_change");
     408             :   }
     409        5828 :   gpr_mu_unlock(&c->mu);
     410             : 
     411        5828 :   if (do_connect) {
     412        1800 :     start_connect(exec_ctx, c);
     413             :   }
     414        5828 : }
     415             : 
     416           0 : int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
     417             :                                              grpc_subchannel *c,
     418             :                                              grpc_closure *subscribed_notify) {
     419             :   int success;
     420           0 :   gpr_mu_lock(&c->mu);
     421           0 :   success = grpc_connectivity_state_change_unsubscribe(
     422             :       exec_ctx, &c->state_tracker, subscribed_notify);
     423           0 :   gpr_mu_unlock(&c->mu);
     424           0 :   return success;
     425             : }
     426             : 
     427        1411 : void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
     428             :                                           grpc_subchannel *c,
     429             :                                           grpc_transport_op *op) {
     430        1411 :   connection *con = NULL;
     431             :   grpc_subchannel *destroy;
     432        1411 :   int cancel_alarm = 0;
     433        1411 :   gpr_mu_lock(&c->mu);
     434        1411 :   if (c->active != NULL) {
     435        1147 :     con = c->active;
     436        1147 :     CONNECTION_REF_LOCKED(con, "transport-op");
     437             :   }
     438        1411 :   if (op->disconnect) {
     439        1411 :     c->disconnected = 1;
     440        1411 :     connectivity_state_changed_locked(exec_ctx, c, "disconnect");
     441        1411 :     if (c->have_alarm) {
     442          40 :       cancel_alarm = 1;
     443             :     }
     444             :   }
     445        1411 :   gpr_mu_unlock(&c->mu);
     446             : 
     447        1411 :   if (con != NULL) {
     448        1147 :     grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
     449        1147 :     grpc_channel_element *top_elem =
     450             :         grpc_channel_stack_element(channel_stack, 0);
     451        1147 :     top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
     452             : 
     453        1147 :     gpr_mu_lock(&c->mu);
     454        1147 :     destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op");
     455        1147 :     gpr_mu_unlock(&c->mu);
     456        1147 :     if (destroy) {
     457           0 :       subchannel_destroy(exec_ctx, destroy);
     458             :     }
     459             :   }
     460             : 
     461        1411 :   if (cancel_alarm) {
     462          40 :     grpc_alarm_cancel(exec_ctx, &c->alarm);
     463             :   }
     464             : 
     465        1411 :   if (op->disconnect) {
     466        1411 :     grpc_connector_shutdown(exec_ctx, c->connector);
     467             :   }
     468        1411 : }
     469             : 
     470        1548 : static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p,
     471             :                              int iomgr_success) {
     472        1548 :   state_watcher *sw = p;
     473        1548 :   grpc_subchannel *c = sw->subchannel;
     474        1548 :   gpr_mu *mu = &c->mu;
     475             :   int destroy;
     476             :   grpc_transport_op op;
     477             :   grpc_channel_element *elem;
     478        1548 :   connection *destroy_connection = NULL;
     479             : 
     480        1548 :   gpr_mu_lock(mu);
     481             : 
     482             :   /* if we failed or there is a version number mismatch, just leave
     483             :      this closure */
     484        1548 :   if (!iomgr_success || sw->subchannel->active_version != sw->version) {
     485             :     goto done;
     486             :   }
     487             : 
     488        1548 :   switch (sw->connectivity_state) {
     489             :     case GRPC_CHANNEL_CONNECTING:
     490             :     case GRPC_CHANNEL_READY:
     491             :     case GRPC_CHANNEL_IDLE:
     492             :       /* all is still good: keep watching */
     493           0 :       memset(&op, 0, sizeof(op));
     494           0 :       op.connectivity_state = &sw->connectivity_state;
     495           0 :       op.on_connectivity_state_change = &sw->closure;
     496           0 :       elem = grpc_channel_stack_element(
     497           0 :           CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
     498           0 :       elem->filter->start_transport_op(exec_ctx, elem, &op);
     499             :       /* early out */
     500           0 :       gpr_mu_unlock(mu);
     501        1548 :       return;
     502             :     case GRPC_CHANNEL_FATAL_FAILURE:
     503             :     case GRPC_CHANNEL_TRANSIENT_FAILURE:
     504             :       /* things have gone wrong, deactivate and enter idle */
     505        1548 :       if (sw->subchannel->active->refs == 0) {
     506        1256 :         destroy_connection = sw->subchannel->active;
     507             :       }
     508        1548 :       sw->subchannel->active = NULL;
     509        1548 :       grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
     510        1548 :                                   c->disconnected
     511             :                                       ? GRPC_CHANNEL_FATAL_FAILURE
     512             :                                       : GRPC_CHANNEL_TRANSIENT_FAILURE,
     513             :                                   "connection_failed");
     514        1548 :       break;
     515             :   }
     516             : 
     517             : done:
     518        1548 :   connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed");
     519        1548 :   destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
     520        1548 :   gpr_free(sw);
     521        1548 :   gpr_mu_unlock(mu);
     522        1548 :   if (destroy) {
     523        1099 :     subchannel_destroy(exec_ctx, c);
     524             :   }
     525        1548 :   if (destroy_connection != NULL) {
     526        1256 :     connection_destroy(exec_ctx, destroy_connection);
     527             :   }
     528             : }
     529             : 
     530        1547 : static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
     531             :   size_t channel_stack_size;
     532             :   connection *con;
     533             :   grpc_channel_stack *stk;
     534             :   size_t num_filters;
     535             :   const grpc_channel_filter **filters;
     536             :   waiting_for_connect *w4c;
     537             :   grpc_transport_op op;
     538             :   state_watcher *sw;
     539        1547 :   connection *destroy_connection = NULL;
     540             :   grpc_channel_element *elem;
     541             : 
     542             :   /* build final filter list */
     543        1547 :   num_filters = c->num_filters + c->connecting_result.num_filters + 1;
     544        1547 :   filters = gpr_malloc(sizeof(*filters) * num_filters);
     545        1547 :   memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
     546        1547 :   memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
     547        1547 :          sizeof(*filters) * c->connecting_result.num_filters);
     548        1547 :   filters[num_filters - 1] = &grpc_connected_channel_filter;
     549             : 
     550             :   /* construct channel stack */
     551        1547 :   channel_stack_size = grpc_channel_stack_size(filters, num_filters);
     552        1546 :   con = gpr_malloc(sizeof(connection) + channel_stack_size);
     553        1547 :   stk = (grpc_channel_stack *)(con + 1);
     554        1547 :   con->refs = 0;
     555        1547 :   con->subchannel = c;
     556        1547 :   grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
     557             :                           c->mdctx, stk);
     558        1546 :   grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
     559        1547 :   gpr_free((void *)c->connecting_result.filters);
     560        1548 :   memset(&c->connecting_result, 0, sizeof(c->connecting_result));
     561             : 
     562             :   /* initialize state watcher */
     563        1548 :   sw = gpr_malloc(sizeof(*sw));
     564        1547 :   grpc_closure_init(&sw->closure, on_state_changed, sw);
     565        1547 :   sw->subchannel = c;
     566        1547 :   sw->connectivity_state = GRPC_CHANNEL_READY;
     567             : 
     568        1547 :   gpr_mu_lock(&c->mu);
     569             : 
     570        1548 :   if (c->disconnected) {
     571           0 :     gpr_mu_unlock(&c->mu);
     572           0 :     gpr_free(sw);
     573           0 :     gpr_free((void *)filters);
     574           0 :     grpc_channel_stack_destroy(exec_ctx, stk);
     575           0 :     GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
     576           0 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
     577        1548 :     return;
     578             :   }
     579             : 
     580             :   /* publish */
     581        1548 :   if (c->active != NULL && c->active->refs == 0) {
     582           0 :     destroy_connection = c->active;
     583             :   }
     584        1548 :   c->active = con;
     585        1548 :   c->active_version++;
     586        1548 :   sw->version = c->active_version;
     587        1548 :   c->connecting = 0;
     588             : 
     589             :   /* watch for changes; subchannel ref for connecting is donated
     590             :      to the state watcher */
     591        1548 :   memset(&op, 0, sizeof(op));
     592        1548 :   op.connectivity_state = &sw->connectivity_state;
     593        1548 :   op.on_connectivity_state_change = &sw->closure;
     594        1548 :   op.bind_pollset_set = c->pollset_set;
     595        1548 :   SUBCHANNEL_REF_LOCKED(c, "state_watcher");
     596        1548 :   GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
     597        1548 :   GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
     598        1548 :   elem =
     599        1548 :       grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
     600        1548 :   elem->filter->start_transport_op(exec_ctx, elem, &op);
     601             : 
     602             :   /* signal completion */
     603        1548 :   connectivity_state_changed_locked(exec_ctx, c, "connected");
     604        1548 :   w4c = c->waiting;
     605        1548 :   c->waiting = NULL;
     606             : 
     607        1548 :   gpr_mu_unlock(&c->mu);
     608             : 
     609        3230 :   while (w4c != NULL) {
     610         134 :     waiting_for_connect *next = w4c->next;
     611         134 :     grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1);
     612         134 :     w4c = next;
     613             :   }
     614             : 
     615        1548 :   gpr_free((void *)filters);
     616             : 
     617        1548 :   if (destroy_connection != NULL) {
     618           0 :     connection_destroy(exec_ctx, destroy_connection);
     619             :   }
     620             : }
     621             : 
     622             : /* Generate a random number between 0 and 1. */
     623         121 : static double generate_uniform_random_number(grpc_subchannel *c) {
     624         121 :   c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31);
     625         121 :   return c->random / (double)((gpr_uint32)1 << 31);
     626             : }
     627             : 
     628             : /* Update backoff_delta and next_attempt in subchannel */
     629         121 : static void update_reconnect_parameters(grpc_subchannel *c) {
     630             :   gpr_int32 backoff_delta_millis, jitter;
     631         121 :   gpr_int32 max_backoff_millis =
     632             :       GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
     633             :   double jitter_range;
     634         121 :   backoff_delta_millis =
     635         121 :       (gpr_int32)(gpr_time_to_millis(c->backoff_delta) *
     636             :                   GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
     637         121 :   if (backoff_delta_millis > max_backoff_millis) {
     638           0 :     backoff_delta_millis = max_backoff_millis;
     639             :   }
     640         121 :   c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
     641         121 :   c->next_attempt =
     642         121 :       gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
     643             : 
     644         121 :   jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
     645         121 :   jitter =
     646         121 :       (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
     647         121 :   c->next_attempt =
     648         121 :       gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
     649         121 : }
     650             : 
     651         373 : static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
     652         373 :   grpc_subchannel *c = arg;
     653         373 :   gpr_mu_lock(&c->mu);
     654         373 :   c->have_alarm = 0;
     655         373 :   if (c->disconnected) {
     656         252 :     iomgr_success = 0;
     657             :   }
     658         373 :   connectivity_state_changed_locked(exec_ctx, c, "alarm");
     659         373 :   if (iomgr_success) {
     660         121 :     gpr_mu_unlock(&c->mu);
     661         121 :     update_reconnect_parameters(c);
     662         121 :     continue_connect(exec_ctx, c);
     663             :   } else {
     664             :     waiting_for_connect *w4c;
     665         252 :     w4c = c->waiting;
     666         252 :     c->waiting = NULL;
     667         252 :     gpr_mu_unlock(&c->mu);
     668         504 :     while (w4c != NULL) {
     669           0 :       waiting_for_connect *next = w4c->next;
     670           0 :       grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
     671             :                                            w4c->pollset);
     672           0 :       w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, 0);
     673           0 :       GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
     674           0 :       gpr_free(w4c);
     675           0 :       w4c = next;
     676             :     }
     677         252 :     GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
     678         252 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
     679             :   }
     680         373 : }
     681             : 
     682        1920 : static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
     683             :                                  int iomgr_success) {
     684        1920 :   grpc_subchannel *c = arg;
     685        1920 :   if (c->connecting_result.transport != NULL) {
     686        1547 :     publish_transport(exec_ctx, c);
     687             :   } else {
     688         373 :     gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
     689         373 :     gpr_mu_lock(&c->mu);
     690         373 :     GPR_ASSERT(!c->have_alarm);
     691         373 :     c->have_alarm = 1;
     692         373 :     connectivity_state_changed_locked(exec_ctx, c, "connect_failed");
     693         373 :     grpc_alarm_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
     694         373 :     gpr_mu_unlock(&c->mu);
     695             :   }
     696        1921 : }
     697             : 
     698        1921 : static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
     699        1921 :   gpr_timespec current_deadline =
     700             :       gpr_time_add(c->next_attempt, c->backoff_delta);
     701        1921 :   gpr_timespec min_deadline = gpr_time_add(
     702             :       gpr_now(GPR_CLOCK_MONOTONIC),
     703             :       gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
     704             :                             GPR_TIMESPAN));
     705        1921 :   return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
     706             :                                                           : min_deadline;
     707             : }
     708             : 
     709        7053 : static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
     710        7053 :   if (c->disconnected) {
     711        3022 :     return GRPC_CHANNEL_FATAL_FAILURE;
     712             :   }
     713        4031 :   if (c->connecting) {
     714        2082 :     if (c->have_alarm) {
     715         161 :       return GRPC_CHANNEL_TRANSIENT_FAILURE;
     716             :     }
     717        1921 :     return GRPC_CHANNEL_CONNECTING;
     718             :   }
     719        1949 :   if (c->active) {
     720        1548 :     return GRPC_CHANNEL_READY;
     721             :   }
     722         401 :   return GRPC_CHANNEL_IDLE;
     723             : }
     724             : 
     725        7053 : static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
     726             :                                               grpc_subchannel *c,
     727             :                                               const char *reason) {
     728        7053 :   grpc_connectivity_state current = compute_connectivity_locked(c);
     729        7053 :   grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason);
     730        7053 : }
     731             : 
     732             : /*
     733             :  * grpc_subchannel_call implementation
     734             :  */
     735             : 
     736         390 : void grpc_subchannel_call_ref(grpc_subchannel_call *c
     737             :                                   GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     738         390 :   gpr_ref(&c->refs);
     739         390 : }
     740             : 
     741     1402711 : void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
     742             :                                 grpc_subchannel_call *c
     743             :                                     GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
     744     1402711 :   if (gpr_unref(&c->refs)) {
     745     1402342 :     gpr_mu *mu = &c->connection->subchannel->mu;
     746             :     grpc_subchannel *destroy;
     747     1402342 :     grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
     748     1402352 :     gpr_mu_lock(mu);
     749     1402358 :     destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
     750     1402356 :     gpr_mu_unlock(mu);
     751     1402353 :     gpr_free(c);
     752     1402309 :     if (destroy != NULL) {
     753          45 :       subchannel_destroy(exec_ctx, destroy);
     754             :     }
     755             :   }
     756     1402699 : }
     757             : 
     758         390 : char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
     759             :                                     grpc_subchannel_call *call) {
     760         390 :   grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
     761         390 :   grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
     762         390 :   return top_elem->filter->get_peer(exec_ctx, top_elem);
     763             : }
     764             : 
     765     2845904 : void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
     766             :                                      grpc_subchannel_call *call,
     767             :                                      grpc_transport_stream_op *op) {
     768     2845904 :   grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
     769     2845904 :   grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
     770     2845601 :   top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
     771     2846988 : }
     772             : 
     773     1401490 : static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
     774             :                                          connection *con) {
     775     1401490 :   grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
     776     1401490 :   grpc_subchannel_call *call =
     777     1401490 :       gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
     778     1402194 :   grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
     779     1402194 :   call->connection = con;
     780     1402194 :   gpr_ref_init(&call->refs, 1);
     781     1402218 :   grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk);
     782     1402344 :   return call;
     783             : }

Generated by: LCOV version 1.10