LCOV - code coverage report
Current view: top level - src/core/client_config/lb_policies - pick_first.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 182 202 90.1 %
Date: 2015-10-10 Functions: 15 16 93.8 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/client_config/lb_policy_factory.h"
      35             : #include "src/core/client_config/lb_policies/pick_first.h"
      36             : 
      37             : #include <string.h>
      38             : 
      39             : #include <grpc/support/alloc.h>
      40             : #include "src/core/transport/connectivity_state.h"
      41             : 
      42             : typedef struct pending_pick {
      43             :   struct pending_pick *next;
      44             :   grpc_pollset *pollset;
      45             :   grpc_subchannel **target;
      46             :   grpc_closure *on_complete;
      47             : } pending_pick;
      48             : 
      49             : typedef struct {
      50             :   /** base policy: must be first */
      51             :   grpc_lb_policy base;
      52             :   /** all our subchannels */
      53             :   grpc_subchannel **subchannels;
      54             :   size_t num_subchannels;
      55             : 
      56             :   grpc_closure connectivity_changed;
      57             : 
      58             :   /** mutex protecting remaining members */
      59             :   gpr_mu mu;
      60             :   /** the selected channel
      61             :       TODO(ctiller): this should be atomically set so we don't
      62             :                      need to take a mutex in the common case */
      63             :   grpc_subchannel *selected;
      64             :   /** have we started picking? */
      65             :   int started_picking;
      66             :   /** are we shut down? */
      67             :   int shutdown;
      68             :   /** which subchannel are we watching? */
      69             :   size_t checking_subchannel;
      70             :   /** what is the connectivity of that channel? */
      71             :   grpc_connectivity_state checking_connectivity;
      72             :   /** list of picks that are waiting on connectivity */
      73             :   pending_pick *pending_picks;
      74             : 
      75             :   /** our connectivity state tracker */
      76             :   grpc_connectivity_state_tracker state_tracker;
      77             : } pick_first_lb_policy;
      78             : 
      79        1451 : static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
      80             :                                           pick_first_lb_policy *p) {
      81             :   pending_pick *pp;
      82        1523 :   for (pp = p->pending_picks; pp; pp = pp->next) {
      83         144 :     grpc_subchannel_del_interested_party(
      84          72 :         exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
      85             :   }
      86        1451 : }
      87             : 
      88          70 : static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
      89             :                                           pick_first_lb_policy *p) {
      90             :   pending_pick *pp;
      91         122 :   for (pp = p->pending_picks; pp; pp = pp->next) {
      92         104 :     grpc_subchannel_add_interested_party(
      93          52 :         exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
      94             :   }
      95          70 : }
      96             : 
      97        1440 : void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
      98        1440 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
      99             :   size_t i;
     100        1440 :   GPR_ASSERT(p->pending_picks == NULL);
     101        1526 :   for (i = 0; i < p->num_subchannels; i++) {
     102          86 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
     103             :   }
     104        1440 :   if (p->selected) {
     105        1354 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
     106             :   }
     107        1440 :   grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
     108        1440 :   gpr_free(p->subchannels);
     109        1440 :   gpr_mu_destroy(&p->mu);
     110        1440 :   gpr_free(p);
     111        1440 : }
     112             : 
     113        1381 : void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     114        1381 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     115             :   pending_pick *pp;
     116        1381 :   gpr_mu_lock(&p->mu);
     117        1381 :   del_interested_parties_locked(exec_ctx, p);
     118        1381 :   p->shutdown = 1;
     119        1381 :   pp = p->pending_picks;
     120        1381 :   p->pending_picks = NULL;
     121        1381 :   grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     122             :                               GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
     123        1381 :   gpr_mu_unlock(&p->mu);
     124        2782 :   while (pp != NULL) {
     125          20 :     pending_pick *next = pp->next;
     126          20 :     *pp->target = NULL;
     127          20 :     grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
     128          20 :     gpr_free(pp);
     129          20 :     pp = next;
     130             :   }
     131        1381 : }
     132             : 
     133        1377 : static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
     134        1377 :   p->started_picking = 1;
     135        1377 :   p->checking_subchannel = 0;
     136        1377 :   p->checking_connectivity = GRPC_CHANNEL_IDLE;
     137        1377 :   GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
     138        2754 :   grpc_subchannel_notify_on_state_change(
     139        1377 :       exec_ctx, p->subchannels[p->checking_subchannel],
     140             :       &p->checking_connectivity, &p->connectivity_changed);
     141        1377 : }
     142             : 
     143          17 : void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     144          17 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     145          17 :   gpr_mu_lock(&p->mu);
     146          17 :   if (!p->started_picking) {
     147          17 :     start_picking(exec_ctx, p);
     148             :   }
     149          17 :   gpr_mu_unlock(&p->mu);
     150          17 : }
     151             : 
     152     1401079 : void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     153             :              grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
     154             :              grpc_subchannel **target, grpc_closure *on_complete) {
     155     1401079 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     156             :   pending_pick *pp;
     157     1401079 :   gpr_mu_lock(&p->mu);
     158     1402342 :   if (p->selected) {
     159     1391632 :     gpr_mu_unlock(&p->mu);
     160     1391619 :     *target = p->selected;
     161     1391619 :     grpc_exec_ctx_enqueue(exec_ctx, on_complete, 1);
     162             :   } else {
     163       10710 :     if (!p->started_picking) {
     164        1360 :       start_picking(exec_ctx, p);
     165             :     }
     166       10710 :     grpc_subchannel_add_interested_party(
     167       10710 :         exec_ctx, p->subchannels[p->checking_subchannel], pollset);
     168       10710 :     pp = gpr_malloc(sizeof(*pp));
     169       10710 :     pp->next = p->pending_picks;
     170       10710 :     pp->pollset = pollset;
     171       10710 :     pp->target = target;
     172       10710 :     pp->on_complete = on_complete;
     173       10710 :     p->pending_picks = pp;
     174       10710 :     gpr_mu_unlock(&p->mu);
     175             :   }
     176     1401926 : }
     177             : 
     178        1354 : static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg, 
     179             :                                 int iomgr_success) {
     180        1354 :   pick_first_lb_policy *p = arg;
     181             :   size_t i;
     182             :   grpc_transport_op op;
     183        1354 :   size_t num_subchannels = p->num_subchannels;
     184             :   grpc_subchannel **subchannels;
     185             :   grpc_subchannel *exclude_subchannel;
     186             : 
     187        1354 :   gpr_mu_lock(&p->mu);
     188        1354 :   subchannels = p->subchannels;
     189        1354 :   p->num_subchannels = 0;
     190        1354 :   p->subchannels = NULL;
     191        1354 :   exclude_subchannel = p->selected;
     192        1354 :   gpr_mu_unlock(&p->mu);
     193        1354 :   GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
     194             : 
     195        2718 :   for (i = 0; i < num_subchannels; i++) {
     196        1364 :     if (subchannels[i] != exclude_subchannel) {
     197          10 :       memset(&op, 0, sizeof(op));
     198          10 :       op.disconnect = 1;
     199          10 :       grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
     200             :     }
     201        1364 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
     202             :   }
     203             : 
     204        1354 :   gpr_free(subchannels);
     205        1354 : }
     206             : 
     207        5640 : static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
     208             :                                     int iomgr_success) {
     209        5640 :   pick_first_lb_policy *p = arg;
     210             :   pending_pick *pp;
     211             : 
     212        5640 :   gpr_mu_lock(&p->mu);
     213             : 
     214        5640 :   if (p->shutdown) {
     215        1377 :     gpr_mu_unlock(&p->mu);
     216        1377 :     GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
     217        7017 :     return;
     218        4263 :   } else if (p->selected != NULL) {
     219        1405 :     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     220             :                                 p->checking_connectivity, "selected_changed");
     221        1405 :     if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
     222        1405 :       grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
     223             :                                              &p->checking_connectivity,
     224             :                                              &p->connectivity_changed);
     225             :     } else {
     226           0 :       GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
     227             :     }
     228             :   } else {
     229             :   loop:
     230        2860 :     switch (p->checking_connectivity) {
     231             :       case GRPC_CHANNEL_READY:
     232        1354 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     233             :                                     GRPC_CHANNEL_READY, "connecting_ready");
     234        1354 :         p->selected = p->subchannels[p->checking_subchannel];
     235        1354 :         GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
     236             :         /* drop the pick list: we are connected now */
     237        1354 :         GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
     238        1354 :         grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1);
     239             :         /* update any calls that were waiting for a pick */
     240       13257 :         while ((pp = p->pending_picks)) {
     241       10549 :           p->pending_picks = pp->next;
     242       10549 :           *pp->target = p->selected;
     243       10549 :           grpc_subchannel_del_interested_party(exec_ctx, p->selected,
     244             :                                                pp->pollset);
     245       10538 :           grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
     246       10484 :           gpr_free(pp);
     247             :         }
     248        1354 :         grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
     249             :                                                &p->checking_connectivity,
     250             :                                                &p->connectivity_changed);
     251        1354 :         break;
     252             :       case GRPC_CHANNEL_TRANSIENT_FAILURE:
     253          70 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     254             :                                     GRPC_CHANNEL_TRANSIENT_FAILURE,
     255             :                                     "connecting_transient_failure");
     256          70 :         del_interested_parties_locked(exec_ctx, p);
     257          70 :         p->checking_subchannel =
     258          70 :             (p->checking_subchannel + 1) % p->num_subchannels;
     259          70 :         p->checking_connectivity = grpc_subchannel_check_connectivity(
     260          70 :             p->subchannels[p->checking_subchannel]);
     261          70 :         add_interested_parties_locked(exec_ctx, p);
     262          70 :         if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
     263         136 :           grpc_subchannel_notify_on_state_change(
     264          68 :               exec_ctx, p->subchannels[p->checking_subchannel],
     265             :               &p->checking_connectivity, &p->connectivity_changed);
     266             :         } else {
     267           2 :           goto loop;
     268             :         }
     269          68 :         break;
     270             :       case GRPC_CHANNEL_CONNECTING:
     271             :       case GRPC_CHANNEL_IDLE:
     272        1436 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     273             :                                     GRPC_CHANNEL_CONNECTING,
     274             :                                     "connecting_changed");
     275        2872 :         grpc_subchannel_notify_on_state_change(
     276        1436 :             exec_ctx, p->subchannels[p->checking_subchannel],
     277             :             &p->checking_connectivity, &p->connectivity_changed);
     278        1436 :         break;
     279             :       case GRPC_CHANNEL_FATAL_FAILURE:
     280           0 :         del_interested_parties_locked(exec_ctx, p);
     281           0 :         GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
     282             :                  p->subchannels[p->num_subchannels - 1]);
     283           0 :         p->num_subchannels--;
     284           0 :         GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
     285             :                               "pick_first");
     286           0 :         if (p->num_subchannels == 0) {
     287           0 :           grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     288             :                                       GRPC_CHANNEL_FATAL_FAILURE,
     289             :                                       "no_more_channels");
     290           0 :           while ((pp = p->pending_picks)) {
     291           0 :             p->pending_picks = pp->next;
     292           0 :             *pp->target = NULL;
     293           0 :             grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
     294           0 :             gpr_free(pp);
     295             :           }
     296           0 :           GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
     297             :         } else {
     298           0 :           grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     299             :                                       GRPC_CHANNEL_TRANSIENT_FAILURE,
     300             :                                       "subchannel_failed");
     301           0 :           p->checking_subchannel %= p->num_subchannels;
     302           0 :           p->checking_connectivity = grpc_subchannel_check_connectivity(
     303           0 :               p->subchannels[p->checking_subchannel]);
     304           0 :           add_interested_parties_locked(exec_ctx, p);
     305           0 :           goto loop;
     306             :         }
     307             :     }
     308             :   }
     309             : 
     310        4263 :   gpr_mu_unlock(&p->mu);
     311             : }
     312             : 
     313        1381 : static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     314             :                          grpc_transport_op *op) {
     315        1381 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     316             :   size_t i;
     317             :   size_t n;
     318             :   grpc_subchannel **subchannels;
     319             :   grpc_subchannel *selected;
     320             : 
     321        1381 :   gpr_mu_lock(&p->mu);
     322        1381 :   n = p->num_subchannels;
     323        1381 :   subchannels = gpr_malloc(n * sizeof(*subchannels));
     324        1381 :   selected = p->selected;
     325        1381 :   if (selected) {
     326        1354 :     GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
     327             :   }
     328        1408 :   for (i = 0; i < n; i++) {
     329          27 :     subchannels[i] = p->subchannels[i];
     330          27 :     GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
     331             :   }
     332        1381 :   gpr_mu_unlock(&p->mu);
     333             : 
     334        1408 :   for (i = 0; i < n; i++) {
     335          27 :     if (selected == subchannels[i]) continue;
     336          27 :     grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
     337          27 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
     338             :   }
     339        1381 :   if (p->selected) {
     340        1354 :     grpc_subchannel_process_transport_op(exec_ctx, selected, op);
     341        1354 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
     342             :   }
     343        1381 :   gpr_free(subchannels);
     344        1381 : }
     345             : 
     346        1381 : static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
     347             :                                                      grpc_lb_policy *pol) {
     348        1381 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     349             :   grpc_connectivity_state st;
     350        1381 :   gpr_mu_lock(&p->mu);
     351        1381 :   st = grpc_connectivity_state_check(&p->state_tracker);
     352        1381 :   gpr_mu_unlock(&p->mu);
     353        1381 :   return st;
     354             : }
     355             : 
     356        5533 : void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     357             :                                grpc_connectivity_state *current,
     358             :                                grpc_closure *notify) {
     359        5533 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     360        5533 :   gpr_mu_lock(&p->mu);
     361        5533 :   grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
     362             :                                                  current, notify);
     363        5533 :   gpr_mu_unlock(&p->mu);
     364        5533 : }
     365             : 
     366             : static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
     367             :     pf_destroy, pf_shutdown, pf_pick, pf_exit_idle, pf_broadcast,
     368             :     pf_check_connectivity, pf_notify_on_state_change};
     369             : 
     370        2501 : static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
     371             : 
     372           0 : static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
     373             : 
     374        1440 : static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
     375             :                                          grpc_lb_policy_args *args) {
     376        1440 :   pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
     377        1440 :   GPR_ASSERT(args->num_subchannels > 0);
     378        1440 :   memset(p, 0, sizeof(*p));
     379        1440 :   grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
     380        1440 :   p->subchannels =
     381        1440 :       gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
     382        1440 :   p->num_subchannels = args->num_subchannels;
     383        1440 :   grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
     384             :                                "pick_first");
     385        1440 :   memcpy(p->subchannels, args->subchannels,
     386        1440 :          sizeof(grpc_subchannel *) * args->num_subchannels);
     387        1440 :   grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
     388        1440 :   gpr_mu_init(&p->mu);
     389        1440 :   return &p->base;
     390             : }
     391             : 
     392             : static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
     393             :     pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
     394             :     "pick_first"};
     395             : 
     396             : static grpc_lb_policy_factory pick_first_lb_policy_factory = {
     397             :     &pick_first_factory_vtable};
     398             : 
     399        5002 : grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() {
     400        5002 :   return &pick_first_lb_policy_factory;
     401             : }

Generated by: LCOV version 1.10