LCOV - code coverage report
Current view: top level - core/channel - client_uchannel.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 105 108 97.2 %
Date: 2015-12-10 22:15:08 Functions: 17 17 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/channel/client_uchannel.h"
      35             : 
      36             : #include <string.h>
      37             : 
      38             : #include "src/core/census/grpc_filter.h"
      39             : #include "src/core/channel/channel_args.h"
      40             : #include "src/core/channel/client_channel.h"
      41             : #include "src/core/channel/compress_filter.h"
      42             : #include "src/core/channel/subchannel_call_holder.h"
      43             : #include "src/core/iomgr/iomgr.h"
      44             : #include "src/core/support/string.h"
      45             : #include "src/core/surface/channel.h"
      46             : #include "src/core/transport/connectivity_state.h"
      47             : 
      48             : #include <grpc/support/alloc.h>
      49             : #include <grpc/support/log.h>
      50             : #include <grpc/support/sync.h>
      51             : #include <grpc/support/useful.h>
      52             : 
      53             : /** Microchannel (uchannel) implementation: a lightweight channel, without any
      54             :  * load-balancing mechanisms, meant for communication from within the core. */
      55             : /** Channel-scoped state of the client_uchannel filter: this is the object the
                     :  *  functions in this file reach through elem->channel_data. */
      56             : typedef struct client_uchannel_channel_data {
      57             :   /** master channel - the grpc_channel instance that ultimately owns
      58             :       this channel_data via its channel stack.
      59             :       We occasionally use this to bump the refcount on the master channel
      60             :       to keep ourselves alive through an asynchronous operation. */
      61             :   grpc_channel *master;
      62             : 
      63             :   /** connectivity state being tracked */
      64             :   grpc_connectivity_state_tracker state_tracker;
      65             : 
      66             :   /** the subchannel wrapped by the microchannel; zero-initialised to NULL by
                     :       cuc_init_channel_elem and assigned only by
                     :       grpc_client_uchannel_set_subchannel */
      67             :   grpc_subchannel *subchannel;
      68             : 
      69             :   /** the callback used to stay subscribed to subchannel connectivity
      70             :    * notifications */
      71             :   grpc_closure connectivity_cb;
      72             : 
      73             :   /** the current connectivity state of the wrapped subchannel */
      74             :   grpc_connectivity_state subchannel_connectivity;
      75             :   /** mutex locked by the public grpc_client_uchannel_* functions of this file
                     :       (check/watch connectivity state, get_connecting_pollset_set,
                     :       set_subchannel); none of the cuc_* filter callbacks nor
                     :       monitor_subchannel lock it */
      76             :   gpr_mu mu_state;
      77             : } channel_data;
      78             : /** Per-call state: the filter keeps a grpc_subchannel_call_holder directly in
                     :  *  elem->call_data and adds no fields of its own. */
      79             : typedef grpc_subchannel_call_holder call_data;
      80             : /** Body of chand->connectivity_cb (the closure is set up in
                     :  *  cuc_init_channel_elem with the channel_data as its argument).
                     :  *
                     :  *  Copies the value currently stored in chand->subchannel_connectivity into
                     :  *  the channel's state tracker, then hands the same closure and the same
                     :  *  storage slot back to grpc_subchannel_notify_on_state_change, which is how
                     :  *  the subscription is kept going. iomgr_success is not inspected. */
      81          12 : static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
      82             :                                int iomgr_success) {
      83          12 :   channel_data *chand = arg;
      84          12 :   grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
      85             :                               chand->subchannel_connectivity,
      86             :                               "uchannel_monitor_subchannel");
      87          12 :   grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
      88             :                                          &chand->subchannel_connectivity,
      89             :                                          &chand->connectivity_cb);
      90          12 : }
      91             : /** get_peer entry of the filter table: forwards to
                     :  *  grpc_subchannel_call_holder_get_peer with this call's holder and the
                     :  *  master channel, and returns that function's result unchanged. */
      92          80 : static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
      93          80 :   channel_data *chand = elem->channel_data;
      94          80 :   return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
      95             :                                               chand->master);
      96             : }
      97             : /** Stream-op entry of the filter table: logs the op through GRPC_CALL_LOG_OP
                     :  *  and then passes it, unmodified, to
                     :  *  grpc_subchannel_call_holder_perform_op for this call's holder. */
      98       52566 : static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
      99             :                                           grpc_call_element *elem,
     100             :                                           grpc_transport_stream_op *op) {
     101       52566 :   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
     102       52566 :   grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
     103       52566 : }
     104             : /** Channel-level transport-op entry of the filter table.
                     :  *
                     :  *  Handles two requests locally, both against chand->state_tracker: a
                     :  *  connectivity watch (on_connectivity_state_change / connectivity_state)
                     :  *  and a disconnect. Ops carrying set_accept_stream or bind_pollset trip an
                     :  *  assertion. Nothing in this function forwards op to the subchannel. */
     105         246 : static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
     106             :                                    grpc_channel_element *elem,
     107             :                                    grpc_transport_op *op) {
     108         246 :   channel_data *chand = elem->channel_data;
     109             :   /* on_consumed is enqueued first, before any other field of op is examined */
     110         246 :   grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
     111             :   /* these two requests are not handled by a uchannel: both must be NULL */
     112         246 :   GPR_ASSERT(op->set_accept_stream == NULL);
     113         246 :   GPR_ASSERT(op->bind_pollset == NULL);
     114             :   /* register the watch on the local tracker, then clear both fields in op */
     115         246 :   if (op->on_connectivity_state_change != NULL) {
     116           0 :     grpc_connectivity_state_notify_on_state_change(
     117             :         exec_ctx, &chand->state_tracker, op->connectivity_state,
     118             :         op->on_connectivity_state_change);
     119           0 :     op->on_connectivity_state_change = NULL;
     120           0 :     op->connectivity_state = NULL;
     121             :   }
     122             :   /* a disconnect is recorded on the local tracker only */
     123         246 :   if (op->disconnect) {
     124         246 :     grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
     125             :                                 GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
     126             :   }
     127         246 : }
     128             : /** Pick function installed by cuc_init_call_elem (arg is the channel_data).
                     :  *  Unconditionally stores the wrapped subchannel in *subchannel and
                     :  *  returns 1; on_ready is never used and initial_metadata is only asserted
                     :  *  to be non-NULL. chand->subchannel is read without taking mu_state.
                     :  *  NOTE(review): the 1 presumably tells the call holder that the pick
                     :  *  completed synchronously - confirm against subchannel_call_holder.h. */
     129       52358 : static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
     130             :                                grpc_metadata_batch *initial_metadata,
     131             :                                grpc_subchannel **subchannel,
     132             :                                grpc_closure *on_ready) {
     133       52358 :   channel_data *chand = arg;
     134       52358 :   GPR_ASSERT(initial_metadata != NULL);
     135       52358 :   *subchannel = chand->subchannel;
     136       52358 :   return 1;
     137             : }
     138             : 
     139             : /* Constructor for call_data: initialises the call holder stored in
                     :    elem->call_data with cuc_pick_subchannel as its pick function and the
                     :    element's channel_data as that function's argument. exec_ctx and args
                     :    are not used. */
     140       52394 : static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
     141             :                                grpc_call_element_args *args) {
     142       52394 :   grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
     143             :                                    elem->channel_data);
     144       52394 : }
     145             : 
     146             : /* Destructor for call_data: passes the call holder stored in elem->call_data
                     :    to grpc_subchannel_call_holder_destroy; nothing else is done here. */
     147       52394 : static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
     148             :                                   grpc_call_element *elem) {
     149       52394 :   grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
     150       52394 : }
     151             : 
     152             : /* Constructor for channel_data: zero-fills the struct (so subchannel starts
                     :    out NULL), prepares connectivity_cb to run monitor_subchannel with this
                     :    channel_data as its argument, records args->master, starts the state
                     :    tracker in GRPC_CHANNEL_IDLE and initialises mu_state.
                     :    Asserts that this element is the last one of its stack and that it
                     :    belongs to grpc_client_uchannel_filter. */
     153         246 : static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
     154             :                                   grpc_channel_element *elem,
     155             :                                   grpc_channel_element_args *args) {
     156         246 :   channel_data *chand = elem->channel_data;
     157         246 :   memset(chand, 0, sizeof(*chand));
     158         246 :   grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
     159         246 :   GPR_ASSERT(args->is_last);
     160         246 :   GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
     161         246 :   chand->master = args->master;
     162         246 :   grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
     163             :                                "client_uchannel");
     164         246 :   gpr_mu_init(&chand->mu_state);
     165         246 : }
     166             : 
     167             : /* Destructor for channel_data: unsubscribes connectivity_cb from the
                     :    subchannel, then destroys the state tracker and mu_state, in that order.
                     :    NOTE(review): chand->subchannel is passed on unconditionally and is
                     :    still NULL if grpc_client_uchannel_set_subchannel was never called -
                     :    confirm grpc_subchannel_state_change_unsubscribe tolerates that. */
     168         246 : static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
     169             :                                      grpc_channel_element *elem) {
     170         246 :   channel_data *chand = elem->channel_data;
     171         246 :   grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel,
     172             :                                            &chand->connectivity_cb);
     173         246 :   grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
     174         246 :   gpr_mu_destroy(&chand->mu_state);
     175         246 : }
     176             : /** set_pollset entry of the filter table: stores pollset in the pollset
                     :  *  field of this call's holder. exec_ctx is not used. */
     177       52394 : static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
     178             :                             grpc_pollset *pollset) {
     179       52394 :   call_data *calld = elem->call_data;
     180       52394 :   calld->pollset = pollset;
     181       52394 : }
     182             : /** Filter table of the client_uchannel element. The initialiser is
                     :  *  positional, so the entries have to stay in the member order of
                     :  *  grpc_channel_filter (declared outside this file); the trailing string is
                     :  *  presumably the filter's name. cuc_init_channel_elem asserts that an
                     :  *  element using this filter is the last one of its channel stack. */
     183             : const grpc_channel_filter grpc_client_uchannel_filter = {
     184             :     cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data),
     185             :     cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem,
     186             :     sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem,
     187             :     cuc_get_peer, "client-uchannel",
     188             : };
     189             : /** Returns the state of chand->state_tracker as sampled on entry.
                     :  *
                     :  *  If that state is GRPC_CHANNEL_IDLE and try_to_connect is non-zero, the
                     :  *  tracker is additionally moved to GRPC_CHANNEL_CONNECTING and
                     :  *  connectivity_cb is subscribed to the wrapped subchannel. The value
                     :  *  returned on that path is still GRPC_CHANNEL_IDLE, not CONNECTING.
                     :  *
                     :  *  NOTE(review): the state is sampled before mu_state is taken, so two
                     :  *  concurrent callers could both observe IDLE and both register the same
                     :  *  connectivity_cb closure - confirm callers are serialised, or move the
                     :  *  check under the lock. */
     190          16 : grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
     191             :     grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
     192          16 :   channel_data *chand = elem->channel_data;
     193             :   grpc_connectivity_state out;
     194          16 :   out = grpc_connectivity_state_check(&chand->state_tracker);
     195          16 :   gpr_mu_lock(&chand->mu_state);
     196          16 :   if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
     197           2 :     grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
     198             :                                 GRPC_CHANNEL_CONNECTING,
     199             :                                 "uchannel_connecting_changed");
     200           2 :     chand->subchannel_connectivity = out; /* always GRPC_CHANNEL_IDLE here */
     201           2 :     grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
     202             :                                            &chand->subchannel_connectivity,
     203             :                                            &chand->connectivity_cb);
     204             :   }
     205          16 :   gpr_mu_unlock(&chand->mu_state);
     206          16 :   return out;
     207             : }
     208             : /** Registers a connectivity watch on the channel's own state tracker: state
                     :  *  and on_complete are passed straight through to
                     :  *  grpc_connectivity_state_notify_on_state_change while mu_state is held. */
     209          12 : void grpc_client_uchannel_watch_connectivity_state(
     210             :     grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
     211             :     grpc_connectivity_state *state, grpc_closure *on_complete) {
     212          12 :   channel_data *chand = elem->channel_data;
     213          12 :   gpr_mu_lock(&chand->mu_state);
     214          12 :   grpc_connectivity_state_notify_on_state_change(
     215             :       exec_ctx, &chand->state_tracker, state, on_complete);
     216          12 :   gpr_mu_unlock(&chand->mu_state);
     217          12 : }
     218             : /** Returns the connecting pollset_set of the master channel of the wrapped
                     :  *  subchannel (not of this uchannel): the last element of that master's
                     :  *  channel stack is looked up and handed to
                     :  *  grpc_client_channel_get_connecting_pollset_set.
                     :  *  mu_state is held only while chand->subchannel is read and the parent
                     :  *  element is located; the final call is made after it is released.
                     :  *  NOTE(review): no filter check is made on parent_elem here - presumably
                     :  *  the master is always a client channel; confirm. */
     219          24 : grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
     220             :     grpc_channel_element *elem) {
     221          24 :   channel_data *chand = elem->channel_data;
     222             :   grpc_channel_element *parent_elem;
     223          24 :   gpr_mu_lock(&chand->mu_state);
     224          24 :   parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(
     225             :       grpc_subchannel_get_master(chand->subchannel)));
     226          24 :   gpr_mu_unlock(&chand->mu_state);
     227          24 :   return grpc_client_channel_get_connecting_pollset_set(parent_elem);
     228             : }
     229             : /** Adds pollset to the pollset_set returned by
                     :  *  grpc_client_uchannel_get_connecting_pollset_set for elem. */
     230          12 : void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
     231             :                                                grpc_channel_element *elem,
     232             :                                                grpc_pollset *pollset) {
     233          12 :   grpc_pollset_set *master_pollset_set =
     234             :       grpc_client_uchannel_get_connecting_pollset_set(elem);
     235          12 :   grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset);
     236          12 : }
     237             : /** Removes pollset from the pollset_set returned by
                     :  *  grpc_client_uchannel_get_connecting_pollset_set for elem; the
                     :  *  counterpart of grpc_client_uchannel_add_interested_party. */
     238          12 : void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
     239             :                                                grpc_channel_element *elem,
     240             :                                                grpc_pollset *pollset) {
     241          12 :   grpc_pollset_set *master_pollset_set =
     242             :       grpc_client_uchannel_get_connecting_pollset_set(elem);
     243          12 :   grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset);
     244          12 : }
     245             : /** Creates a channel whose filter stack is [census, only when enabled in
                     :  *  args,] compress, client_uchannel, using the target string obtained from
                     :  *  the master channel of subchannel. Returns whatever
                     :  *  grpc_channel_create_from_filters returns.
                     :  *
                     :  *  subchannel is used only to find that master channel: the new channel's
                     :  *  channel_data keeps subchannel == NULL until
                     :  *  grpc_client_uchannel_set_subchannel is called on it.
                     :  *
                     :  *  NOTE(review): the local exec_ctx is never passed to
                     :  *  grpc_exec_ctx_finish - confirm nothing can be queued on it by
                     :  *  grpc_channel_create_from_filters. MAX_FILTERS is #defined inside this
                     :  *  function and is not #undef'd afterwards. */
     246         246 : grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
     247             :                                           grpc_channel_args *args) {
     248         246 :   grpc_channel *channel = NULL;
     249             : #define MAX_FILTERS 3
     250             :   const grpc_channel_filter *filters[MAX_FILTERS];
     251         246 :   grpc_channel *master = grpc_subchannel_get_master(subchannel);
     252         246 :   char *target = grpc_channel_get_target(master);
     253         246 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     254         246 :   size_t n = 0;
     255             :   /* census is optional; compress and then client_uchannel always follow */
     256         246 :   if (grpc_channel_args_is_census_enabled(args)) {
     257           2 :     filters[n++] = &grpc_client_census_filter;
     258             :   }
     259         246 :   filters[n++] = &grpc_compress_filter;
     260         246 :   filters[n++] = &grpc_client_uchannel_filter;
     261         246 :   GPR_ASSERT(n <= MAX_FILTERS);
     262             : 
     263         246 :   channel =
     264             :       grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1);
     265             :   /* target is released here, after the channel has been created from it */
     266         246 :   gpr_free(target);
     267         246 :   return channel;
     268             : }
     269             : /** Points uchannel at subchannel: finds the last element of uchannel's
                     :  *  channel stack, asserts that it is a client_uchannel element, and stores
                     :  *  subchannel in its channel_data while holding mu_state. This function
                     :  *  only performs the assignment; it does not start a connectivity
                     :  *  subscription. */
     270         246 : void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
     271             :                                          grpc_subchannel *subchannel) {
     272         246 :   grpc_channel_element *elem =
     273         246 :       grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
     274         246 :   channel_data *chand = elem->channel_data;
     275         246 :   GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
     276         246 :   gpr_mu_lock(&chand->mu_state);
     277         246 :   chand->subchannel = subchannel;
     278         246 :   gpr_mu_unlock(&chand->mu_state);
     279         246 : }

Generated by: LCOV version 1.11