LCOV - code coverage report
Current view: top level - core/client_config/lb_policies - round_robin.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 239 276 86.6 %
Date: 2015-12-10 22:15:08 Functions: 18 20 90.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/client_config/lb_policies/round_robin.h"
      35             : 
      36             : #include <string.h>
      37             : 
      38             : #include <grpc/support/alloc.h>
      39             : #include "src/core/transport/connectivity_state.h"
      40             : 
      41             : int grpc_lb_round_robin_trace = 0;
      42             : 
      43             : /** List of entities waiting for a pick.
      44             :  *
      45             :  * Once a pick is available, \a target is updated and \a on_complete called. */
      46             : typedef struct pending_pick {
      47             :   struct pending_pick *next;
      48             :   grpc_pollset *pollset;
      49             :   grpc_subchannel **target;
      50             :   grpc_closure *on_complete;
      51             : } pending_pick;
      52             : 
      53             : /** List of subchannels in a connectivity READY state */
      54             : typedef struct ready_list {
      55             :   grpc_subchannel *subchannel;
      56             :   struct ready_list *next;
      57             :   struct ready_list *prev;
      58             : } ready_list;
      59             : 
      60             : typedef struct {
      61             :   size_t subchannel_idx; /**< Index over p->subchannels */
      62             :   void *p;               /**< round_robin_lb_policy instance */
      63             : } connectivity_changed_cb_arg;
      64             : 
      65             : typedef struct {
      66             :   /** base policy: must be first */
      67             :   grpc_lb_policy base;
      68             : 
      69             :   /** all our subchannels */
      70             :   grpc_subchannel **subchannels;
      71             :   size_t num_subchannels;
      72             : 
      73             :   /** Callbacks, one per subchannel being watched, to be called when their
      74             :    * respective connectivity changes */
      75             :   grpc_closure *connectivity_changed_cbs;
      76             :   connectivity_changed_cb_arg *cb_args;
      77             : 
      78             :   /** mutex protecting remaining members */
      79             :   gpr_mu mu;
      80             :   /** have we started picking? */
      81             :   int started_picking;
      82             :   /** are we shutting down? */
      83             :   int shutdown;
      84             :   /** Connectivity state of the subchannels being watched */
      85             :   grpc_connectivity_state *subchannel_connectivity;
      86             :   /** List of picks that are waiting on connectivity */
      87             :   pending_pick *pending_picks;
      88             : 
      89             :   /** our connectivity state tracker */
      90             :   grpc_connectivity_state_tracker state_tracker;
      91             : 
      92             :   /** (Dummy) root of the doubly linked list containing READY subchannels */
      93             :   ready_list ready_list;
      94             :   /** Last pick from the ready list. */
      95             :   ready_list *ready_list_last_pick;
      96             : 
      97             :   /** Subchannel index to ready_list node.
      98             :    *
      99             :    * Kept in order to remove nodes from the ready list associated with a
     100             :    * subchannel */
     101             :   ready_list **subchannel_index_to_readylist_node;
     102             : } round_robin_lb_policy;
     103             : 
     104             : /** Returns the next subchannel from the connected list or NULL if the list is
     105             :  * empty.
     106             :  *
     107             :  * Note that this function does *not* advance p->ready_list_last_pick. Use \a
     108             :  * advance_last_picked_locked() for that. */
     109          70 : static ready_list *peek_next_connected_locked(const round_robin_lb_policy *p) {
     110             :   ready_list *selected;
     111          70 :   selected = p->ready_list_last_pick->next;
     112             : 
     113         147 :   while (selected != NULL) {
     114          57 :     if (selected == &p->ready_list) {
     115           7 :       GPR_ASSERT(selected->subchannel == NULL);
     116             :       /* skip dummy root */
     117           7 :       selected = selected->next;
     118             :     } else {
     119          50 :       GPR_ASSERT(selected->subchannel != NULL);
     120          50 :       return selected;
     121             :     }
     122             :   }
     123          20 :   return NULL;
     124             : }
     125             : 
     126             : /** Advance the \a ready_list picking head. */
     127          35 : static void advance_last_picked_locked(round_robin_lb_policy *p) {
     128          35 :   if (p->ready_list_last_pick->next != NULL) { /* non-empty list */
     129          35 :     p->ready_list_last_pick = p->ready_list_last_pick->next;
     130          35 :     if (p->ready_list_last_pick == &p->ready_list) {
     131             :       /* skip dummy root */
     132           7 :       p->ready_list_last_pick = p->ready_list_last_pick->next;
     133             :     }
     134             :   } else { /* should be an empty list */
     135           0 :     GPR_ASSERT(p->ready_list_last_pick == &p->ready_list);
     136             :   }
     137             : 
     138          35 :   if (grpc_lb_round_robin_trace) {
     139           0 :     gpr_log(GPR_DEBUG, "[READYLIST] ADVANCED LAST PICK. NOW AT NODE %p (SC %p)",
     140           0 :             p->ready_list_last_pick, p->ready_list_last_pick->subchannel);
     141             :   }
     142          35 : }
     143             : 
     144             : /** Prepends (relative to the root at p->ready_list) the connected subchannel \a
     145             :  * csc to the list of ready subchannels. */
     146          20 : static ready_list *add_connected_sc_locked(round_robin_lb_policy *p,
     147             :                                            grpc_subchannel *csc) {
     148          20 :   ready_list *new_elem = gpr_malloc(sizeof(ready_list));
     149          20 :   new_elem->subchannel = csc;
     150          20 :   if (p->ready_list.prev == NULL) {
     151             :     /* first element */
     152           5 :     new_elem->next = &p->ready_list;
     153           5 :     new_elem->prev = &p->ready_list;
     154           5 :     p->ready_list.next = new_elem;
     155           5 :     p->ready_list.prev = new_elem;
     156             :   } else {
     157          15 :     new_elem->next = &p->ready_list;
     158          15 :     new_elem->prev = p->ready_list.prev;
     159          15 :     p->ready_list.prev->next = new_elem;
     160          15 :     p->ready_list.prev = new_elem;
     161             :   }
     162          20 :   if (grpc_lb_round_robin_trace) {
     163           0 :     gpr_log(GPR_DEBUG, "[READYLIST] ADDING NODE %p (SC %p)", new_elem, csc);
     164             :   }
     165          20 :   return new_elem;
     166             : }
     167             : 
     168             : /** Removes \a node from the list of connected subchannels */
     169          10 : static void remove_disconnected_sc_locked(round_robin_lb_policy *p,
     170             :                                           ready_list *node) {
     171          10 :   if (node == NULL) {
     172          10 :     return;
     173             :   }
     174          10 :   if (node == p->ready_list_last_pick) {
     175             :     /* If removing the lastly picked node, reset the last pick pointer to the
     176             :      * dummy root of the list */
     177           3 :     p->ready_list_last_pick = &p->ready_list;
     178             :   }
     179             : 
     180             :   /* removing last item */
     181          10 :   if (node->next == &p->ready_list && node->prev == &p->ready_list) {
     182           2 :     GPR_ASSERT(p->ready_list.next == node);
     183           2 :     GPR_ASSERT(p->ready_list.prev == node);
     184           2 :     p->ready_list.next = NULL;
     185           2 :     p->ready_list.prev = NULL;
     186             :   } else {
     187           8 :     node->prev->next = node->next;
     188           8 :     node->next->prev = node->prev;
     189             :   }
     190             : 
     191          10 :   if (grpc_lb_round_robin_trace) {
     192           0 :     gpr_log(GPR_DEBUG, "[READYLIST] REMOVED NODE %p (SC %p)", node,
     193             :             node->subchannel);
     194             :   }
     195             : 
     196          10 :   node->next = NULL;
     197          10 :   node->prev = NULL;
     198          10 :   node->subchannel = NULL;
     199             : 
     200          10 :   gpr_free(node);
     201             : }
     202             : 
     203         182 : static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
     204             :                                           round_robin_lb_policy *p,
     205             :                                           const size_t subchannel_idx) {
     206             :   pending_pick *pp;
     207         302 :   for (pp = p->pending_picks; pp; pp = pp->next) {
     208         240 :     grpc_subchannel_del_interested_party(
     209         120 :         exec_ctx, p->subchannels[subchannel_idx], pp->pollset);
     210             :   }
     211         182 : }
     212             : 
     213           5 : void rr_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     214           5 :   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
     215             :   size_t i;
     216             :   ready_list *elem;
     217          25 :   for (i = 0; i < p->num_subchannels; i++) {
     218          20 :     del_interested_parties_locked(exec_ctx, p, i);
     219             :   }
     220          25 :   for (i = 0; i < p->num_subchannels; i++) {
     221          20 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "round_robin");
     222             :   }
     223           5 :   gpr_free(p->connectivity_changed_cbs);
     224           5 :   gpr_free(p->subchannel_connectivity);
     225             : 
     226           5 :   grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
     227           5 :   gpr_free(p->subchannels);
     228           5 :   gpr_mu_destroy(&p->mu);
     229             : 
     230           5 :   elem = p->ready_list.next;
     231          20 :   while (elem != NULL && elem != &p->ready_list) {
     232             :     ready_list *tmp;
     233          10 :     tmp = elem->next;
     234          10 :     elem->next = NULL;
     235          10 :     elem->prev = NULL;
     236          10 :     elem->subchannel = NULL;
     237          10 :     gpr_free(elem);
     238          10 :     elem = tmp;
     239             :   }
     240           5 :   gpr_free(p->subchannel_index_to_readylist_node);
     241           5 :   gpr_free(p->cb_args);
     242           5 :   gpr_free(p);
     243           5 : }
     244             : 
     245           5 : void rr_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     246             :   size_t i;
     247           5 :   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
     248             :   pending_pick *pp;
     249           5 :   gpr_mu_lock(&p->mu);
     250             : 
     251          25 :   for (i = 0; i < p->num_subchannels; i++) {
     252          20 :     del_interested_parties_locked(exec_ctx, p, i);
     253             :   }
     254             : 
     255           5 :   p->shutdown = 1;
     256          10 :   while ((pp = p->pending_picks)) {
     257           0 :     p->pending_picks = pp->next;
     258           0 :     *pp->target = NULL;
     259           0 :     grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
     260           0 :     gpr_free(pp);
     261             :   }
     262           5 :   grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     263             :                               GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
     264           5 :   gpr_mu_unlock(&p->mu);
     265           5 : }
     266             : 
     267          15 : static void rr_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     268             :                            grpc_subchannel **target) {
     269          15 :   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
     270             :   pending_pick *pp;
     271             :   size_t i;
     272          15 :   gpr_mu_lock(&p->mu);
     273          15 :   pp = p->pending_picks;
     274          15 :   p->pending_picks = NULL;
     275          45 :   while (pp != NULL) {
     276          15 :     pending_pick *next = pp->next;
     277          15 :     if (pp->target == target) {
     278          75 :       for (i = 0; i < p->num_subchannels; i++) {
     279          60 :         grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
     280             :                                              pp->pollset);
     281             :       }
     282          15 :       *target = NULL;
     283          15 :       grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
     284          15 :       gpr_free(pp);
     285             :     } else {
     286           0 :       pp->next = p->pending_picks;
     287           0 :       p->pending_picks = pp;
     288             :     }
     289          15 :     pp = next;
     290             :   }
     291          15 :   gpr_mu_unlock(&p->mu);
     292          15 : }
     293             : 
     294           5 : static void start_picking(grpc_exec_ctx *exec_ctx, round_robin_lb_policy *p) {
     295             :   size_t i;
     296           5 :   p->started_picking = 1;
     297             : 
     298          25 :   for (i = 0; i < p->num_subchannels; i++) {
     299          20 :     p->subchannel_connectivity[i] = GRPC_CHANNEL_IDLE;
     300          40 :     grpc_subchannel_notify_on_state_change(exec_ctx, p->subchannels[i],
     301          20 :                                            &p->subchannel_connectivity[i],
     302          20 :                                            &p->connectivity_changed_cbs[i]);
     303          20 :     GRPC_LB_POLICY_REF(&p->base, "round_robin_connectivity");
     304             :   }
     305           5 : }
     306             : 
     307           0 : void rr_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
     308           0 :   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
     309           0 :   gpr_mu_lock(&p->mu);
     310           0 :   if (!p->started_picking) {
     311           0 :     start_picking(exec_ctx, p);
     312             :   }
     313           0 :   gpr_mu_unlock(&p->mu);
     314           0 : }
     315             : 
     316          50 : int rr_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
     317             :             grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
     318             :             grpc_closure *on_complete) {
     319             :   size_t i;
     320          50 :   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
     321             :   pending_pick *pp;
     322             :   ready_list *selected;
     323          50 :   gpr_mu_lock(&p->mu);
     324          50 :   if ((selected = peek_next_connected_locked(p))) {
     325          30 :     gpr_mu_unlock(&p->mu);
     326          30 :     *target = selected->subchannel;
     327          30 :     if (grpc_lb_round_robin_trace) {
     328           0 :       gpr_log(GPR_DEBUG, "[RR PICK] TARGET <-- SUBCHANNEL %p (NODE %p)",
     329             :               selected->subchannel, selected);
     330             :     }
     331             :     /* only advance the last picked pointer if the selection was used */
     332          30 :     advance_last_picked_locked(p);
     333          30 :     return 1;
     334             :   } else {
     335          20 :     if (!p->started_picking) {
     336           5 :       start_picking(exec_ctx, p);
     337             :     }
     338         100 :     for (i = 0; i < p->num_subchannels; i++) {
     339          80 :       grpc_subchannel_add_interested_party(exec_ctx, p->subchannels[i],
     340             :                                            pollset);
     341             :     }
     342          20 :     pp = gpr_malloc(sizeof(*pp));
     343          20 :     pp->next = p->pending_picks;
     344          20 :     pp->pollset = pollset;
     345          20 :     pp->target = target;
     346          20 :     pp->on_complete = on_complete;
     347          20 :     p->pending_picks = pp;
     348          20 :     gpr_mu_unlock(&p->mu);
     349          20 :     return 0;
     350             :   }
     351             : }
     352             : 
     353         352 : static void rr_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
     354             :                                     int iomgr_success) {
     355         352 :   connectivity_changed_cb_arg *cb_arg = arg;
     356         352 :   round_robin_lb_policy *p = cb_arg->p;
     357             :   /* index over p->subchannels of this cb's subchannel */
     358         352 :   const size_t this_idx = cb_arg->subchannel_idx;
     359             :   pending_pick *pp;
     360             :   ready_list *selected;
     361             : 
     362         352 :   int unref = 0;
     363             : 
     364             :   /* connectivity state of this cb's subchannel */
     365             :   grpc_connectivity_state *this_connectivity;
     366             : 
     367         352 :   gpr_mu_lock(&p->mu);
     368             : 
     369         352 :   this_connectivity = &p->subchannel_connectivity[this_idx];
     370             : 
     371         352 :   if (p->shutdown) {
     372          20 :     unref = 1;
     373             :   } else {
     374         332 :     switch (*this_connectivity) {
     375             :       case GRPC_CHANNEL_READY:
     376          20 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     377             :                                     GRPC_CHANNEL_READY, "connecting_ready");
     378             :         /* add the newly connected subchannel to the list of connected ones.
     379             :          * Note that it goes to the "end of the line". */
     380          40 :         p->subchannel_index_to_readylist_node[this_idx] =
     381          20 :             add_connected_sc_locked(p, p->subchannels[this_idx]);
     382             :         /* at this point we know there's at least one suitable subchannel. Go
     383             :          * ahead and pick one and notify the pending suitors in
     384             :          * p->pending_picks. This preemtively replicates rr_pick()'s actions. */
     385          20 :         selected = peek_next_connected_locked(p);
     386          20 :         if (p->pending_picks != NULL) {
     387             :           /* if the selected subchannel is going to be used for the pending
     388             :            * picks, update the last picked pointer */
     389           5 :           advance_last_picked_locked(p);
     390             :         }
     391          45 :         while ((pp = p->pending_picks)) {
     392           5 :           p->pending_picks = pp->next;
     393           5 :           *pp->target = selected->subchannel;
     394           5 :           if (grpc_lb_round_robin_trace) {
     395           0 :             gpr_log(GPR_DEBUG,
     396             :                     "[RR CONN CHANGED] TARGET <-- SUBCHANNEL %p (NODE %p)",
     397             :                     selected->subchannel, selected);
     398             :           }
     399           5 :           grpc_subchannel_del_interested_party(exec_ctx, selected->subchannel,
     400             :                                                pp->pollset);
     401           5 :           grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
     402           5 :           gpr_free(pp);
     403             :         }
     404          40 :         grpc_subchannel_notify_on_state_change(
     405          20 :             exec_ctx, p->subchannels[this_idx], this_connectivity,
     406          20 :             &p->connectivity_changed_cbs[this_idx]);
     407          20 :         break;
     408             :       case GRPC_CHANNEL_CONNECTING:
     409             :       case GRPC_CHANNEL_IDLE:
     410         170 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     411             :                                     *this_connectivity, "connecting_changed");
     412         340 :         grpc_subchannel_notify_on_state_change(
     413         170 :             exec_ctx, p->subchannels[this_idx], this_connectivity,
     414         170 :             &p->connectivity_changed_cbs[this_idx]);
     415         170 :         break;
     416             :       case GRPC_CHANNEL_TRANSIENT_FAILURE:
     417         142 :         del_interested_parties_locked(exec_ctx, p, this_idx);
     418             :         /* renew state notification */
     419         284 :         grpc_subchannel_notify_on_state_change(
     420         142 :             exec_ctx, p->subchannels[this_idx], this_connectivity,
     421         142 :             &p->connectivity_changed_cbs[this_idx]);
     422             : 
     423             :         /* remove from ready list if still present */
     424         142 :         if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
     425          10 :           remove_disconnected_sc_locked(
     426          10 :               p, p->subchannel_index_to_readylist_node[this_idx]);
     427          10 :           p->subchannel_index_to_readylist_node[this_idx] = NULL;
     428             :         }
     429         142 :         grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     430             :                                     GRPC_CHANNEL_TRANSIENT_FAILURE,
     431             :                                     "connecting_transient_failure");
     432         142 :         break;
     433             :       case GRPC_CHANNEL_FATAL_FAILURE:
     434           0 :         del_interested_parties_locked(exec_ctx, p, this_idx);
     435           0 :         if (p->subchannel_index_to_readylist_node[this_idx] != NULL) {
     436           0 :           remove_disconnected_sc_locked(
     437           0 :               p, p->subchannel_index_to_readylist_node[this_idx]);
     438           0 :           p->subchannel_index_to_readylist_node[this_idx] = NULL;
     439             :         }
     440             : 
     441           0 :         GPR_SWAP(grpc_subchannel *, p->subchannels[this_idx],
     442             :                  p->subchannels[p->num_subchannels - 1]);
     443           0 :         p->num_subchannels--;
     444           0 :         GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
     445             :                               "round_robin");
     446             : 
     447           0 :         if (p->num_subchannels == 0) {
     448           0 :           grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     449             :                                       GRPC_CHANNEL_FATAL_FAILURE,
     450             :                                       "no_more_channels");
     451           0 :           while ((pp = p->pending_picks)) {
     452           0 :             p->pending_picks = pp->next;
     453           0 :             *pp->target = NULL;
     454           0 :             grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
     455           0 :             gpr_free(pp);
     456             :           }
     457           0 :           unref = 1;
     458             :         } else {
     459           0 :           grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
     460             :                                       GRPC_CHANNEL_TRANSIENT_FAILURE,
     461             :                                       "subchannel_failed");
     462             :         }
     463             :     } /* switch */
     464             :   }   /* !unref */
     465             : 
     466         352 :   gpr_mu_unlock(&p->mu);
     467             : 
     468         352 :   if (unref) {
     469          20 :     GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "round_robin_connectivity");
     470             :   }
     471         352 : }
     472             : 
     473           5 : static void rr_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
     474             :                          grpc_transport_op *op) {
     475           5 :   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
     476             :   size_t i;
     477             :   size_t n;
     478             :   grpc_subchannel **subchannels;
     479             : 
     480           5 :   gpr_mu_lock(&p->mu);
     481           5 :   n = p->num_subchannels;
     482           5 :   subchannels = gpr_malloc(n * sizeof(*subchannels));
     483          25 :   for (i = 0; i < n; i++) {
     484          20 :     subchannels[i] = p->subchannels[i];
     485          20 :     GRPC_SUBCHANNEL_REF(subchannels[i], "rr_broadcast");
     486             :   }
     487           5 :   gpr_mu_unlock(&p->mu);
     488             : 
     489          25 :   for (i = 0; i < n; i++) {
     490          20 :     grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
     491          20 :     GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "rr_broadcast");
     492             :   }
     493           5 :   gpr_free(subchannels);
     494           5 : }
     495             : 
     496           5 : static grpc_connectivity_state rr_check_connectivity(grpc_exec_ctx *exec_ctx,
     497             :                                                      grpc_lb_policy *pol) {
     498           5 :   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
     499             :   grpc_connectivity_state st;
     500           5 :   gpr_mu_lock(&p->mu);
     501           5 :   st = grpc_connectivity_state_check(&p->state_tracker);
     502           5 :   gpr_mu_unlock(&p->mu);
     503           5 :   return st;
     504             : }
     505             : 
     506          97 : static void rr_notify_on_state_change(grpc_exec_ctx *exec_ctx,
     507             :                                       grpc_lb_policy *pol,
     508             :                                       grpc_connectivity_state *current,
     509             :                                       grpc_closure *notify) {
     510          97 :   round_robin_lb_policy *p = (round_robin_lb_policy *)pol;
     511          97 :   gpr_mu_lock(&p->mu);
     512          97 :   grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
     513             :                                                  current, notify);
     514          97 :   gpr_mu_unlock(&p->mu);
     515          97 : }
     516             : 
     517             : static const grpc_lb_policy_vtable round_robin_lb_policy_vtable = {
     518             :     rr_destroy, rr_shutdown, rr_pick, rr_cancel_pick, rr_exit_idle,
     519             :     rr_broadcast, rr_check_connectivity, rr_notify_on_state_change};
     520             : 
     521        3452 : static void round_robin_factory_ref(grpc_lb_policy_factory *factory) {}
     522             : 
     523        3450 : static void round_robin_factory_unref(grpc_lb_policy_factory *factory) {}
     524             : 
     525           5 : static grpc_lb_policy *create_round_robin(grpc_lb_policy_factory *factory,
     526             :                                           grpc_lb_policy_args *args) {
     527             :   size_t i;
     528           5 :   round_robin_lb_policy *p = gpr_malloc(sizeof(*p));
     529           5 :   GPR_ASSERT(args->num_subchannels > 0);
     530           5 :   memset(p, 0, sizeof(*p));
     531           5 :   grpc_lb_policy_init(&p->base, &round_robin_lb_policy_vtable);
     532           5 :   p->subchannels =
     533           5 :       gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
     534           5 :   p->num_subchannels = args->num_subchannels;
     535           5 :   grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
     536             :                                "round_robin");
     537           5 :   memcpy(p->subchannels, args->subchannels,
     538           5 :          sizeof(grpc_subchannel *) * args->num_subchannels);
     539             : 
     540           5 :   gpr_mu_init(&p->mu);
     541           5 :   p->connectivity_changed_cbs =
     542           5 :       gpr_malloc(sizeof(grpc_closure) * args->num_subchannels);
     543           5 :   p->subchannel_connectivity =
     544           5 :       gpr_malloc(sizeof(grpc_connectivity_state) * args->num_subchannels);
     545             : 
     546           5 :   p->cb_args =
     547           5 :       gpr_malloc(sizeof(connectivity_changed_cb_arg) * args->num_subchannels);
     548          25 :   for (i = 0; i < args->num_subchannels; i++) {
     549          20 :     p->cb_args[i].subchannel_idx = i;
     550          20 :     p->cb_args[i].p = p;
     551          20 :     grpc_closure_init(&p->connectivity_changed_cbs[i], rr_connectivity_changed,
     552          20 :                       &p->cb_args[i]);
     553             :   }
     554             : 
     555             :   /* The (dummy node) root of the ready list */
     556           5 :   p->ready_list.subchannel = NULL;
     557           5 :   p->ready_list.prev = NULL;
     558           5 :   p->ready_list.next = NULL;
     559           5 :   p->ready_list_last_pick = &p->ready_list;
     560             : 
     561           5 :   p->subchannel_index_to_readylist_node =
     562           5 :       gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
     563           5 :   memset(p->subchannel_index_to_readylist_node, 0,
     564           5 :          sizeof(grpc_subchannel *) * args->num_subchannels);
     565           5 :   return &p->base;
     566             : }
     567             : 
     568             : static const grpc_lb_policy_factory_vtable round_robin_factory_vtable = {
     569             :     round_robin_factory_ref, round_robin_factory_unref, create_round_robin,
     570             :     "round_robin"};
     571             : 
     572             : static grpc_lb_policy_factory round_robin_lb_policy_factory = {
     573             :     &round_robin_factory_vtable};
     574             : 
     575        3452 : grpc_lb_policy_factory *grpc_round_robin_lb_factory_create() {
     576        3452 :   return &round_robin_lb_policy_factory;
     577             : }

Generated by: LCOV version 1.11