Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/client_config/lb_policy_factory.h"
35 : #include "src/core/client_config/lb_policies/pick_first.h"
36 :
37 : #include <string.h>
38 :
39 : #include <grpc/support/alloc.h>
40 : #include "src/core/transport/connectivity_state.h"
41 :
42 : typedef struct pending_pick { /* one pick queued by pf_pick while no subchannel is selected yet */
43 : struct pending_pick *next; /* next node of the singly linked pending_picks list */
44 : grpc_pollset *pollset; /* caller's pollset; added as an interested party on the subchannel being checked */
45 : grpc_subchannel **target; /* out-slot: set to the selected subchannel, or to NULL on cancel, shutdown or total failure */
46 : grpc_closure *on_complete; /* closure enqueued on the exec_ctx once the pick has been resolved */
47 : } pending_pick;
48 :
49 : typedef struct {
50 : /** base policy: must be first (grpc_lb_policy pointers are cast to pick_first_lb_policy) */
51 : grpc_lb_policy base;
52 : /** all our subchannels; handed to destroy_subchannels and set to NULL once one is selected */
53 : grpc_subchannel **subchannels;
54 : size_t num_subchannels; /* number of live entries in subchannels */
55 :
56 : grpc_closure connectivity_changed; /* bound to pf_connectivity_changed in create_pick_first */
57 :
58 : /** mutex protecting remaining members */
59 : gpr_mu mu;
60 : /** the selected channel
61 : TODO(ctiller): this should be atomically set so we don't
62 : need to take a mutex in the common case */
63 : grpc_subchannel *selected;
64 : /** have we started picking? (set by start_picking) */
65 : int started_picking;
66 : /** are we shut down? (set by pf_shutdown) */
67 : int shutdown;
68 : /** which subchannel are we watching? (index into subchannels) */
69 : size_t checking_subchannel;
70 : /** what is the connectivity of that channel? (written through the state-change watch) */
71 : grpc_connectivity_state checking_connectivity;
72 : /** list of picks that are waiting on connectivity */
73 : pending_pick *pending_picks;
74 :
75 : /** our connectivity state tracker */
76 : grpc_connectivity_state_tracker state_tracker;
77 : } pick_first_lb_policy;
78 :
79 2453 : static void del_interested_parties_locked(grpc_exec_ctx *exec_ctx,
80 : pick_first_lb_policy *p) {
81 : pending_pick *pp;
82 2504 : for (pp = p->pending_picks; pp; pp = pp->next) {
83 102 : grpc_subchannel_del_interested_party(
84 51 : exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
85 : }
86 2453 : }
87 :
88 271 : static void add_interested_parties_locked(grpc_exec_ctx *exec_ctx,
89 : pick_first_lb_policy *p) {
90 : pending_pick *pp;
91 322 : for (pp = p->pending_picks; pp; pp = pp->next) {
92 102 : grpc_subchannel_add_interested_party(
93 51 : exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
94 : }
95 271 : }
96 :
97 2300 : void pf_destroy(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
98 2294 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
99 : size_t i;
100 2300 : GPR_ASSERT(p->pending_picks == NULL);
101 2633 : for (i = 0; i < p->num_subchannels; i++) {
102 339 : GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[i], "pick_first");
103 : }
104 2300 : if (p->selected) {
105 2090 : GRPC_SUBCHANNEL_UNREF(exec_ctx, p->selected, "picked_first");
106 : }
107 2300 : grpc_connectivity_state_destroy(exec_ctx, &p->state_tracker);
108 2300 : gpr_free(p->subchannels);
109 2300 : gpr_mu_destroy(&p->mu);
110 2300 : gpr_free(p);
111 2300 : }
112 :
113 2182 : void pf_shutdown(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
114 2176 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
115 : pending_pick *pp;
116 2182 : gpr_mu_lock(&p->mu);
117 2182 : del_interested_parties_locked(exec_ctx, p);
118 2182 : p->shutdown = 1;
119 2182 : pp = p->pending_picks;
120 2182 : p->pending_picks = NULL;
121 2182 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
122 : GRPC_CHANNEL_FATAL_FAILURE, "shutdown");
123 2182 : gpr_mu_unlock(&p->mu);
124 4364 : while (pp != NULL) {
125 0 : pending_pick *next = pp->next;
126 0 : *pp->target = NULL;
127 0 : grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
128 0 : gpr_free(pp);
129 0 : pp = next;
130 : }
131 2182 : }
132 :
133 22 : static void pf_cancel_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
134 : grpc_subchannel **target) {
135 20 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
136 : pending_pick *pp;
137 22 : gpr_mu_lock(&p->mu);
138 22 : pp = p->pending_picks;
139 22 : p->pending_picks = NULL;
140 65 : while (pp != NULL) {
141 21 : pending_pick *next = pp->next;
142 21 : if (pp->target == target) {
143 42 : grpc_subchannel_del_interested_party(
144 21 : exec_ctx, p->subchannels[p->checking_subchannel], pp->pollset);
145 21 : *target = NULL;
146 21 : grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 0);
147 21 : gpr_free(pp);
148 : } else {
149 0 : pp->next = p->pending_picks;
150 0 : p->pending_picks = pp;
151 : }
152 20 : pp = next;
153 : }
154 22 : gpr_mu_unlock(&p->mu);
155 22 : }
156 :
157 2215 : static void start_picking(grpc_exec_ctx *exec_ctx, pick_first_lb_policy *p) {
158 2215 : p->started_picking = 1;
159 2215 : p->checking_subchannel = 0;
160 2215 : p->checking_connectivity = GRPC_CHANNEL_IDLE;
161 2215 : GRPC_LB_POLICY_REF(&p->base, "pick_first_connectivity");
162 4430 : grpc_subchannel_notify_on_state_change(
163 2215 : exec_ctx, p->subchannels[p->checking_subchannel],
164 : &p->checking_connectivity, &p->connectivity_changed);
165 2215 : }
166 :
167 271 : void pf_exit_idle(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol) {
168 265 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
169 271 : gpr_mu_lock(&p->mu);
170 271 : if (!p->started_picking) {
171 271 : start_picking(exec_ctx, p);
172 : }
173 271 : gpr_mu_unlock(&p->mu);
174 271 : }
175 :
176 2114781 : int pf_pick(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol, grpc_pollset *pollset,
177 : grpc_metadata_batch *initial_metadata, grpc_subchannel **target,
178 : grpc_closure *on_complete) {
179 2114690 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
180 : pending_pick *pp;
181 2114781 : gpr_mu_lock(&p->mu);
182 2114923 : if (p->selected) {
183 2104848 : gpr_mu_unlock(&p->mu);
184 2104835 : *target = p->selected;
185 2104835 : return 1;
186 : } else {
187 10075 : if (!p->started_picking) {
188 1944 : start_picking(exec_ctx, p);
189 : }
190 10075 : grpc_subchannel_add_interested_party(
191 10075 : exec_ctx, p->subchannels[p->checking_subchannel], pollset);
192 10075 : pp = gpr_malloc(sizeof(*pp));
193 10075 : pp->next = p->pending_picks;
194 10075 : pp->pollset = pollset;
195 10075 : pp->target = target;
196 10075 : pp->on_complete = on_complete;
197 10075 : p->pending_picks = pp;
198 10075 : gpr_mu_unlock(&p->mu);
199 10075 : return 0;
200 : }
201 : }
202 :
203 2127 : static void destroy_subchannels(grpc_exec_ctx *exec_ctx, void *arg,
204 : int iomgr_success) {
205 2086 : pick_first_lb_policy *p = arg;
206 : size_t i;
207 : grpc_transport_op op;
208 2127 : size_t num_subchannels = p->num_subchannels;
209 : grpc_subchannel **subchannels;
210 : grpc_subchannel *exclude_subchannel;
211 :
212 2127 : gpr_mu_lock(&p->mu);
213 2127 : subchannels = p->subchannels;
214 2127 : p->num_subchannels = 0;
215 2127 : p->subchannels = NULL;
216 2127 : exclude_subchannel = p->selected;
217 2127 : gpr_mu_unlock(&p->mu);
218 2127 : GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "destroy_subchannels");
219 :
220 5655 : for (i = 0; i < num_subchannels; i++) {
221 3528 : if (subchannels[i] != exclude_subchannel) {
222 1401 : memset(&op, 0, sizeof(op));
223 1401 : op.disconnect = 1;
224 1401 : grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], &op);
225 : }
226 3528 : GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pick_first");
227 : }
228 :
229 2127 : gpr_free(subchannels);
230 2127 : }
231 :
232 8471 : static void pf_connectivity_changed(grpc_exec_ctx *exec_ctx, void *arg,
233 : int iomgr_success) {
234 8257 : pick_first_lb_policy *p = arg;
235 : pending_pick *pp;
236 :
237 8471 : gpr_mu_lock(&p->mu);
238 :
239 8471 : if (p->shutdown) {
240 2178 : gpr_mu_unlock(&p->mu);
241 2178 : GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
242 10649 : return;
243 6293 : } else if (p->selected != NULL) {
244 1428 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
245 : p->checking_connectivity, "selected_changed");
246 1428 : if (p->checking_connectivity != GRPC_CHANNEL_FATAL_FAILURE) {
247 1428 : grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
248 : &p->checking_connectivity,
249 : &p->connectivity_changed);
250 : } else {
251 0 : GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
252 : }
253 : } else {
254 : loop:
255 4890 : switch (p->checking_connectivity) {
256 : case GRPC_CHANNEL_READY:
257 2127 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
258 : GRPC_CHANNEL_READY, "connecting_ready");
259 2127 : p->selected = p->subchannels[p->checking_subchannel];
260 2127 : GRPC_SUBCHANNEL_REF(p->selected, "picked_first");
261 : /* drop the pick list: we are connected now */
262 2127 : GRPC_LB_POLICY_REF(&p->base, "destroy_subchannels");
263 2127 : grpc_exec_ctx_enqueue(exec_ctx,
264 : grpc_closure_create(destroy_subchannels, p), 1);
265 : /* update any calls that were waiting for a pick */
266 14128 : while ((pp = p->pending_picks)) {
267 9874 : p->pending_picks = pp->next;
268 9874 : *pp->target = p->selected;
269 9874 : grpc_subchannel_del_interested_party(exec_ctx, p->selected,
270 : pp->pollset);
271 9918 : grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
272 9690 : gpr_free(pp);
273 : }
274 2127 : grpc_subchannel_notify_on_state_change(exec_ctx, p->selected,
275 : &p->checking_connectivity,
276 : &p->connectivity_changed);
277 2127 : break;
278 : case GRPC_CHANNEL_TRANSIENT_FAILURE:
279 271 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
280 : GRPC_CHANNEL_TRANSIENT_FAILURE,
281 : "connecting_transient_failure");
282 271 : del_interested_parties_locked(exec_ctx, p);
283 271 : p->checking_subchannel =
284 271 : (p->checking_subchannel + 1) % p->num_subchannels;
285 271 : p->checking_connectivity = grpc_subchannel_check_connectivity(
286 271 : p->subchannels[p->checking_subchannel]);
287 271 : add_interested_parties_locked(exec_ctx, p);
288 271 : if (p->checking_connectivity == GRPC_CHANNEL_TRANSIENT_FAILURE) {
289 492 : grpc_subchannel_notify_on_state_change(
290 246 : exec_ctx, p->subchannels[p->checking_subchannel],
291 : &p->checking_connectivity, &p->connectivity_changed);
292 : } else {
293 23 : goto loop;
294 : }
295 246 : break;
296 : case GRPC_CHANNEL_CONNECTING:
297 : case GRPC_CHANNEL_IDLE:
298 2492 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
299 : GRPC_CHANNEL_CONNECTING,
300 : "connecting_changed");
301 4984 : grpc_subchannel_notify_on_state_change(
302 2492 : exec_ctx, p->subchannels[p->checking_subchannel],
303 : &p->checking_connectivity, &p->connectivity_changed);
304 2492 : break;
305 : case GRPC_CHANNEL_FATAL_FAILURE:
306 0 : del_interested_parties_locked(exec_ctx, p);
307 0 : GPR_SWAP(grpc_subchannel *, p->subchannels[p->checking_subchannel],
308 : p->subchannels[p->num_subchannels - 1]);
309 0 : p->num_subchannels--;
310 0 : GRPC_SUBCHANNEL_UNREF(exec_ctx, p->subchannels[p->num_subchannels],
311 : "pick_first");
312 0 : if (p->num_subchannels == 0) {
313 0 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
314 : GRPC_CHANNEL_FATAL_FAILURE,
315 : "no_more_channels");
316 0 : while ((pp = p->pending_picks)) {
317 0 : p->pending_picks = pp->next;
318 0 : *pp->target = NULL;
319 0 : grpc_exec_ctx_enqueue(exec_ctx, pp->on_complete, 1);
320 0 : gpr_free(pp);
321 : }
322 0 : GRPC_LB_POLICY_UNREF(exec_ctx, &p->base, "pick_first_connectivity");
323 : } else {
324 0 : grpc_connectivity_state_set(exec_ctx, &p->state_tracker,
325 : GRPC_CHANNEL_TRANSIENT_FAILURE,
326 : "subchannel_failed");
327 0 : p->checking_subchannel %= p->num_subchannels;
328 0 : p->checking_connectivity = grpc_subchannel_check_connectivity(
329 0 : p->subchannels[p->checking_subchannel]);
330 0 : add_interested_parties_locked(exec_ctx, p);
331 0 : goto loop;
332 : }
333 : }
334 : }
335 :
336 6293 : gpr_mu_unlock(&p->mu);
337 : }
338 :
339 2182 : static void pf_broadcast(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
340 : grpc_transport_op *op) {
341 2176 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
342 : size_t i;
343 : size_t n;
344 : grpc_subchannel **subchannels;
345 : grpc_subchannel *selected;
346 :
347 2182 : gpr_mu_lock(&p->mu);
348 2182 : n = p->num_subchannels;
349 2182 : subchannels = gpr_malloc(n * sizeof(*subchannels));
350 2182 : selected = p->selected;
351 2182 : if (selected) {
352 2090 : GRPC_SUBCHANNEL_REF(selected, "pf_broadcast_to_selected");
353 : }
354 2279 : for (i = 0; i < n; i++) {
355 103 : subchannels[i] = p->subchannels[i];
356 103 : GRPC_SUBCHANNEL_REF(subchannels[i], "pf_broadcast");
357 : }
358 2182 : gpr_mu_unlock(&p->mu);
359 :
360 2285 : for (i = 0; i < n; i++) {
361 103 : if (selected == subchannels[i]) continue;
362 103 : grpc_subchannel_process_transport_op(exec_ctx, subchannels[i], op);
363 103 : GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannels[i], "pf_broadcast");
364 : }
365 2182 : if (p->selected) {
366 2090 : grpc_subchannel_process_transport_op(exec_ctx, selected, op);
367 2090 : GRPC_SUBCHANNEL_UNREF(exec_ctx, selected, "pf_broadcast_to_selected");
368 : }
369 2182 : gpr_free(subchannels);
370 2182 : }
371 :
372 2220 : static grpc_connectivity_state pf_check_connectivity(grpc_exec_ctx *exec_ctx,
373 : grpc_lb_policy *pol) {
374 2176 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
375 : grpc_connectivity_state st;
376 2220 : gpr_mu_lock(&p->mu);
377 2220 : st = grpc_connectivity_state_check(&p->state_tracker);
378 2220 : gpr_mu_unlock(&p->mu);
379 2220 : return st;
380 : }
381 :
382 8509 : void pf_notify_on_state_change(grpc_exec_ctx *exec_ctx, grpc_lb_policy *pol,
383 : grpc_connectivity_state *current,
384 : grpc_closure *notify) {
385 8257 : pick_first_lb_policy *p = (pick_first_lb_policy *)pol;
386 8509 : gpr_mu_lock(&p->mu);
387 8509 : grpc_connectivity_state_notify_on_state_change(exec_ctx, &p->state_tracker,
388 : current, notify);
389 8509 : gpr_mu_unlock(&p->mu);
390 8509 : }
391 :
392 : static const grpc_lb_policy_vtable pick_first_lb_policy_vtable = { /* pick_first's policy operations; installed by grpc_lb_policy_init in create_pick_first */
393 : pf_destroy, pf_shutdown, pf_pick, pf_cancel_pick, pf_exit_idle, /* positional initializer: order must match the field order of grpc_lb_policy_vtable */
394 : pf_broadcast, pf_check_connectivity, pf_notify_on_state_change};
395 :
396 3452 : static void pick_first_factory_ref(grpc_lb_policy_factory *factory) {}
397 :
398 3450 : static void pick_first_factory_unref(grpc_lb_policy_factory *factory) {}
399 :
400 2338 : static grpc_lb_policy *create_pick_first(grpc_lb_policy_factory *factory,
401 : grpc_lb_policy_args *args) {
402 2338 : pick_first_lb_policy *p = gpr_malloc(sizeof(*p));
403 2338 : GPR_ASSERT(args->num_subchannels > 0);
404 2338 : memset(p, 0, sizeof(*p));
405 2338 : grpc_lb_policy_init(&p->base, &pick_first_lb_policy_vtable);
406 2338 : p->subchannels =
407 2338 : gpr_malloc(sizeof(grpc_subchannel *) * args->num_subchannels);
408 2338 : p->num_subchannels = args->num_subchannels;
409 2338 : grpc_connectivity_state_init(&p->state_tracker, GRPC_CHANNEL_IDLE,
410 : "pick_first");
411 2338 : memcpy(p->subchannels, args->subchannels,
412 2338 : sizeof(grpc_subchannel *) * args->num_subchannels);
413 2338 : grpc_closure_init(&p->connectivity_changed, pf_connectivity_changed, p);
414 2338 : gpr_mu_init(&p->mu);
415 2338 : return &p->base;
416 : }
417 :
418 : static const grpc_lb_policy_factory_vtable pick_first_factory_vtable = { /* factory operations: no-op ref/unref, the constructor, and a name string */
419 : pick_first_factory_ref, pick_first_factory_unref, create_pick_first,
420 : "pick_first"}; /* presumably the name this policy is looked up by - confirm against grpc_lb_policy_factory_vtable */
421 :
422 : static grpc_lb_policy_factory pick_first_lb_policy_factory = { /* the single, statically allocated factory returned by grpc_pick_first_lb_factory_create */
423 : &pick_first_factory_vtable};
424 :
425 6904 : grpc_lb_policy_factory *grpc_pick_first_lb_factory_create() {
426 6904 : return &pick_first_lb_policy_factory;
427 : }
|