Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/client_config/subchannel.h"
35 :
36 : #include <string.h>
37 :
38 : #include <grpc/support/alloc.h>
39 :
40 : #include "src/core/channel/channel_args.h"
41 : #include "src/core/channel/client_channel.h"
42 : #include "src/core/channel/connected_channel.h"
43 : #include "src/core/iomgr/alarm.h"
44 : #include "src/core/transport/connectivity_state.h"
45 : #include "src/core/surface/channel.h"
46 :
/* Tunables for connection attempts and reconnect backoff. */

/* Every connect attempt is granted at least this many seconds before its
   deadline (see compute_connect_deadline). */
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
/* Backoff installed when a fresh connect sequence begins (see start_connect). */
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
/* Growth factor applied to the backoff by update_reconnect_parameters. */
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
/* Ceiling for the backoff, in seconds. */
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
/* Fraction of the current backoff used as the +/- jitter range. */
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
52 :
53 : typedef struct {
54 : /* all fields protected by subchannel->mu */
55 : /** refcount */
56 : int refs;
57 : /** parent subchannel */
58 : grpc_subchannel *subchannel;
59 : } connection;
60 :
61 : typedef struct {
62 : grpc_closure closure;
63 : size_t version;
64 : grpc_subchannel *subchannel;
65 : grpc_connectivity_state connectivity_state;
66 : } state_watcher;
67 :
68 : typedef struct waiting_for_connect {
69 : struct waiting_for_connect *next;
70 : grpc_closure *notify;
71 : grpc_pollset *pollset;
72 : grpc_subchannel_call **target;
73 : grpc_subchannel *subchannel;
74 : grpc_closure continuation;
75 : } waiting_for_connect;
76 :
77 : struct grpc_subchannel {
78 : grpc_connector *connector;
79 :
80 : /** non-transport related channel filters */
81 : const grpc_channel_filter **filters;
82 : size_t num_filters;
83 : /** channel arguments */
84 : grpc_channel_args *args;
85 : /** address to connect to */
86 : struct sockaddr *addr;
87 : size_t addr_len;
88 : /** metadata context */
89 : grpc_mdctx *mdctx;
90 : /** master channel - the grpc_channel instance that ultimately owns
91 : this channel_data via its channel stack.
92 : We occasionally use this to bump the refcount on the master channel
93 : to keep ourselves alive through an asynchronous operation. */
94 : grpc_channel *master;
95 : /** have we seen a disconnection? */
96 : int disconnected;
97 :
98 : /** set during connection */
99 : grpc_connect_out_args connecting_result;
100 :
101 : /** callback for connection finishing */
102 : grpc_closure connected;
103 :
104 : /** pollset_set tracking who's interested in a connection
105 : being setup - owned by the master channel (in particular the
106 : client_channel
107 : filter there-in) */
108 : grpc_pollset_set *pollset_set;
109 :
110 : /** mutex protecting remaining elements */
111 : gpr_mu mu;
112 :
113 : /** active connection */
114 : connection *active;
115 : /** version number for the active connection */
116 : size_t active_version;
117 : /** refcount */
118 : int refs;
119 : /** are we connecting */
120 : int connecting;
121 : /** things waiting for a connection */
122 : waiting_for_connect *waiting;
123 : /** connectivity state tracking */
124 : grpc_connectivity_state_tracker state_tracker;
125 :
126 : /** next connect attempt time */
127 : gpr_timespec next_attempt;
128 : /** amount to backoff each failure */
129 : gpr_timespec backoff_delta;
130 : /** do we have an active alarm? */
131 : int have_alarm;
132 : /** our alarm */
133 : grpc_alarm alarm;
134 : /** current random value */
135 : gpr_uint32 random;
136 : };
137 :
138 : struct grpc_subchannel_call {
139 : connection *connection;
140 : gpr_refcount refs;
141 : };
142 :
/* Both headers are immediately followed in memory by their stack, so the
   stack address is simply "one header past the pointer". */
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
145 :
146 : static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
147 : connection *con);
148 : static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
149 : grpc_subchannel *c,
150 : const char *reason);
151 : static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
152 : static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
153 : static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
154 : int iomgr_success);
155 :
156 : static void subchannel_ref_locked(grpc_subchannel *c
157 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
158 : static int subchannel_unref_locked(
159 : grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
160 : static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
161 : static grpc_subchannel *connection_unref_locked(
162 : grpc_exec_ctx *exec_ctx,
163 : connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
164 : static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
165 :
/* Ref-counting wrappers. With GRPC_SUBCHANNEL_REFCOUNT_DEBUG defined, every
   ref/unref carries the call site and a reason string and is logged;
   otherwise the reason is dropped and the log macros expand to nothing. */
#ifdef GRPC_SUBCHANNEL_REFCOUNT_DEBUG
#define SUBCHANNEL_REF_LOCKED(p, r) \
  subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) \
  subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) \
  connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(cl, p, r) \
  connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
/* forwards the debug arguments of the enclosing function to a callee */
#define REF_PASS_ARGS , file, line, reason
#define REF_LOG(name, p)                                                  \
  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p   ref %d -> %d %s", \
          (name), (p), (p)->refs, (p)->refs + 1, reason)
#define UNREF_LOG(name, p)                                                \
  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
          (name), (p), (p)->refs, (p)->refs - 1, reason)
#else
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
#define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
#define REF_PASS_ARGS
#define REF_LOG(name, p) \
  do {                   \
  } while (0)
#define UNREF_LOG(name, p) \
  do {                     \
  } while (0)
#endif
195 :
196 : /*
197 : * connection implementation
198 : */
199 :
200 1548 : static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) {
201 1548 : GPR_ASSERT(c->refs == 0);
202 1548 : grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
203 1548 : gpr_free(c);
204 1548 : }
205 :
206 1403492 : static void connection_ref_locked(connection *c
207 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
208 : REF_LOG("CONNECTION", c);
209 1403492 : subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
210 1403490 : ++c->refs;
211 1403490 : }
212 :
213 1402975 : static grpc_subchannel *connection_unref_locked(
214 : grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
215 1402975 : grpc_subchannel *destroy = NULL;
216 : UNREF_LOG("CONNECTION", c);
217 1402975 : if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
218 45 : destroy = c->subchannel;
219 : }
220 1403506 : if (--c->refs == 0 && c->subchannel->active != c) {
221 292 : connection_destroy(exec_ctx, c);
222 : }
223 1403134 : return destroy;
224 : }
225 :
226 : /*
227 : * grpc_subchannel implementation
228 : */
229 :
230 1409725 : static void subchannel_ref_locked(grpc_subchannel *c
231 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
232 : REF_LOG("SUBCHANNEL", c);
233 1409725 : ++c->refs;
234 1409725 : }
235 :
236 1410935 : static int subchannel_unref_locked(grpc_subchannel *c
237 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
238 : UNREF_LOG("SUBCHANNEL", c);
239 1410935 : return --c->refs == 0;
240 : }
241 :
242 2755 : void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
243 2755 : gpr_mu_lock(&c->mu);
244 2755 : subchannel_ref_locked(c REF_PASS_ARGS);
245 2755 : gpr_mu_unlock(&c->mu);
246 2755 : }
247 :
248 4611 : void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
249 : grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
250 : int destroy;
251 4611 : gpr_mu_lock(&c->mu);
252 4611 : destroy = subchannel_unref_locked(c REF_PASS_ARGS);
253 4611 : gpr_mu_unlock(&c->mu);
254 4611 : if (destroy) subchannel_destroy(exec_ctx, c);
255 4611 : }
256 :
257 1470 : static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
258 1470 : if (c->active != NULL) {
259 0 : connection_destroy(exec_ctx, c->active);
260 : }
261 1470 : gpr_free((void *)c->filters);
262 1470 : grpc_channel_args_destroy(c->args);
263 1470 : gpr_free(c->addr);
264 1470 : grpc_mdctx_unref(c->mdctx);
265 1470 : grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
266 1470 : grpc_connector_unref(exec_ctx, c->connector);
267 1470 : gpr_free(c);
268 1470 : }
269 :
270 10976 : void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
271 : grpc_subchannel *c,
272 : grpc_pollset *pollset) {
273 10976 : grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset);
274 10976 : }
275 :
276 10648 : void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
277 : grpc_subchannel *c,
278 : grpc_pollset *pollset) {
279 10648 : grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset);
280 10920 : }
281 :
282 1470 : static gpr_uint32 random_seed() {
283 1470 : return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
284 : }
285 :
286 1470 : grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
287 : grpc_subchannel_args *args) {
288 1470 : grpc_subchannel *c = gpr_malloc(sizeof(*c));
289 1470 : grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
290 : grpc_channel_get_channel_stack(args->master));
291 1470 : memset(c, 0, sizeof(*c));
292 1470 : c->refs = 1;
293 1470 : c->connector = connector;
294 1470 : grpc_connector_ref(c->connector);
295 1470 : c->num_filters = args->filter_count;
296 1470 : c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
297 1470 : memcpy((void *)c->filters, args->filters,
298 1470 : sizeof(grpc_channel_filter *) * c->num_filters);
299 1470 : c->addr = gpr_malloc(args->addr_len);
300 1470 : memcpy(c->addr, args->addr, args->addr_len);
301 1470 : c->addr_len = args->addr_len;
302 1470 : c->args = grpc_channel_args_copy(args->args);
303 1470 : c->mdctx = args->mdctx;
304 1470 : c->master = args->master;
305 1470 : c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
306 1470 : c->random = random_seed();
307 1470 : grpc_mdctx_ref(c->mdctx);
308 1470 : grpc_closure_init(&c->connected, subchannel_connected, c);
309 1470 : grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
310 : "subchannel");
311 1470 : gpr_mu_init(&c->mu);
312 1470 : return c;
313 : }
314 :
315 1921 : static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
316 : grpc_connect_in_args args;
317 :
318 1921 : args.interested_parties = c->pollset_set;
319 1921 : args.addr = c->addr;
320 1921 : args.addr_len = c->addr_len;
321 1921 : args.deadline = compute_connect_deadline(c);
322 1921 : args.channel_args = c->args;
323 :
324 1921 : grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
325 : &c->connected);
326 1921 : }
327 :
328 1800 : static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
329 1800 : c->backoff_delta = gpr_time_from_seconds(
330 : GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
331 1800 : c->next_attempt =
332 1800 : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
333 1800 : continue_connect(exec_ctx, c);
334 1800 : }
335 :
336 134 : static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
337 : int iomgr_success) {
338 134 : waiting_for_connect *w4c = arg;
339 134 : grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
340 134 : grpc_subchannel_create_call(exec_ctx, w4c->subchannel, w4c->pollset,
341 : w4c->target, w4c->notify);
342 134 : GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
343 134 : gpr_free(w4c);
344 134 : }
345 :
346 1402184 : void grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
347 : grpc_pollset *pollset,
348 : grpc_subchannel_call **target,
349 : grpc_closure *notify) {
350 : connection *con;
351 1402184 : gpr_mu_lock(&c->mu);
352 1402479 : if (c->active != NULL) {
353 1402345 : con = c->active;
354 1402345 : CONNECTION_REF_LOCKED(con, "call");
355 1402343 : gpr_mu_unlock(&c->mu);
356 :
357 1402336 : *target = create_call(exec_ctx, con);
358 1402338 : notify->cb(exec_ctx, notify->cb_arg, 1);
359 : } else {
360 134 : waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
361 134 : w4c->next = c->waiting;
362 134 : w4c->notify = notify;
363 134 : w4c->pollset = pollset;
364 134 : w4c->target = target;
365 134 : w4c->subchannel = c;
366 : /* released when clearing w4c */
367 134 : SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
368 134 : grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
369 134 : c->waiting = w4c;
370 134 : grpc_subchannel_add_interested_party(exec_ctx, c, pollset);
371 134 : if (!c->connecting) {
372 0 : c->connecting = 1;
373 0 : connectivity_state_changed_locked(exec_ctx, c, "create_call");
374 : /* released by connection */
375 0 : SUBCHANNEL_REF_LOCKED(c, "connecting");
376 0 : GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
377 0 : gpr_mu_unlock(&c->mu);
378 :
379 0 : start_connect(exec_ctx, c);
380 : } else {
381 134 : gpr_mu_unlock(&c->mu);
382 : }
383 : }
384 1402444 : }
385 :
386 70 : grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
387 : grpc_connectivity_state state;
388 70 : gpr_mu_lock(&c->mu);
389 70 : state = grpc_connectivity_state_check(&c->state_tracker);
390 70 : gpr_mu_unlock(&c->mu);
391 70 : return state;
392 : }
393 :
394 5828 : void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
395 : grpc_subchannel *c,
396 : grpc_connectivity_state *state,
397 : grpc_closure *notify) {
398 5828 : int do_connect = 0;
399 5828 : gpr_mu_lock(&c->mu);
400 5828 : if (grpc_connectivity_state_notify_on_state_change(
401 : exec_ctx, &c->state_tracker, state, notify)) {
402 1800 : do_connect = 1;
403 1800 : c->connecting = 1;
404 : /* released by connection */
405 1800 : SUBCHANNEL_REF_LOCKED(c, "connecting");
406 1800 : GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
407 1800 : connectivity_state_changed_locked(exec_ctx, c, "state_change");
408 : }
409 5828 : gpr_mu_unlock(&c->mu);
410 :
411 5828 : if (do_connect) {
412 1800 : start_connect(exec_ctx, c);
413 : }
414 5828 : }
415 :
416 0 : int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
417 : grpc_subchannel *c,
418 : grpc_closure *subscribed_notify) {
419 : int success;
420 0 : gpr_mu_lock(&c->mu);
421 0 : success = grpc_connectivity_state_change_unsubscribe(
422 : exec_ctx, &c->state_tracker, subscribed_notify);
423 0 : gpr_mu_unlock(&c->mu);
424 0 : return success;
425 : }
426 :
427 1411 : void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
428 : grpc_subchannel *c,
429 : grpc_transport_op *op) {
430 1411 : connection *con = NULL;
431 : grpc_subchannel *destroy;
432 1411 : int cancel_alarm = 0;
433 1411 : gpr_mu_lock(&c->mu);
434 1411 : if (c->active != NULL) {
435 1147 : con = c->active;
436 1147 : CONNECTION_REF_LOCKED(con, "transport-op");
437 : }
438 1411 : if (op->disconnect) {
439 1411 : c->disconnected = 1;
440 1411 : connectivity_state_changed_locked(exec_ctx, c, "disconnect");
441 1411 : if (c->have_alarm) {
442 40 : cancel_alarm = 1;
443 : }
444 : }
445 1411 : gpr_mu_unlock(&c->mu);
446 :
447 1411 : if (con != NULL) {
448 1147 : grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
449 1147 : grpc_channel_element *top_elem =
450 : grpc_channel_stack_element(channel_stack, 0);
451 1147 : top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
452 :
453 1147 : gpr_mu_lock(&c->mu);
454 1147 : destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op");
455 1147 : gpr_mu_unlock(&c->mu);
456 1147 : if (destroy) {
457 0 : subchannel_destroy(exec_ctx, destroy);
458 : }
459 : }
460 :
461 1411 : if (cancel_alarm) {
462 40 : grpc_alarm_cancel(exec_ctx, &c->alarm);
463 : }
464 :
465 1411 : if (op->disconnect) {
466 1411 : grpc_connector_shutdown(exec_ctx, c->connector);
467 : }
468 1411 : }
469 :
470 1548 : static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p,
471 : int iomgr_success) {
472 1548 : state_watcher *sw = p;
473 1548 : grpc_subchannel *c = sw->subchannel;
474 1548 : gpr_mu *mu = &c->mu;
475 : int destroy;
476 : grpc_transport_op op;
477 : grpc_channel_element *elem;
478 1548 : connection *destroy_connection = NULL;
479 :
480 1548 : gpr_mu_lock(mu);
481 :
482 : /* if we failed or there is a version number mismatch, just leave
483 : this closure */
484 1548 : if (!iomgr_success || sw->subchannel->active_version != sw->version) {
485 : goto done;
486 : }
487 :
488 1548 : switch (sw->connectivity_state) {
489 : case GRPC_CHANNEL_CONNECTING:
490 : case GRPC_CHANNEL_READY:
491 : case GRPC_CHANNEL_IDLE:
492 : /* all is still good: keep watching */
493 0 : memset(&op, 0, sizeof(op));
494 0 : op.connectivity_state = &sw->connectivity_state;
495 0 : op.on_connectivity_state_change = &sw->closure;
496 0 : elem = grpc_channel_stack_element(
497 0 : CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
498 0 : elem->filter->start_transport_op(exec_ctx, elem, &op);
499 : /* early out */
500 0 : gpr_mu_unlock(mu);
501 1548 : return;
502 : case GRPC_CHANNEL_FATAL_FAILURE:
503 : case GRPC_CHANNEL_TRANSIENT_FAILURE:
504 : /* things have gone wrong, deactivate and enter idle */
505 1548 : if (sw->subchannel->active->refs == 0) {
506 1256 : destroy_connection = sw->subchannel->active;
507 : }
508 1548 : sw->subchannel->active = NULL;
509 1548 : grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
510 1548 : c->disconnected
511 : ? GRPC_CHANNEL_FATAL_FAILURE
512 : : GRPC_CHANNEL_TRANSIENT_FAILURE,
513 : "connection_failed");
514 1548 : break;
515 : }
516 :
517 : done:
518 1548 : connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed");
519 1548 : destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
520 1548 : gpr_free(sw);
521 1548 : gpr_mu_unlock(mu);
522 1548 : if (destroy) {
523 1099 : subchannel_destroy(exec_ctx, c);
524 : }
525 1548 : if (destroy_connection != NULL) {
526 1256 : connection_destroy(exec_ctx, destroy_connection);
527 : }
528 : }
529 :
530 1547 : static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
531 : size_t channel_stack_size;
532 : connection *con;
533 : grpc_channel_stack *stk;
534 : size_t num_filters;
535 : const grpc_channel_filter **filters;
536 : waiting_for_connect *w4c;
537 : grpc_transport_op op;
538 : state_watcher *sw;
539 1547 : connection *destroy_connection = NULL;
540 : grpc_channel_element *elem;
541 :
542 : /* build final filter list */
543 1547 : num_filters = c->num_filters + c->connecting_result.num_filters + 1;
544 1547 : filters = gpr_malloc(sizeof(*filters) * num_filters);
545 1547 : memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
546 1547 : memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
547 1547 : sizeof(*filters) * c->connecting_result.num_filters);
548 1547 : filters[num_filters - 1] = &grpc_connected_channel_filter;
549 :
550 : /* construct channel stack */
551 1547 : channel_stack_size = grpc_channel_stack_size(filters, num_filters);
552 1546 : con = gpr_malloc(sizeof(connection) + channel_stack_size);
553 1547 : stk = (grpc_channel_stack *)(con + 1);
554 1547 : con->refs = 0;
555 1547 : con->subchannel = c;
556 1547 : grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
557 : c->mdctx, stk);
558 1546 : grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
559 1547 : gpr_free((void *)c->connecting_result.filters);
560 1548 : memset(&c->connecting_result, 0, sizeof(c->connecting_result));
561 :
562 : /* initialize state watcher */
563 1548 : sw = gpr_malloc(sizeof(*sw));
564 1547 : grpc_closure_init(&sw->closure, on_state_changed, sw);
565 1547 : sw->subchannel = c;
566 1547 : sw->connectivity_state = GRPC_CHANNEL_READY;
567 :
568 1547 : gpr_mu_lock(&c->mu);
569 :
570 1548 : if (c->disconnected) {
571 0 : gpr_mu_unlock(&c->mu);
572 0 : gpr_free(sw);
573 0 : gpr_free((void *)filters);
574 0 : grpc_channel_stack_destroy(exec_ctx, stk);
575 0 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
576 0 : GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
577 1548 : return;
578 : }
579 :
580 : /* publish */
581 1548 : if (c->active != NULL && c->active->refs == 0) {
582 0 : destroy_connection = c->active;
583 : }
584 1548 : c->active = con;
585 1548 : c->active_version++;
586 1548 : sw->version = c->active_version;
587 1548 : c->connecting = 0;
588 :
589 : /* watch for changes; subchannel ref for connecting is donated
590 : to the state watcher */
591 1548 : memset(&op, 0, sizeof(op));
592 1548 : op.connectivity_state = &sw->connectivity_state;
593 1548 : op.on_connectivity_state_change = &sw->closure;
594 1548 : op.bind_pollset_set = c->pollset_set;
595 1548 : SUBCHANNEL_REF_LOCKED(c, "state_watcher");
596 1548 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
597 1548 : GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
598 1548 : elem =
599 1548 : grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
600 1548 : elem->filter->start_transport_op(exec_ctx, elem, &op);
601 :
602 : /* signal completion */
603 1548 : connectivity_state_changed_locked(exec_ctx, c, "connected");
604 1548 : w4c = c->waiting;
605 1548 : c->waiting = NULL;
606 :
607 1548 : gpr_mu_unlock(&c->mu);
608 :
609 3230 : while (w4c != NULL) {
610 134 : waiting_for_connect *next = w4c->next;
611 134 : grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1);
612 134 : w4c = next;
613 : }
614 :
615 1548 : gpr_free((void *)filters);
616 :
617 1548 : if (destroy_connection != NULL) {
618 0 : connection_destroy(exec_ctx, destroy_connection);
619 : }
620 : }
621 :
622 : /* Generate a random number between 0 and 1. */
623 121 : static double generate_uniform_random_number(grpc_subchannel *c) {
624 121 : c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31);
625 121 : return c->random / (double)((gpr_uint32)1 << 31);
626 : }
627 :
628 : /* Update backoff_delta and next_attempt in subchannel */
629 121 : static void update_reconnect_parameters(grpc_subchannel *c) {
630 : gpr_int32 backoff_delta_millis, jitter;
631 121 : gpr_int32 max_backoff_millis =
632 : GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
633 : double jitter_range;
634 121 : backoff_delta_millis =
635 121 : (gpr_int32)(gpr_time_to_millis(c->backoff_delta) *
636 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
637 121 : if (backoff_delta_millis > max_backoff_millis) {
638 0 : backoff_delta_millis = max_backoff_millis;
639 : }
640 121 : c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
641 121 : c->next_attempt =
642 121 : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
643 :
644 121 : jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
645 121 : jitter =
646 121 : (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
647 121 : c->next_attempt =
648 121 : gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
649 121 : }
650 :
651 373 : static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
652 373 : grpc_subchannel *c = arg;
653 373 : gpr_mu_lock(&c->mu);
654 373 : c->have_alarm = 0;
655 373 : if (c->disconnected) {
656 252 : iomgr_success = 0;
657 : }
658 373 : connectivity_state_changed_locked(exec_ctx, c, "alarm");
659 373 : if (iomgr_success) {
660 121 : gpr_mu_unlock(&c->mu);
661 121 : update_reconnect_parameters(c);
662 121 : continue_connect(exec_ctx, c);
663 : } else {
664 : waiting_for_connect *w4c;
665 252 : w4c = c->waiting;
666 252 : c->waiting = NULL;
667 252 : gpr_mu_unlock(&c->mu);
668 504 : while (w4c != NULL) {
669 0 : waiting_for_connect *next = w4c->next;
670 0 : grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
671 : w4c->pollset);
672 0 : w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, 0);
673 0 : GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
674 0 : gpr_free(w4c);
675 0 : w4c = next;
676 : }
677 252 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
678 252 : GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
679 : }
680 373 : }
681 :
682 1920 : static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
683 : int iomgr_success) {
684 1920 : grpc_subchannel *c = arg;
685 1920 : if (c->connecting_result.transport != NULL) {
686 1547 : publish_transport(exec_ctx, c);
687 : } else {
688 373 : gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
689 373 : gpr_mu_lock(&c->mu);
690 373 : GPR_ASSERT(!c->have_alarm);
691 373 : c->have_alarm = 1;
692 373 : connectivity_state_changed_locked(exec_ctx, c, "connect_failed");
693 373 : grpc_alarm_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
694 373 : gpr_mu_unlock(&c->mu);
695 : }
696 1921 : }
697 :
698 1921 : static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
699 1921 : gpr_timespec current_deadline =
700 : gpr_time_add(c->next_attempt, c->backoff_delta);
701 1921 : gpr_timespec min_deadline = gpr_time_add(
702 : gpr_now(GPR_CLOCK_MONOTONIC),
703 : gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
704 : GPR_TIMESPAN));
705 1921 : return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
706 : : min_deadline;
707 : }
708 :
709 7053 : static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
710 7053 : if (c->disconnected) {
711 3022 : return GRPC_CHANNEL_FATAL_FAILURE;
712 : }
713 4031 : if (c->connecting) {
714 2082 : if (c->have_alarm) {
715 161 : return GRPC_CHANNEL_TRANSIENT_FAILURE;
716 : }
717 1921 : return GRPC_CHANNEL_CONNECTING;
718 : }
719 1949 : if (c->active) {
720 1548 : return GRPC_CHANNEL_READY;
721 : }
722 401 : return GRPC_CHANNEL_IDLE;
723 : }
724 :
725 7053 : static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
726 : grpc_subchannel *c,
727 : const char *reason) {
728 7053 : grpc_connectivity_state current = compute_connectivity_locked(c);
729 7053 : grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason);
730 7053 : }
731 :
732 : /*
733 : * grpc_subchannel_call implementation
734 : */
735 :
736 390 : void grpc_subchannel_call_ref(grpc_subchannel_call *c
737 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
738 390 : gpr_ref(&c->refs);
739 390 : }
740 :
741 1402711 : void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
742 : grpc_subchannel_call *c
743 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
744 1402711 : if (gpr_unref(&c->refs)) {
745 1402342 : gpr_mu *mu = &c->connection->subchannel->mu;
746 : grpc_subchannel *destroy;
747 1402342 : grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
748 1402352 : gpr_mu_lock(mu);
749 1402358 : destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
750 1402356 : gpr_mu_unlock(mu);
751 1402353 : gpr_free(c);
752 1402309 : if (destroy != NULL) {
753 45 : subchannel_destroy(exec_ctx, destroy);
754 : }
755 : }
756 1402699 : }
757 :
758 390 : char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
759 : grpc_subchannel_call *call) {
760 390 : grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
761 390 : grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
762 390 : return top_elem->filter->get_peer(exec_ctx, top_elem);
763 : }
764 :
765 2845904 : void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
766 : grpc_subchannel_call *call,
767 : grpc_transport_stream_op *op) {
768 2845904 : grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
769 2845904 : grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
770 2845601 : top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
771 2846988 : }
772 :
773 1401490 : static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
774 : connection *con) {
775 1401490 : grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
776 1401490 : grpc_subchannel_call *call =
777 1401490 : gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
778 1402194 : grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
779 1402194 : call->connection = con;
780 1402194 : gpr_ref_init(&call->refs, 1);
781 1402218 : grpc_call_stack_init(exec_ctx, chanstk, NULL, NULL, callstk);
782 1402344 : return call;
783 : }
|