Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/client_config/lb_policy_factory.h"
35 : #include "src/core/client_config/lb_policies/pick_first.h"
36 :
37 : #include <string.h>
38 :
39 : #include <grpc/support/alloc.h>
40 : #include "src/core/transport/connectivity_state.h"
41 :
42 : typedef struct pending_pick {
43 : struct pending_pick *next;
44 : grpc_pollset *pollset;
45 : grpc_subchannel **target;
46 : grpc_closure *on_complete;
47 : } pending_pick;
48 :
49 : typedef struct {
50 : /** base policy: must be first */
51 : grpc_lb_policy base;
52 : /** all our subchannels */
53 : grpc_subchannel **subchannels;
54 : size_t num_subchannels;
55 :
56 : grpc_closure connectivity_changed;
57 :
58 : /** mutex protecting remaining members */
59 : gpr_mu mu;
60 : /** the selected channel
61 : TODO(ctiller): this should be atomically set so we don't
62 : need to take a mutex in the common case */
63 : grpc_subchannel *selected;
64 : /** have we started picking? */
65 : int started_picking;
66 : /** are we shut down? */
67 : int shutdown;
68 : /** which subchannel are we watching? */
69 : size_t checking_subchannel;
70 : /** what is the connectivity of that channel? */
71 : grpc_connectivity_state checking_connectivity;
72 : /** list of picks that are waiting on connectivity */
73 : pending_pick *pending_picks;
74 :
75 : /** our connectivity state tracker */
76 : grpc_connectivity_state_tracker state_tracker;
77 : } pick_first_lb_policy;
78 :
79 1451 : static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
80 : pick_first_lb_policy *p) {
81 : pending_pick *pp;
82 1523 : for (pp = p->pending_picks; pp; pp = pp->next) {
83 144 : grpc_subchannel_del_interested_party(
84 72 : exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
85 : }
86 1451 : }
87 :
88 70 : static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
89 : pick_first_lb_policy *p) {
90 : pending_pick *pp;
91 122 : for (pp = p->pending_picks; pp; pp = pp->next) {
92 104 : grpc_subchannel_add_interested_party(
93 52 : exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
94 : }
95 70 : }
96 :
97 1440 : void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
98 1440 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
99 : size_t i;
100 1440 : GPR_ASSERT(p->pending_picks == NULL);
101 1526 : for (i = 0; i < p->num_subchannels; i++) {
102 86 : GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
103 : }
104 1440 : if (p->selected) {
105 1354 : GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
106 : }
107 1440 : grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
108 1440 : gpr_free(p->subchannels);
109 1440 : gpr_mu_destroy(&p->mu);
110 1440 : gpr_free(p);
111 1440 : }
112 :
113 1381 : void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
114 1381 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
115 : pending_pick *pp;
116 1381 : gpr_mu_lock(&p->mu);
117 1381 : del_interested_parties_locked(exec_ctx, p);
118 1381 : p->shutdown = 1;
119 1381 : pp = p->pending_picks;
120 1381 : p->pending_picks = NULL;
121 1381 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
122 : GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
123 1381 : gpr_mu_unlock(&p->mu);
124 2782 : while (pp != NULL) {
125 20 : pending_pick *next = pp->next;
126 20 : *pp->target = NULL;
127 20 : grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
128 20 : gpr_free(pp);
129 20 : pp = next;
130 : }
131 1381 : }
132 :
133 1377 : static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
134 1377 : p->started_picking = 1;
135 1377 : p->checking_subchannel = 0;
136 1377 : p->checking_connectivity = GRPC_CHANNEL_IDLE;
137 1377 : GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
138 2754 : grpc_subchannel_notify_on_state_change(
139 1377 : exec_ctx, p->subchannels[p->checking_subchannel],
140 : &p->checking_connectivity, &p->connectivity_changed);
141 1377 : }
142 :
143 17 : void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
144 17 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
145 17 : gpr_mu_lock(&p->mu);
146 17 : if (!p->started_picking) {
147 17 : start_picking(exec_ctx, p);
148 : }
149 17 : gpr_mu_unlock(&p->mu);
150 17 : }
151 :
152 1401079 : void pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
153 : grpc_pollset *pollset, grpc_metadata_batch *initial_metadata,
154 : grpc_subchannel **target, grpc_closure *on_complete) {
155 1401079 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
156 : pending_pick *pp;
157 1401079 : gpr_mu_lock(&p->mu);
158 1402342 : if (p->selected) {
159 1391632 : gpr_mu_unlock(&p->mu);
160 1391619 : *target = p->selected;
161 1391619 : grpc_exec_ctx_enqueue(exec_ctx, on_complete, 1);
162 : } else {
163 10710 : if (!p->started_picking) {
164 1360 : start_picking(exec_ctx, p);
165 : }
166 10710 : grpc_subchannel_add_interested_party(
167 10710 : exec_ctx, p->subchannels[p->checking_subchannel], pollset);
168 10710 : pp = gpr_malloc(sizeof(*pp));
169 10710 : pp->next = p->pending_picks;
170 10710 : pp->pollset = pollset;
171 10710 : pp->target = target;
172 10710 : pp->on_complete = on_complete;
173 10710 : p->pending_picks = pp;
174 10710 : gpr_mu_unlock(&p->mu);
175 : }
176 1401926 : }
177 :
178 1354 : static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
179 : int iomgr_success) {
180 1354 : pick_first_lb_policy *p = arg;
181 : size_t i;
182 : grpc_transport_op op;
183 1354 : size_t num_subchannels = p->num_subchannels;
184 : grpc_subchannel **subchannels;
185 : grpc_subchannel *exclude_subchannel;
186 :
187 1354 : gpr_mu_lock(&p->mu);
188 1354 : subchannels = p->subchannels;
189 1354 : p->num_subchannels = 0;
190 1354 : p->subchannels = NULL;
191 1354 : exclude_subchannel = p->selected;
192 1354 : gpr_mu_unlock(&p->mu);
193 1354 : GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
194 :
195 2718 : for (i = 0; i < num_subchannels; i++) {
196 1364 : if (subchannels[i] != exclude_subchannel) {
197 10 : memset(&op, 0, sizeof(op));
198 10 : op.disconnect = 1;
199 10 : grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
200 : }
201 1364 : GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
202 : }
203 :
204 1354 : gpr_free(subchannels);
205 1354 : }
206 :
207 5640 : static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
208 : int iomgr_success) {
209 5640 : pick_first_lb_policy *p = arg;
210 : pending_pick *pp;
211 :
212 5640 : gpr_mu_lock(&p->mu);
213 :
214 5640 : if (p->shutdown) {
215 1377 : gpr_mu_unlock(&p->mu);
216 1377 : GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
217 7017 : return;
218 4263 : } else if (p->selected != NULL) {
219 1405 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
220 : p->checking_connectivity, "selected_changed");
221 1405 : if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
222 1405 : grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
223 : &p->checking_connectivity,
224 : &p->connectivity_changed);
225 : } else {
226 0 : GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
227 : }
228 : } else {
229 : loop:
230 2860 : switch (p->checking_connectivity) {
231 : case GRPC_CHANNEL_READY:
232 1354 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
233 : GRPC_CHANNEL_READY, "connecting_ready");
234 1354 : p->selected = p->subchannels[p->checking_subchannel];
235 1354 : GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
236 : /* drop the pick list: we are connected now */
237 1354 : GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
238 1354 : grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(destroy_subchannels, p), 1);
239 : /* update any calls that were waiting for a pick */
240 13257 : while ((pp = p->pending_picks)) {
241 10549 : p->pending_picks = pp->next;
242 10549 : *pp->target = p->selected;
243 10549 : grpc_subchannel_del_interested_party(exec_ctx, p->selected,
244 : pp->pollset);
245 10538 : grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
246 10484 : gpr_free(pp);
247 : }
248 1354 : grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
249 : &p->checking_connectivity,
250 : &p->connectivity_changed);
251 1354 : break;
252 : case GRPC_CHANNEL_TRANSIENT_FAILURE:
253 70 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
254 : GRPC_CHANNEL_TRANSIENT_FAILURE,
255 : "connecting_transient_failure");
256 70 : del_interested_parties_locked(exec_ctx, p);
257 70 : p->checking_subchannel =
258 70 : (p->checking_subchannel + 1) % p->num_subchannels;
259 70 : p->checking_connectivity = grpc_subchannel_check_connectivity(
260 70 : p->subchannels[p->checking_subchannel]);
261 70 : add_interested_parties_locked(exec_ctx, p);
262 70 : if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
263 136 : grpc_subchannel_notify_on_state_change(
264 68 : exec_ctx, p->subchannels[p->checking_subchannel],
265 : &p->checking_connectivity, &p->connectivity_changed);
266 : } else {
267 2 : goto loop;
268 : }
269 68 : break;
270 : case GRPC_CHANNEL_CONNECTING:
271 : case GRPC_CHANNEL_IDLE:
272 1436 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
273 : GRPC_CHANNEL_CONNECTING,
274 : "connecting_changed");
275 2872 : grpc_subchannel_notify_on_state_change(
276 1436 : exec_ctx, p->subchannels[p->checking_subchannel],
277 : &p->checking_connectivity, &p->connectivity_changed);
278 1436 : break;
279 : case GRPC_CHANNEL_FATAL_FAILURE:
280 0 : del_interested_parties_locked(exec_ctx, p);
281 0 : GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
282 : p->subchannels[p->num_subchannels - 1]);
283 0 : p->num_subchannels--;
284 0 : GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
285 : "pick_first");
286 0 : if (p->num_subchannels == 0) {
287 0 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
288 : GRPC_CHANNEL_FATAL_FAILURE,
289 : "no_more_channels");
290 0 : while ((pp = p->pending_picks)) {
291 0 : p->pending_picks = pp->next;
292 0 : *pp->target = NULL;
293 0 : grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
294 0 : gpr_free(pp);
295 : }
296 0 : GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
297 : } else {
298 0 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
299 : GRPC_CHANNEL_TRANSIENT_FAILURE,
300 : "subchannel_failed");
301 0 : p->checking_subchannel %= p->num_subchannels;
302 0 : p->checking_connectivity = grpc_subchannel_check_connectivity(
303 0 : p->subchannels[p->checking_subchannel]);
304 0 : add_interested_parties_locked(exec_ctx, p);
305 0 : goto loop;
306 : }
307 : }
308 : }
309 :
310 4263 : gpr_mu_unlock(&p->mu);
311 : }
312 :
313 1381 : static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
314 : grpc_transport_op *op) {
315 1381 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
316 : size_t i;
317 : size_t n;
318 : grpc_subchannel **subchannels;
319 : grpc_subchannel *selected;
320 :
321 1381 : gpr_mu_lock(&p->mu);
322 1381 : n = p->num_subchannels;
323 1381 : subchannels = gpr_malloc(n * sizeof(*subchannels));
324 1381 : selected = p->selected;
325 1381 : if (selected) {
326 1354 : GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
327 : }
328 1408 : for (i = 0; i < n; i++) {
329 27 : subchannels[i] = p->subchannels[i];
330 27 : GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
331 : }
332 1381 : gpr_mu_unlock(&p->mu);
333 :
334 1408 : for (i = 0; i < n; i++) {
335 27 : if (selected == subchannels[i]) continue;
336 27 : grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
337 27 : GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
338 : }
339 1381 : if (p->selected) {
340 1354 : grpc_subchannel_process_transport_op(exec_ctx, selected, op);
341 1354 : GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
342 : }
343 1381 : gpr_free(subchannels);
344 1381 : }
345 :
346 1381 : static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
347 : grpc_lb_policy *pol) {
348 1381 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
349 : grpc_connectivity_state st;
350 1381 : gpr_mu_lock(&p->mu);
351 1381 : st = grpc_connectivity_state_check(&p->state_tracker);
352 1381 : gpr_mu_unlock(&p->mu);
353 1381 : return st;
354 : }
355 :
356 5533 : void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
357 : grpc_connectivity_state *current,
358 : grpc_closure *notify) {
359 5533 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
360 5533 : gpr_mu_lock(&p->mu);
361 5533 : grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
362 : current, notify);
363 5533 : gpr_mu_unlock(&p->mu);
364 5533 : }
365 :
366 : static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = {
367 : pf_destroy, pf_shutdown, pf_pick, pf_exit_idle, pf_broadcast,
368 : pf_check_connectivity, pf_notify_on_state_change};
369 :
370 2501 : static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
371 :
372 0 : static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
373 :
374 1440 : static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
375 : grpc_lb_policy_args *args) {
376 1440 : pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
377 1440 : GPR_ASSERT(args->num_subchannels > 0);
378 1440 : memset(p, 0, sizeof(*p));
379 1440 : grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
380 1440 : p->subchannels =
381 1440 : gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
382 1440 : p->num_subchannels = args->num_subchannels;
383 1440 : grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
384 : "pick_first");
385 1440 : memcpy(p->subchannels, args->subchannels,
386 1440 : sizeof(grpc_subchannel *) * args->num_subchannels);
387 1440 : grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
388 1440 : gpr_mu_init(&p->mu);
389 1440 : return &p->base;
390 : }
391 :
392 : static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = {
393 : pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
394 : "pick_first"};
395 :
396 : static grpc_lb_policy_factory pick_first_lb_policy_factory = {
397 : &pick_first_factory_vtable};
398 :
399 5002 : grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() {
400 5002 : return &pick_first_lb_policy_factory;
401 : }
|