LCOV - code coverage report
Current view: top level - core/client_config/lb_policies - pick_first.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 194 220 88.2 %
Date: 2015-12-10 22:15:08 Functions: 17 17 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/client_config/lb_policy_factory.h"
      35             : #include "src/core/client_config/lb_policies/pick_first.h"
      36             : 
      37             : #include <string.h>
      38             : 
      39             : #include <grpc/support/alloc.h>
      40             : #include "src/core/transport/connectivity_state.h"
      41             : 
      42             : typedef struct pending_pick {
      43             :   struct pending_pick *next;
      44             :   grpc_pollset *pollset;
      45             :   grpc_subchannel **target;
      46             :   grpc_closure *on_complete;
      47             : } pending_pick;
      48             : 
      49             : typedef struct {
      50             :   /** base policy: must be first */
      51             :   grpc_lb_policy base;
      52             :   /** all our subchannels */
      53             :   grpc_subchannel **subchannels;
      54             :   size_t num_subchannels;
      55             : 
      56             :   grpc_closure connectivity_changed;
      57             : 
      58             :   /** mutex protecting remaining members */
      59             :   gpr_mu mu;
      60             :   /** the selected channel
      61             :       TODO(ctiller): this should be atomically set so we don't
      62             :                      need to take a mutex in the common case */
      63             :   grpc_subchannel *selected;
      64             :   /** have we started picking? */
      65             :   int started_picking;
      66             :   /** are we shut down? */
      67             :   int shutdown;
      68             :   /** which subchannel are we watching? */
      69             :   size_t checking_subchannel;
      70             :   /** what is the connectivity of that channel? */
      71             :   grpc_connectivity_state checking_connectivity;
      72             :   /** list of picks that are waiting on connectivity */
      73             :   pending_pick *pending_picks;
      74             : 
      75             :   /** our connectivity state tracker */
      76             :   grpc_connectivity_state_tracker state_tracker;
      77             : } pick_first_lb_policy;
      78             : 
      79        2453 : static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
      80             :                                           pick_first_lb_policy *p) {
      81             :   pending_pick *pp;
      82        2504 :   for (pp = p->pending_picks; pp; pp = pp->next) {
      83         102 :     grpc_subchannel_del_interested_party(
      84          51 :         exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
      85             :   }
      86        2453 : }
      87             : 
      88         271 : static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
      89             :                                           pick_first_lb_policy *p) {
      90             :   pending_pick *pp;
      91         322 :   for (pp = p->pending_picks; pp; pp = pp->next) {
      92         102 :     grpc_subchannel_add_interested_party(
      93          51 :         exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
      94             :   }
      95         271 : }
      96             : 
      97        2300 : void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
      98        2294 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
      99             :   size_t i;
     100        2300 :   GPR_ASSERT(p->pending_picks == NULL);
     101        2633 :   for (i = 0; i < p->num_subchannels; i++) {
     102         339 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
     103             :   }
     104        2300 :   if (p->selected) {
     105        2090 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
     106             :   }
     107        2300 :   grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
     108        2300 :   gpr_free(p->subchannels);
     109        2300 :   gpr_mu_destroy(&p->mu);
     110        2300 :   gpr_free(p);
     111        2300 : }
     112             : 
     113        2182 : void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     114        2176 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     115             :   pending_pick *pp;
     116        2182 :   gpr_mu_lock(&p->mu);
     117        2182 :   del_interested_parties_locked(exec_ctx, p);
     118        2182 :   p->shutdown = 1;
     119        2182 :   pp = p->pending_picks;
     120        2182 :   p->pending_picks = NULL;
     121        2182 :   grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     122             :                               GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
     123        2182 :   gpr_mu_unlock(&p->mu);
     124        4364 :   while (pp != NULL) {
     125           0 :     pending_pick *next = pp->next;
     126           0 :     *pp->target = NULL;
     127           0 :     grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
     128           0 :     gpr_free(pp);
     129           0 :     pp = next;
     130             :   }
     131        2182 : }
     132             : 
     133          22 : static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     134             :                            grpc_subchannel **target) {
     135          20 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     136             :   pending_pick *pp;
     137          22 :   gpr_mu_lock(&p->mu);
     138          22 :   pp = p->pending_picks;
     139          22 :   p->pending_picks = NULL;
     140          65 :   while (pp != NULL) {
     141          21 :     pending_pick *next = pp->next;
     142          21 :     if (pp->target == target) {
     143          42 :       grpc_subchannel_del_interested_party(
     144          21 :           exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
     145          21 :       *target = NULL;
     146          21 :       grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
     147          21 :       gpr_free(pp);
     148             :     } else {
     149           0 :       pp->next = p->pending_picks;
     150           0 :       p->pending_picks = pp;
     151             :     }
     152          20 :     pp = next;
     153             :   }
     154          22 :   gpr_mu_unlock(&p->mu);
     155          22 : }
     156             : 
     157        2215 : static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
     158        2215 :   p->started_picking = 1;
     159        2215 :   p->checking_subchannel = 0;
     160        2215 :   p->checking_connectivity = GRPC_CHANNEL_IDLE;
     161        2215 :   GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
     162        4430 :   grpc_subchannel_notify_on_state_change(
     163        2215 :       exec_ctx, p->subchannels[p->checking_subchannel],
     164             :       &p->checking_connectivity, &p->connectivity_changed);
     165        2215 : }
     166             : 
     167         271 : void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     168         265 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     169         271 :   gpr_mu_lock(&p->mu);
     170         271 :   if (!p->started_picking) {
     171         271 :     start_picking(exec_ctx, p);
     172             :   }
     173         271 :   gpr_mu_unlock(&p->mu);
     174         271 : }
     175             : 
     176     2114781 : int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
     177             :             grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
     178             :             grpc_closure *on_complete) {
     179     2114690 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     180             :   pending_pick *pp;
     181     2114781 :   gpr_mu_lock(&p->mu);
     182     2114923 :   if (p->selected) {
     183     2104848 :     gpr_mu_unlock(&p->mu);
     184     2104835 :     *target = p->selected;
     185     2104835 :     return 1;
     186             :   } else {
     187       10075 :     if (!p->started_picking) {
     188        1944 :       start_picking(exec_ctx, p);
     189             :     }
     190       10075 :     grpc_subchannel_add_interested_party(
     191       10075 :         exec_ctx, p->subchannels[p->checking_subchannel], pollset);
     192       10075 :     pp = gpr_malloc(sizeof(*pp));
     193       10075 :     pp->next = p->pending_picks;
     194       10075 :     pp->pollset = pollset;
     195       10075 :     pp->target = target;
     196       10075 :     pp->on_complete = on_complete;
     197       10075 :     p->pending_picks = pp;
     198       10075 :     gpr_mu_unlock(&p->mu);
     199       10075 :     return 0;
     200             :   }
     201             : }
     202             : 
     203        2127 : static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
     204             :                                 int iomgr_success) {
     205        2086 :   pick_first_lb_policy *p = arg;
     206             :   size_t i;
     207             :   grpc_transport_op op;
     208        2127 :   size_t num_subchannels = p->num_subchannels;
     209             :   grpc_subchannel **subchannels;
     210             :   grpc_subchannel *exclude_subchannel;
     211             : 
     212        2127 :   gpr_mu_lock(&p->mu);
     213        2127 :   subchannels = p->subchannels;
     214        2127 :   p->num_subchannels = 0;
     215        2127 :   p->subchannels = NULL;
     216        2127 :   exclude_subchannel = p->selected;
     217        2127 :   gpr_mu_unlock(&p->mu);
     218        2127 :   GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
     219             : 
     220        5655 :   for (i = 0; i < num_subchannels; i++) {
     221        3528 :     if (subchannels[i] != exclude_subchannel) {
     222        1401 :       memset(&op, 0, sizeof(op));
     223        1401 :       op.disconnect = 1;
     224        1401 :       grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
     225             :     }
     226        3528 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
     227             :   }
     228             : 
     229        2127 :   gpr_free(subchannels);
     230        2127 : }
     231             : 
     232        8471 : static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
     233             :                                     int iomgr_success) {
     234        8257 :   pick_first_lb_policy *p = arg;
     235             :   pending_pick *pp;
     236             : 
     237        8471 :   gpr_mu_lock(&p->mu);
     238             : 
     239        8471 :   if (p->shutdown) {
     240        2178 :     gpr_mu_unlock(&p->mu);
     241        2178 :     GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
     242       10649 :     return;
     243        6293 :   } else if (p->selected != NULL) {
     244        1428 :     grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     245             :                                 p->checking_connectivity, "selected_changed");
     246        1428 :     if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
     247        1428 :       grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
     248             :                                              &p->checking_connectivity,
     249             :                                              &p->connectivity_changed);
     250             :     } else {
     251           0 :       GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
     252             :     }
     253             :   } else {
     254             :   loop:
     255        4890 :     switch (p->checking_connectivity) {
     256             :       case GRPC_CHANNEL_READY:
     257        2127 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     258             :                                     GRPC_CHANNEL_READY, "connecting_ready");
     259        2127 :         p->selected = p->subchannels[p->checking_subchannel];
     260        2127 :         GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
     261             :         /* drop the pick list: we are connected now */
     262        2127 :         GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
     263        2127 :         grpc_exec_ctx_enqueue(exec_ctx,
     264             :                               grpc_closure_create(destroy_subchannels, p), 1);
     265             :         /* update any calls that were waiting for a pick */
     266       14128 :         while ((pp = p->pending_picks)) {
     267        9874 :           p->pending_picks = pp->next;
     268        9874 :           *pp->target = p->selected;
     269        9874 :           grpc_subchannel_del_interested_party(exec_ctx, p->selected,
     270             :                                                pp->pollset);
     271        9918 :           grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
     272        9690 :           gpr_free(pp);
     273             :         }
     274        2127 :         grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
     275             :                                                &p->checking_connectivity,
     276             :                                                &p->connectivity_changed);
     277        2127 :         break;
     278             :       case GRPC_CHANNEL_TRANSIENT_FAILURE:
     279         271 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     280             :                                     GRPC_CHANNEL_TRANSIENT_FAILURE,
     281             :                                     "connecting_transient_failure");
     282         271 :         del_interested_parties_locked(exec_ctx, p);
     283         271 :         p->checking_subchannel =
     284         271 :             (p->checking_subchannel + 1) % p->num_subchannels;
     285         271 :         p->checking_connectivity = grpc_subchannel_check_connectivity(
     286         271 :             p->subchannels[p->checking_subchannel]);
     287         271 :         add_interested_parties_locked(exec_ctx, p);
     288         271 :         if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
     289         492 :           grpc_subchannel_notify_on_state_change(
     290         246 :               exec_ctx, p->subchannels[p->checking_subchannel],
     291             :               &p->checking_connectivity, &p->connectivity_changed);
     292             :         } else {
     293          23 :           goto loop;
     294             :         }
     295         246 :         break;
     296             :       case GRPC_CHANNEL_CONNECTING:
     297             :       case GRPC_CHANNEL_IDLE:
     298        2492 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     299             :                                     GRPC_CHANNEL_CONNECTING,
     300             :                                     "connecting_changed");
     301        4984 :         grpc_subchannel_notify_on_state_change(
     302        2492 :             exec_ctx, p->subchannels[p->checking_subchannel],
     303             :             &p->checking_connectivity, &p->connectivity_changed);
     304        2492 :         break;
     305             :       case GRPC_CHANNEL_FATAL_FAILURE:
     306           0 :         del_interested_parties_locked(exec_ctx, p);
     307           0 :         GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
     308             :                  p->subchannels[p->num_subchannels - 1]);
     309           0 :         p->num_subchannels--;
     310           0 :         GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
     311             :                               "pick_first");
     312           0 :         if (p->num_subchannels == 0) {
     313           0 :           grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     314             :                                       GRPC_CHANNEL_FATAL_FAILURE,
     315             :                                       "no_more_channels");
     316           0 :           while ((pp = p->pending_picks)) {
     317           0 :             p->pending_picks = pp->next;
     318           0 :             *pp->target = NULL;
     319           0 :             grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
     320           0 :             gpr_free(pp);
     321             :           }
     322           0 :           GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
     323             :         } else {
     324           0 :           grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     325             :                                       GRPC_CHANNEL_TRANSIENT_FAILURE,
     326             :                                       "subchannel_failed");
     327           0 :           p->checking_subchannel %= p->num_subchannels;
     328           0 :           p->checking_connectivity = grpc_subchannel_check_connectivity(
     329           0 :               p->subchannels[p->checking_subchannel]);
     330           0 :           add_interested_parties_locked(exec_ctx, p);
     331           0 :           goto loop;
     332             :         }
     333             :     }
     334             :   }
     335             : 
     336        6293 :   gpr_mu_unlock(&p->mu);
     337             : }
     338             : 
     339        2182 : static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     340             :                          grpc_transport_op *op) {
     341        2176 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     342             :   size_t i;
     343             :   size_t n;
     344             :   grpc_subchannel **subchannels;
     345             :   grpc_subchannel *selected;
     346             : 
     347        2182 :   gpr_mu_lock(&p->mu);
     348        2182 :   n = p->num_subchannels;
     349        2182 :   subchannels = gpr_malloc(n * sizeof(*subchannels));
     350        2182 :   selected = p->selected;
     351        2182 :   if (selected) {
     352        2090 :     GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
     353             :   }
     354        2279 :   for (i = 0; i < n; i++) {
     355         103 :     subchannels[i] = p->subchannels[i];
     356         103 :     GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
     357             :   }
     358        2182 :   gpr_mu_unlock(&p->mu);
     359             : 
     360        2285 :   for (i = 0; i < n; i++) {
     361         103 :     if (selected == subchannels[i]) continue;
     362         103 :     grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
     363         103 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
     364             :   }
     365        2182 :   if (p->selected) {
     366        2090 :     grpc_subchannel_process_transport_op(exec_ctx, selected, op);
     367        2090 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
     368             :   }
     369        2182 :   gpr_free(subchannels);
     370        2182 : }
     371             : 
     372        2220 : static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
     373             :                                                      grpc_lb_policy *pol) {
     374        2176 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     375             :   grpc_connectivity_state st;
     376        2220 :   gpr_mu_lock(&p->mu);
     377        2220 :   st = grpc_connectivity_state_check(&p->state_tracker);
     378        2220 :   gpr_mu_unlock(&p->mu);
     379        2220 :   return st;
     380             : }
     381             : 
     382        8509 : void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     383             :                                grpc_connectivity_state *current,
     384             :                                grpc_closure *notify) {
     385        8257 :   pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
     386        8509 :   gpr_mu_lock(&p->mu);
     387        8509 :   grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
     388             :                                                  current, notify);
     389        8509 :   gpr_mu_unlock(&p->mu);
     390        8509 : }
     391             : 
     392             : static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
     393             :     pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle,
     394             :     pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
     395             : 
     396        3452 : static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
     397             : 
     398        3450 : static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
     399             : 
     400        2338 : static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
     401             :                                          grpc_lb_policy_args *args) {
     402        2338 :   pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
     403        2338 :   GPR_ASSERT(args->num_subchannels > 0);
     404        2338 :   memset(p, 0, sizeof(*p));
     405        2338 :   grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
     406        2338 :   p->subchannels =
     407        2338 :       gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
     408        2338 :   p->num_subchannels = args->num_subchannels;
     409        2338 :   grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
     410             :                                "pick_first");
     411        2338 :   memcpy(p->subchannels, args->subchannels,
     412        2338 :          sizeof(grpc_subchannel *) * args->num_subchannels);
     413        2338 :   grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
     414        2338 :   gpr_mu_init(&p->mu);
     415        2338 :   return &p->base;
     416             : }
     417             : 
     418             : static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
     419             :     pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
     420             :     "pick_first"};
     421             : 
     422             : static grpc_lb_policy_factory pick_first_lb_policy_factory = {
     423             :     &pick_first_factory_vtable};
     424             : 
     425        6904 : grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() {
     426        6904 :   return &pick_first_lb_policy_factory;
     427             : }

Generated by: LCOV version 1.11