LCOV - code coverage report
Current view: top level - src/core/channel - client_channel.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 358 393 91.1 %
Date: 2015-10-10 Functions: 25 25 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/channel/client_channel.h"
      35             : 
      36             : #include <stdio.h>
      37             : #include <string.h>
      38             : 
      39             : #include "src/core/channel/channel_args.h"
      40             : #include "src/core/channel/connected_channel.h"
      41             : #include "src/core/surface/channel.h"
      42             : #include "src/core/iomgr/iomgr.h"
      43             : #include "src/core/support/string.h"
      44             : #include "src/core/transport/connectivity_state.h"
      45             : #include <grpc/support/alloc.h>
      46             : #include <grpc/support/log.h>
      47             : #include <grpc/support/sync.h>
      48             : #include <grpc/support/useful.h>
      49             : 
      50             : /* Client channel implementation */
      51             : 
      52             : typedef struct call_data call_data;
      53             : 
      54             : typedef struct client_channel_channel_data {
      55             :   /** metadata context for this channel */
      56             :   grpc_mdctx *mdctx;
      57             :   /** resolver for this channel */
      58             :   grpc_resolver *resolver;
      59             :   /** have we started resolving this channel */
      60             :   int started_resolving;
      61             :   /** master channel - the grpc_channel instance that ultimately owns
      62             :       this channel_data via its channel stack.
      63             :       We occasionally use this to bump the refcount on the master channel
      64             :       to keep ourselves alive through an asynchronous operation. */
      65             :   grpc_channel *master;
      66             : 
      67             :   /** mutex protecting client configuration, including all
      68             :       variables below in this data structure */
      69             :   gpr_mu mu_config;
      70             :   /** currently active load balancer - guarded by mu_config */
      71             :   grpc_lb_policy *lb_policy;
      72             :   /** incoming configuration - set by resolver.next
      73             :       guarded by mu_config */
      74             :   grpc_client_config *incoming_configuration;
      75             :   /** a list of closures that are all waiting for config to come in */
      76             :   grpc_closure_list waiting_for_config_closures;
      77             :   /** resolver callback */
      78             :   grpc_closure on_config_changed;
      79             :   /** connectivity state being tracked */
      80             :   grpc_connectivity_state_tracker state_tracker;
      81             :   /** when an lb_policy arrives, should we try to exit idle */
      82             :   int exit_idle_when_lb_policy_arrives;
      83             :   /** pollset_set of interested parties in a new connection */
      84             :   grpc_pollset_set pollset_set;
      85             : } channel_data;
      86             : 
      87             : /** We create one watcher for each new lb_policy that is returned from a
      88             :    resolver,
      89             :     to watch for state changes from the lb_policy. When a state change is seen,
      90             :    we
      91             :     update the channel, and create a new watcher */
      92             : typedef struct {
      93             :   channel_data *chand;
      94             :   grpc_closure on_changed;
      95             :   grpc_connectivity_state state;
      96             :   grpc_lb_policy *lb_policy;
      97             : } lb_policy_connectivity_watcher;
      98             : 
      99             : typedef enum {
     100             :   CALL_CREATED,
     101             :   CALL_WAITING_FOR_SEND,
     102             :   CALL_WAITING_FOR_CONFIG,
     103             :   CALL_WAITING_FOR_PICK,
     104             :   CALL_WAITING_FOR_CALL,
     105             :   CALL_ACTIVE,
     106             :   CALL_CANCELLED
     107             : } call_state;
     108             : 
     109             : struct call_data {
     110             :   /* owning element */
     111             :   grpc_call_element *elem;
     112             : 
     113             :   gpr_mu mu_state;
     114             : 
     115             :   call_state state;
     116             :   gpr_timespec deadline;
     117             :   grpc_subchannel *picked_channel;
     118             :   grpc_closure async_setup_task;
     119             :   grpc_transport_stream_op waiting_op;
     120             :   /* our child call stack */
     121             :   grpc_subchannel_call *subchannel_call;
     122             :   grpc_linked_mdelem status;
     123             :   grpc_linked_mdelem details;
     124             : };
     125             : 
     126             : static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
     127             :                                            grpc_transport_stream_op *new_op)
     128             :     GRPC_MUST_USE_RESULT;
     129             : 
     130         667 : static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
     131             :                                          grpc_call_element *elem,
     132             :                                          grpc_transport_stream_op *op) {
     133         667 :   call_data *calld = elem->call_data;
     134         667 :   channel_data *chand = elem->channel_data;
     135         667 :   if (op->send_ops) {
     136         187 :     grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
     137         187 :     op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
     138             :   }
     139         667 :   if (op->recv_ops) {
     140             :     char status[GPR_LTOA_MIN_BUFSIZE];
     141             :     grpc_metadata_batch mdb;
     142         330 :     gpr_ltoa(GRPC_STATUS_CANCELLED, status);
     143         330 :     calld->status.md =
     144         330 :         grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
     145         330 :     calld->details.md =
     146         330 :         grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
     147         330 :     calld->status.prev = calld->details.next = NULL;
     148         330 :     calld->status.next = &calld->details;
     149         330 :     calld->details.prev = &calld->status;
     150         330 :     mdb.list.head = &calld->status;
     151         330 :     mdb.list.tail = &calld->details;
     152         330 :     mdb.garbage.head = mdb.garbage.tail = NULL;
     153         330 :     mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
     154         330 :     grpc_sopb_add_metadata(op->recv_ops, mdb);
     155         330 :     *op->recv_state = GRPC_STREAM_CLOSED;
     156         330 :     op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
     157             :   }
     158         667 :   if (op->on_consumed) {
     159         446 :     op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
     160             :   }
     161         667 : }
     162             : 
     163             : typedef struct {
     164             :   grpc_closure closure;
     165             :   grpc_call_element *elem;
     166             : } waiting_call;
     167             : 
     168             : static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
     169             :                                         grpc_call_element *elem,
     170             :                                         grpc_transport_stream_op *op,
     171             :                                         int continuation);
     172             : 
     173        2752 : static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg,
     174             :                                int iomgr_success) {
     175        2752 :   waiting_call *wc = arg;
     176        2752 :   call_data *calld = wc->elem->call_data;
     177        2752 :   perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1);
     178        2752 :   gpr_free(wc);
     179        2752 : }
     180             : 
     181        2752 : static void add_to_lb_policy_wait_queue_locked_state_config(
     182             :     grpc_call_element *elem) {
     183        2752 :   channel_data *chand = elem->channel_data;
     184        2752 :   waiting_call *wc = gpr_malloc(sizeof(*wc));
     185        2752 :   grpc_closure_init(&wc->closure, continue_with_pick, wc);
     186        2752 :   wc->elem = elem;
     187        2752 :   grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
     188        2752 : }
     189             : 
     190     1403310 : static int is_empty(void *p, int len) {
     191     1403310 :   char *ptr = p;
     192             :   int i;
     193     1445574 :   for (i = 0; i < len; i++) {
     194     1445574 :     if (ptr[i] != 0) return 0;
     195             :   }
     196           0 :   return 1;
     197             : }
     198             : 
     199     1402307 : static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
     200             :                          int iomgr_success) {
     201     1402307 :   call_data *calld = arg;
     202             :   grpc_transport_stream_op op;
     203             :   int have_waiting;
     204             : 
     205     1402307 :   gpr_mu_lock(&calld->mu_state);
     206     1402260 :   if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
     207           0 :     memset(&op, 0, sizeof(op));
     208           0 :     op.cancel_with_status = GRPC_STATUS_CANCELLED;
     209           0 :     gpr_mu_unlock(&calld->mu_state);
     210           0 :     grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
     211     1402260 :   } else if (calld->state == CALL_WAITING_FOR_CALL) {
     212     1402260 :     have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
     213     1402326 :     if (calld->subchannel_call != NULL) {
     214     1402326 :       calld->state = CALL_ACTIVE;
     215     1402326 :       gpr_mu_unlock(&calld->mu_state);
     216     1402345 :       if (have_waiting) {
     217     1402346 :         grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
     218             :                                         &calld->waiting_op);
     219             :       }
     220             :     } else {
     221           0 :       calld->state = CALL_CANCELLED;
     222           0 :       gpr_mu_unlock(&calld->mu_state);
     223           0 :       if (have_waiting) {
     224           0 :         handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
     225             :       }
     226             :     }
     227             :   } else {
     228           0 :     GPR_ASSERT(calld->state == CALL_CANCELLED);
     229           0 :     gpr_mu_unlock(&calld->mu_state);
     230             :   }
     231     1402317 : }
     232             : 
     233     1401913 : static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
     234             :                           int iomgr_success) {
     235     1401913 :   call_data *calld = arg;
     236             :   grpc_pollset *pollset;
     237             : 
     238     1401913 :   if (calld->picked_channel == NULL) {
     239             :     /* treat this like a cancellation */
     240          34 :     calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
     241          34 :     perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1);
     242             :   } else {
     243     1401879 :     gpr_mu_lock(&calld->mu_state);
     244     1402284 :     if (calld->state == CALL_CANCELLED) {
     245           1 :       gpr_mu_unlock(&calld->mu_state);
     246           1 :       handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
     247             :     } else {
     248     1402283 :       GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
     249     1402283 :       calld->state = CALL_WAITING_FOR_CALL;
     250     1402283 :       pollset = calld->waiting_op.bind_pollset;
     251     1402283 :       gpr_mu_unlock(&calld->mu_state);
     252     1402338 :       grpc_closure_init(&calld->async_setup_task, started_call, calld);
     253     1402336 :       grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
     254             :                                   &calld->subchannel_call,
     255             :                                   &calld->async_setup_task);
     256             :     }
     257             :   }
     258     1402357 : }
     259             : 
     260        9617 : static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
     261             :                                            grpc_transport_stream_op *new_op) {
     262        9617 :   call_data *calld = elem->call_data;
     263        9617 :   grpc_closure *consumed_op = NULL;
     264        9617 :   grpc_transport_stream_op *waiting_op = &calld->waiting_op;
     265        9617 :   GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
     266        9617 :   GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
     267        9617 :   if (new_op->send_ops != NULL) {
     268         286 :     waiting_op->send_ops = new_op->send_ops;
     269         286 :     waiting_op->is_last_send = new_op->is_last_send;
     270         286 :     waiting_op->on_done_send = new_op->on_done_send;
     271             :   }
     272        9617 :   if (new_op->recv_ops != NULL) {
     273        9222 :     waiting_op->recv_ops = new_op->recv_ops;
     274        9222 :     waiting_op->recv_state = new_op->recv_state;
     275        9222 :     waiting_op->on_done_recv = new_op->on_done_recv;
     276             :   }
     277        9617 :   if (new_op->on_consumed != NULL) {
     278         109 :     if (waiting_op->on_consumed != NULL) {
     279         109 :       consumed_op = waiting_op->on_consumed;
     280             :     }
     281         109 :     waiting_op->on_consumed = new_op->on_consumed;
     282             :   }
     283        9617 :   if (new_op->cancel_with_status != GRPC_STATUS_OK) {
     284         109 :     waiting_op->cancel_with_status = new_op->cancel_with_status;
     285             :   }
     286        9617 :   return consumed_op;
     287             : }
     288             : 
     289         730 : static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
     290         730 :   call_data *calld = elem->call_data;
     291         730 :   channel_data *chand = elem->channel_data;
     292             :   grpc_subchannel_call *subchannel_call;
     293             :   char *result;
     294             : 
     295         730 :   gpr_mu_lock(&calld->mu_state);
     296         730 :   if (calld->state == CALL_ACTIVE) {
     297         390 :     subchannel_call = calld->subchannel_call;
     298         390 :     GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
     299         390 :     gpr_mu_unlock(&calld->mu_state);
     300         390 :     result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
     301         390 :     GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
     302         390 :     return result;
     303             :   } else {
     304         340 :     gpr_mu_unlock(&calld->mu_state);
     305         340 :     return grpc_channel_get_target(chand->master);
     306             :   }
     307             : }
     308             : 
     309     2857372 : static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
     310             :                                         grpc_call_element *elem,
     311             :                                         grpc_transport_stream_op *op,
     312             :                                         int continuation) {
     313     2857372 :   call_data *calld = elem->call_data;
     314     2857372 :   channel_data *chand = elem->channel_data;
     315             :   grpc_subchannel_call *subchannel_call;
     316             :   grpc_lb_policy *lb_policy;
     317             :   grpc_transport_stream_op op2;
     318     2857372 :   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
     319     2857372 :   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
     320             : 
     321     2857372 :   gpr_mu_lock(&calld->mu_state);
     322     2859148 :   switch (calld->state) {
     323             :     case CALL_ACTIVE:
     324     1444838 :       GPR_ASSERT(!continuation);
     325     1444838 :       subchannel_call = calld->subchannel_call;
     326     1444838 :       gpr_mu_unlock(&calld->mu_state);
     327     1444832 :       grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
     328     1444825 :       break;
     329             :     case CALL_CANCELLED:
     330         237 :       gpr_mu_unlock(&calld->mu_state);
     331         237 :       handle_op_after_cancellation(exec_ctx, elem, op);
     332         237 :       break;
     333             :     case CALL_WAITING_FOR_SEND:
     334         395 :       GPR_ASSERT(!continuation);
     335         395 :       grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
     336         504 :       if (!calld->waiting_op.send_ops &&
     337         109 :           calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
     338           0 :         gpr_mu_unlock(&calld->mu_state);
     339           0 :         break;
     340             :       }
     341         395 :       *op = calld->waiting_op;
     342         395 :       memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
     343         395 :       continuation = 1;
     344             :     /* fall through */
     345             :     case CALL_WAITING_FOR_CONFIG:
     346             :     case CALL_WAITING_FOR_PICK:
     347             :     case CALL_WAITING_FOR_CALL:
     348       12404 :       if (!continuation) {
     349        9321 :         if (op->cancel_with_status != GRPC_STATUS_OK) {
     350          99 :           calld->state = CALL_CANCELLED;
     351          99 :           op2 = calld->waiting_op;
     352          99 :           memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
     353          99 :           if (op->on_consumed) {
     354          99 :             calld->waiting_op.on_consumed = op->on_consumed;
     355          99 :             op->on_consumed = NULL;
     356           0 :           } else if (op2.on_consumed) {
     357           0 :             calld->waiting_op.on_consumed = op2.on_consumed;
     358           0 :             op2.on_consumed = NULL;
     359             :           }
     360          99 :           gpr_mu_unlock(&calld->mu_state);
     361          99 :           handle_op_after_cancellation(exec_ctx, elem, op);
     362          99 :           handle_op_after_cancellation(exec_ctx, elem, &op2);
     363             :         } else {
     364        9222 :           grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
     365        9222 :           gpr_mu_unlock(&calld->mu_state);
     366             :         }
     367        9321 :         break;
     368             :       }
     369             :     /* fall through */
     370             :     case CALL_CREATED:
     371     1405291 :       if (op->cancel_with_status != GRPC_STATUS_OK) {
     372         231 :         calld->state = CALL_CANCELLED;
     373         231 :         gpr_mu_unlock(&calld->mu_state);
     374         231 :         handle_op_after_cancellation(exec_ctx, elem, op);
     375             :       } else {
     376     1405060 :         calld->waiting_op = *op;
     377             : 
     378     1405060 :         if (op->send_ops == NULL) {
     379             :           /* need to have some send ops before we can select the
     380             :              lb target */
     381         395 :           calld->state = CALL_WAITING_FOR_SEND;
     382         395 :           gpr_mu_unlock(&calld->mu_state);
     383             :         } else {
     384     1404665 :           gpr_mu_lock(&chand->mu_config);
     385     1405120 :           lb_policy = chand->lb_policy;
     386     1405120 :           if (lb_policy) {
     387     1402368 :             grpc_transport_stream_op *waiting_op = &calld->waiting_op;
     388     1402368 :             grpc_pollset *bind_pollset = waiting_op->bind_pollset;
     389     1402368 :             grpc_metadata_batch *initial_metadata =
     390     1402368 :                 &waiting_op->send_ops->ops[0].data.metadata;
     391     1402368 :             GRPC_LB_POLICY_REF(lb_policy, "pick");
     392     1402377 :             gpr_mu_unlock(&chand->mu_config);
     393     1402381 :             calld->state = CALL_WAITING_FOR_PICK;
     394             : 
     395     1402381 :             GPR_ASSERT(waiting_op->bind_pollset);
     396     1402381 :             GPR_ASSERT(waiting_op->send_ops);
     397     1402381 :             GPR_ASSERT(waiting_op->send_ops->nops >= 1);
     398     1402381 :             GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA);
     399     1402381 :             gpr_mu_unlock(&calld->mu_state);
     400             : 
     401     1402382 :             grpc_closure_init(&calld->async_setup_task, picked_target, calld);
     402     1402382 :             grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset,
     403             :                                 initial_metadata, &calld->picked_channel,
     404             :                                 &calld->async_setup_task);
     405             : 
     406     1402380 :             GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick");
     407        2752 :           } else if (chand->resolver != NULL) {
     408        2752 :             calld->state = CALL_WAITING_FOR_CONFIG;
     409        2752 :             add_to_lb_policy_wait_queue_locked_state_config(elem);
     410        2752 :             if (!chand->started_resolving && chand->resolver != NULL) {
     411        1429 :               GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
     412        1429 :               chand->started_resolving = 1;
     413        1429 :               grpc_resolver_next(exec_ctx, chand->resolver,
     414             :                                  &chand->incoming_configuration,
     415             :                                  &chand->on_config_changed);
     416             :             }
     417        2752 :             gpr_mu_unlock(&chand->mu_config);
     418        2752 :             gpr_mu_unlock(&calld->mu_state);
     419             :           } else {
     420           0 :             calld->state = CALL_CANCELLED;
     421           0 :             gpr_mu_unlock(&chand->mu_config);
     422           0 :             gpr_mu_unlock(&calld->mu_state);
     423           0 :             handle_op_after_cancellation(exec_ctx, elem, op);
     424             :           }
     425             :         }
     426             :       }
     427     1405765 :       break;
     428             :   }
     429     2859609 : }
     430             : 
     431     2853679 : static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
     432             :                                          grpc_call_element *elem,
     433             :                                          grpc_transport_stream_op *op) {
     434     2853679 :   perform_transport_stream_op(exec_ctx, elem, op, 0);
     435     2855883 : }
     436             : 
     437             : static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
     438             :                             grpc_lb_policy *lb_policy,
     439             :                             grpc_connectivity_state current_state);
     440             : 
     441        5592 : static void on_lb_policy_state_changed_locked(
     442             :     grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
     443             :   /* check if the notification is for a stale policy */
     444       11184 :   if (w->lb_policy != w->chand->lb_policy) return;
     445             : 
     446        4206 :   grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
     447             :                               "lb_changed");
     448        4206 :   if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
     449        4206 :     watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
     450             :   }
     451             : }
     452             : 
     453        5592 : static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
     454             :                                        int iomgr_success) {
     455        5592 :   lb_policy_connectivity_watcher *w = arg;
     456             : 
     457        5592 :   gpr_mu_lock(&w->chand->mu_config);
     458        5592 :   on_lb_policy_state_changed_locked(exec_ctx, w);
     459        5592 :   gpr_mu_unlock(&w->chand->mu_config);
     460             : 
     461        5592 :   GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
     462        5592 :   gpr_free(w);
     463        5592 : }
     464             : 
     465        5592 : static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
     466             :                             grpc_lb_policy *lb_policy,
     467             :                             grpc_connectivity_state current_state) {
     468        5592 :   lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
     469        5592 :   GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
     470             : 
     471        5592 :   w->chand = chand;
     472        5592 :   grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
     473        5592 :   w->state = current_state;
     474        5592 :   w->lb_policy = lb_policy;
     475        5592 :   grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
     476             :                                         &w->on_changed);
     477        5592 : }
     478             : 
     479        2833 : static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
     480             :                                  int iomgr_success) {
     481        2833 :   channel_data *chand = arg;
     482        2833 :   grpc_lb_policy *lb_policy = NULL;
     483             :   grpc_lb_policy *old_lb_policy;
     484             :   grpc_resolver *old_resolver;
     485        2833 :   grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
     486        2833 :   int exit_idle = 0;
     487             : 
     488        2833 :   if (chand->incoming_configuration != NULL) {
     489        1386 :     lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
     490        1386 :     if (lb_policy != NULL) {
     491        1386 :       GRPC_LB_POLICY_REF(lb_policy, "channel");
     492        1386 :       GRPC_LB_POLICY_REF(lb_policy, "config_change");
     493        1386 :       state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
     494             :     }
     495             : 
     496        1386 :     grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
     497             :   }
     498             : 
     499        2833 :   chand->incoming_configuration = NULL;
     500             : 
     501        2833 :   gpr_mu_lock(&chand->mu_config);
     502        2833 :   old_lb_policy = chand->lb_policy;
     503        2833 :   chand->lb_policy = lb_policy;
     504        2833 :   if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
     505        2832 :     grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
     506             :   }
     507        2833 :   if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
     508          17 :     GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
     509          17 :     exit_idle = 1;
     510          17 :     chand->exit_idle_when_lb_policy_arrives = 0;
     511             :   }
     512             : 
     513        4220 :   if (iomgr_success && chand->resolver) {
     514        1387 :     grpc_resolver *resolver = chand->resolver;
     515        1387 :     GRPC_RESOLVER_REF(resolver, "channel-next");
     516        1387 :     grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
     517             :                                 "new_lb+resolver");
     518        1387 :     if (lb_policy != NULL) {
     519        1386 :       watch_lb_policy(exec_ctx, chand, lb_policy, state);
     520             :     }
     521        1387 :     gpr_mu_unlock(&chand->mu_config);
     522        1387 :     GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
     523        1387 :     grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
     524             :                        &chand->on_config_changed);
     525        1387 :     GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
     526             :   } else {
     527        1446 :     old_resolver = chand->resolver;
     528        1446 :     chand->resolver = NULL;
     529        1446 :     grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
     530             :                                 GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
     531        1446 :     gpr_mu_unlock(&chand->mu_config);
     532        1446 :     if (old_resolver != NULL) {
     533           0 :       grpc_resolver_shutdown(exec_ctx, old_resolver);
     534           0 :       GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel");
     535             :     }
     536             :   }
     537             : 
     538        2833 :   if (exit_idle) {
     539          17 :     grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
     540          17 :     GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
     541             :   }
     542             : 
     543        2833 :   if (old_lb_policy != NULL) {
     544           0 :     grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
     545           0 :     GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
     546             :   }
     547             : 
     548        2833 :   if (lb_policy != NULL) {
     549        1386 :     GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
     550             :   }
     551             : 
     552        2833 :   GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
     553        2833 : }
     554             : 
     555        1761 : static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
     556             :                                   grpc_channel_element *elem,
     557             :                                   grpc_transport_op *op) {
     558        1761 :   grpc_lb_policy *lb_policy = NULL;
     559        1761 :   channel_data *chand = elem->channel_data;
     560        1761 :   grpc_resolver *destroy_resolver = NULL;
     561             : 
     562        1761 :   grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
     563             : 
     564        1761 :   GPR_ASSERT(op->set_accept_stream == NULL);
     565        1761 :   GPR_ASSERT(op->bind_pollset == NULL);
     566             : 
     567        1761 :   gpr_mu_lock(&chand->mu_config);
     568        1761 :   if (op->on_connectivity_state_change != NULL) {
     569           0 :     grpc_connectivity_state_notify_on_state_change(
     570             :         exec_ctx, &chand->state_tracker, op->connectivity_state,
     571             :         op->on_connectivity_state_change);
     572           0 :     op->on_connectivity_state_change = NULL;
     573           0 :     op->connectivity_state = NULL;
     574             :   }
     575             : 
     576        1761 :   if (!is_empty(op, sizeof(*op))) {
     577        1761 :     lb_policy = chand->lb_policy;
     578        1761 :     if (lb_policy) {
     579        1386 :       GRPC_LB_POLICY_REF(lb_policy, "broadcast");
     580             :     }
     581             :   }
     582             : 
     583        1761 :   if (op->disconnect && chand->resolver != NULL) {
     584        1761 :     grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
     585             :                                 GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
     586        1761 :     destroy_resolver = chand->resolver;
     587        1761 :     chand->resolver = NULL;
     588        1761 :     if (chand->lb_policy != NULL) {
     589        1386 :       grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
     590        1386 :       GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
     591        1386 :       chand->lb_policy = NULL;
     592             :     }
     593             :   }
     594        1761 :   gpr_mu_unlock(&chand->mu_config);
     595             : 
     596        1761 :   if (destroy_resolver) {
     597        1761 :     grpc_resolver_shutdown(exec_ctx, destroy_resolver);
     598        1761 :     GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
     599             :   }
     600             : 
     601        1761 :   if (lb_policy) {
     602        1386 :     grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
     603        1386 :     GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
     604             :   }
     605        1761 : }
     606             : 
     607             : /* Constructor for call_data */
     608     1400980 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
     609             :                            const void *server_transport_data,
     610             :                            grpc_transport_stream_op *initial_op) {
     611     1400980 :   call_data *calld = elem->call_data;
     612             : 
     613             :   /* TODO(ctiller): is there something useful we can do here? */
     614     1400980 :   GPR_ASSERT(initial_op == NULL);
     615             : 
     616     1400980 :   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
     617     1400980 :   GPR_ASSERT(server_transport_data == NULL);
     618     1400980 :   gpr_mu_init(&calld->mu_state);
     619     1402469 :   calld->elem = elem;
     620     1402469 :   calld->state = CALL_CREATED;
     621     1402469 :   calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
     622     1402689 : }
     623             : 
     624             : /* Destructor for call_data */
     625     1402501 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
     626             :                               grpc_call_element *elem) {
     627     1402501 :   call_data *calld = elem->call_data;
     628             :   grpc_subchannel_call *subchannel_call;
     629             : 
     630             :   /* if the call got activated, we need to destroy the child stack also, and
     631             :      remove it from the in-flight requests tracked by the child_entry we
     632             :      picked */
     633     1402501 :   gpr_mu_lock(&calld->mu_state);
     634     1402673 :   switch (calld->state) {
     635             :     case CALL_ACTIVE:
     636     1402343 :       subchannel_call = calld->subchannel_call;
     637     1402343 :       gpr_mu_unlock(&calld->mu_state);
     638     1402320 :       GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel");
     639     1402315 :       break;
     640             :     case CALL_CREATED:
     641             :     case CALL_CANCELLED:
     642         330 :       gpr_mu_unlock(&calld->mu_state);
     643         330 :       break;
     644             :     case CALL_WAITING_FOR_PICK:
     645             :     case CALL_WAITING_FOR_CONFIG:
     646             :     case CALL_WAITING_FOR_CALL:
     647             :     case CALL_WAITING_FOR_SEND:
     648           0 :       GPR_UNREACHABLE_CODE(return );
     649             :   }
     650     1402645 : }
     651             : 
     652             : /* Constructor for channel_data */
     653        1761 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
     654             :                               grpc_channel_element *elem, grpc_channel *master,
     655             :                               const grpc_channel_args *args,
     656             :                               grpc_mdctx *metadata_context, int is_first,
     657             :                               int is_last) {
     658        1761 :   channel_data *chand = elem->channel_data;
     659             : 
     660        1761 :   memset(chand, 0, sizeof(*chand));
     661             : 
     662        1761 :   GPR_ASSERT(is_last);
     663        1761 :   GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
     664             : 
     665        1761 :   gpr_mu_init(&chand->mu_config);
     666        1761 :   chand->mdctx = metadata_context;
     667        1761 :   chand->master = master;
     668        1761 :   grpc_pollset_set_init(&chand->pollset_set);
     669        1761 :   grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
     670             : 
     671        1761 :   grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
     672             :                                "client_channel");
     673        1761 : }
     674             : 
     675             : /* Destructor for channel_data */
     676        1761 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
     677             :                                  grpc_channel_element *elem) {
     678        1761 :   channel_data *chand = elem->channel_data;
     679             : 
     680        1761 :   if (chand->resolver != NULL) {
     681           0 :     grpc_resolver_shutdown(exec_ctx, chand->resolver);
     682           0 :     GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
     683             :   }
     684        1761 :   if (chand->lb_policy != NULL) {
     685           0 :     GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
     686             :   }
     687        1761 :   grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
     688        1761 :   grpc_pollset_set_destroy(&chand->pollset_set);
     689        1761 :   gpr_mu_destroy(&chand->mu_config);
     690        1761 : }
     691             : 
     692             : const grpc_channel_filter grpc_client_channel_filter = {
     693             :     cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
     694             :     init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
     695             :     destroy_channel_elem, cc_get_peer, "client-channel",
     696             : };
     697             : 
     698        1761 : void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
     699             :                                       grpc_channel_stack *channel_stack,
     700             :                                       grpc_resolver *resolver) {
     701             :   /* post construction initialization: set the transport setup pointer */
     702        1761 :   grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
     703        1761 :   channel_data *chand = elem->channel_data;
     704        1761 :   gpr_mu_lock(&chand->mu_config);
     705        1761 :   GPR_ASSERT(!chand->resolver);
     706        1761 :   chand->resolver = resolver;
     707        1761 :   GRPC_RESOLVER_REF(resolver, "channel");
     708        3522 :   if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
     709        1761 :       chand->exit_idle_when_lb_policy_arrives) {
     710           0 :     chand->started_resolving = 1;
     711           0 :     GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
     712           0 :     grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
     713             :                        &chand->on_config_changed);
     714             :   }
     715        1761 :   gpr_mu_unlock(&chand->mu_config);
     716        1761 : }
     717             : 
     718         129 : grpc_connectivity_state grpc_client_channel_check_connectivity_state(
     719             :     grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
     720         129 :   channel_data *chand = elem->channel_data;
     721             :   grpc_connectivity_state out;
     722         129 :   gpr_mu_lock(&chand->mu_config);
     723         129 :   out = grpc_connectivity_state_check(&chand->state_tracker);
     724         129 :   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
     725          17 :     if (chand->lb_policy != NULL) {
     726           0 :       grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
     727             :     } else {
     728          17 :       chand->exit_idle_when_lb_policy_arrives = 1;
     729          17 :       if (!chand->started_resolving && chand->resolver != NULL) {
     730          17 :         GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
     731          17 :         chand->started_resolving = 1;
     732          17 :         grpc_resolver_next(exec_ctx, chand->resolver,
     733             :                            &chand->incoming_configuration,
     734             :                            &chand->on_config_changed);
     735             :       }
     736             :     }
     737             :   }
     738         129 :   gpr_mu_unlock(&chand->mu_config);
     739         129 :   return out;
     740             : }
     741             : 
     742          94 : void grpc_client_channel_watch_connectivity_state(
     743             :     grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
     744             :     grpc_connectivity_state *state, grpc_closure *on_complete) {
     745          94 :   channel_data *chand = elem->channel_data;
     746          94 :   gpr_mu_lock(&chand->mu_config);
     747          94 :   grpc_connectivity_state_notify_on_state_change(
     748             :       exec_ctx, &chand->state_tracker, state, on_complete);
     749          94 :   gpr_mu_unlock(&chand->mu_config);
     750          94 : }
     751             : 
     752        1470 : grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
     753             :     grpc_channel_element *elem) {
     754        1470 :   channel_data *chand = elem->channel_data;
     755        1470 :   return &chand->pollset_set;
     756             : }
     757             : 
     758          94 : void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
     759             :                                               grpc_channel_element *elem,
     760             :                                               grpc_pollset *pollset) {
     761          94 :   channel_data *chand = elem->channel_data;
     762          94 :   grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
     763          94 : }
     764             : 
     765          94 : void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
     766             :                                               grpc_channel_element *elem,
     767             :                                               grpc_pollset *pollset) {
     768          94 :   channel_data *chand = elem->channel_data;
     769          94 :   grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
     770          94 : }

Generated by: LCOV version 1.10