Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/client_config/subchannel.h"
35 :
36 : #include <string.h>
37 :
38 : #include <grpc/support/alloc.h>
39 :
40 : #include "src/core/channel/channel_args.h"
41 : #include "src/core/channel/client_channel.h"
42 : #include "src/core/channel/connected_channel.h"
43 : #include "src/core/client_config/initial_connect_string.h"
44 : #include "src/core/iomgr/timer.h"
45 : #include "src/core/profiling/timers.h"
46 : #include "src/core/surface/channel.h"
47 : #include "src/core/transport/connectivity_state.h"
48 : #include "src/core/transport/connectivity_state.h"
49 :
/* Connection timing knobs used by the connect / reconnect logic below. */

/* Lower bound, in seconds from "now", for any connect attempt's deadline. */
#define GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS 20
/* Backoff delta, in seconds, installed when a connect sequence starts. */
#define GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS 1
/* Growth factor applied to the backoff delta before each retry. */
#define GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER 1.6
/* Ceiling, in seconds, for the backoff delta. */
#define GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS 120
/* Fraction of the backoff delta used as the +/- jitter range. */
#define GRPC_SUBCHANNEL_RECONNECT_JITTER 0.2
55 :
56 : typedef struct {
57 : /* all fields protected by subchannel->mu */
58 : /** refcount */
59 : int refs;
60 : /** parent subchannel */
61 : grpc_subchannel *subchannel;
62 : } connection;
63 :
64 : typedef struct {
65 : grpc_closure closure;
66 : size_t version;
67 : grpc_subchannel *subchannel;
68 : grpc_connectivity_state connectivity_state;
69 : } state_watcher;
70 :
71 : typedef struct waiting_for_connect {
72 : struct waiting_for_connect *next;
73 : grpc_closure *notify;
74 : grpc_pollset *pollset;
75 : gpr_atm *target;
76 : grpc_subchannel *subchannel;
77 : grpc_closure continuation;
78 : } waiting_for_connect;
79 :
80 : struct grpc_subchannel {
81 : grpc_connector *connector;
82 :
83 : /** non-transport related channel filters */
84 : const grpc_channel_filter **filters;
85 : size_t num_filters;
86 : /** channel arguments */
87 : grpc_channel_args *args;
88 : /** address to connect to */
89 : struct sockaddr *addr;
90 : size_t addr_len;
91 : /** initial string to send to peer */
92 : gpr_slice initial_connect_string;
93 : /** master channel - the grpc_channel instance that ultimately owns
94 : this channel_data via its channel stack.
95 : We occasionally use this to bump the refcount on the master channel
96 : to keep ourselves alive through an asynchronous operation. */
97 : grpc_channel *master;
98 : /** have we seen a disconnection? */
99 : int disconnected;
100 :
101 : /** set during connection */
102 : grpc_connect_out_args connecting_result;
103 :
104 : /** callback for connection finishing */
105 : grpc_closure connected;
106 :
107 : /** pollset_set tracking who's interested in a connection
108 : being setup - owned by the master channel (in particular the
109 : client_channel
110 : filter there-in) */
111 : grpc_pollset_set *pollset_set;
112 :
113 : /** mutex protecting remaining elements */
114 : gpr_mu mu;
115 :
116 : /** active connection */
117 : connection *active;
118 : /** version number for the active connection */
119 : size_t active_version;
120 : /** refcount */
121 : int refs;
122 : /** are we connecting */
123 : int connecting;
124 : /** things waiting for a connection */
125 : waiting_for_connect *waiting;
126 : /** connectivity state tracking */
127 : grpc_connectivity_state_tracker state_tracker;
128 :
129 : /** next connect attempt time */
130 : gpr_timespec next_attempt;
131 : /** amount to backoff each failure */
132 : gpr_timespec backoff_delta;
133 : /** do we have an active alarm? */
134 : int have_alarm;
135 : /** our alarm */
136 : grpc_timer alarm;
137 : /** current random value */
138 : gpr_uint32 random;
139 : };
140 :
141 : struct grpc_subchannel_call {
142 : connection *connection;
143 : };
144 :
/* Header <-> trailing-stack conversions. Each stack is laid out immediately
   after its header object, so stepping one header forward yields the stack
   and stepping one header back from the stack yields the header. */

/* connection header -> channel stack stored right after it */
#define CHANNEL_STACK_FROM_CONNECTION(con) ((grpc_channel_stack *)((con) + 1))
/* subchannel call header -> call stack stored right after it */
#define SUBCHANNEL_CALL_TO_CALL_STACK(call) ((grpc_call_stack *)((call) + 1))
/* call stack -> subchannel call header stored right before it */
#define CALLSTACK_TO_SUBCHANNEL_CALL(callstack) \
  (((grpc_subchannel_call *)(callstack)) - 1)
149 :
150 : static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
151 : connection *con,
152 : grpc_pollset *pollset);
153 : static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
154 : grpc_subchannel *c,
155 : const char *reason);
156 : static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c);
157 : static gpr_timespec compute_connect_deadline(grpc_subchannel *c);
158 : static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *subchannel,
159 : int iomgr_success);
160 :
161 : static void subchannel_ref_locked(grpc_subchannel *c
162 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
163 : static int subchannel_unref_locked(
164 : grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
165 : static void connection_ref_locked(connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS);
166 : static grpc_subchannel *connection_unref_locked(
167 : grpc_exec_ctx *exec_ctx,
168 : connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) GRPC_MUST_USE_RESULT;
169 : static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c);
170 :
/* Wrappers around the ref/unref helpers. With GRPC_STREAM_REFCOUNT_DEBUG
   defined they forward the call site (__FILE__, __LINE__) and a reason
   string, and REF_LOG / UNREF_LOG print the refcount transition; otherwise
   the extra arguments are dropped and the log macros expand to empty
   statements. */
#ifdef GRPC_STREAM_REFCOUNT_DEBUG
#define REF_PASS_ARGS , file, line, reason
#define REF_PASS_REASON , reason
#define REF_LOG(name, p)                                                  \
  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p ref %d -> %d %s", \
          (name), (p), (p)->refs, (p)->refs + 1, reason)
#define UNREF_LOG(name, p)                                                  \
  gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "%s: %p unref %d -> %d %s", \
          (name), (p), (p)->refs, (p)->refs - 1, reason)
#define SUBCHANNEL_REF_LOCKED(p, r) \
  subchannel_ref_locked((p), __FILE__, __LINE__, (r))
#define SUBCHANNEL_UNREF_LOCKED(p, r) \
  subchannel_unref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_REF_LOCKED(p, r) \
  connection_ref_locked((p), __FILE__, __LINE__, (r))
#define CONNECTION_UNREF_LOCKED(cl, p, r) \
  connection_unref_locked((cl), (p), __FILE__, __LINE__, (r))
#else
#define REF_PASS_ARGS
#define REF_PASS_REASON
#define REF_LOG(name, p) \
  do {                   \
  } while (0)
#define UNREF_LOG(name, p) \
  do {                     \
  } while (0)
#define SUBCHANNEL_REF_LOCKED(p, r) subchannel_ref_locked((p))
#define SUBCHANNEL_UNREF_LOCKED(p, r) subchannel_unref_locked((p))
#define CONNECTION_REF_LOCKED(p, r) connection_ref_locked((p))
#define CONNECTION_UNREF_LOCKED(cl, p, r) connection_unref_locked((cl), (p))
#endif
202 :
203 : /*
204 : * connection implementation
205 : */
206 :
207 2303 : static void connection_destroy(grpc_exec_ctx *exec_ctx, connection *c) {
208 2303 : GPR_ASSERT(c->refs == 0);
209 2303 : grpc_channel_stack_destroy(exec_ctx, CHANNEL_STACK_FROM_CONNECTION(c));
210 2304 : gpr_free(c);
211 2304 : }
212 :
213 2168123 : static void connection_ref_locked(connection *c
214 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
215 : REF_LOG("CONNECTION", c);
216 2168213 : subchannel_ref_locked(c->subchannel REF_PASS_ARGS);
217 2169204 : ++c->refs;
218 2169114 : }
219 :
220 2169130 : static grpc_subchannel *connection_unref_locked(
221 : grpc_exec_ctx *exec_ctx, connection *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
222 2169111 : grpc_subchannel *destroy = NULL;
223 : UNREF_LOG("CONNECTION", c);
224 2169149 : if (subchannel_unref_locked(c->subchannel REF_PASS_ARGS)) {
225 157 : destroy = c->subchannel;
226 : }
227 2169135 : if (--c->refs == 0 && c->subchannel->active != c) {
228 419 : connection_destroy(exec_ctx, c);
229 : }
230 2169135 : return destroy;
231 : }
232 :
233 : /*
234 : * grpc_subchannel implementation
235 : */
236 :
237 2178676 : static void subchannel_ref_locked(grpc_subchannel *c
238 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
239 : REF_LOG("SUBCHANNEL", c);
240 2178941 : ++c->refs;
241 2178676 : }
242 :
243 2182496 : static int subchannel_unref_locked(grpc_subchannel *c
244 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
245 : UNREF_LOG("SUBCHANNEL", c);
246 2182696 : return --c->refs == 0;
247 : }
248 :
249 4340 : void grpc_subchannel_ref(grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
250 4340 : gpr_mu_lock(&c->mu);
251 4291 : subchannel_ref_locked(c REF_PASS_ARGS);
252 4340 : gpr_mu_unlock(&c->mu);
253 4340 : }
254 :
255 8869 : void grpc_subchannel_unref(grpc_exec_ctx *exec_ctx,
256 : grpc_subchannel *c GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
257 : int destroy;
258 8869 : gpr_mu_lock(&c->mu);
259 8769 : destroy = subchannel_unref_locked(c REF_PASS_ARGS);
260 8869 : gpr_mu_unlock(&c->mu);
261 8869 : if (destroy) subchannel_destroy(exec_ctx, c);
262 8869 : }
263 :
264 3834 : static void subchannel_destroy(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
265 3834 : if (c->active != NULL) {
266 0 : connection_destroy(exec_ctx, c->active);
267 : }
268 3834 : gpr_free((void *)c->filters);
269 3834 : grpc_channel_args_destroy(c->args);
270 3834 : gpr_free(c->addr);
271 3834 : gpr_slice_unref(c->initial_connect_string);
272 3834 : grpc_connectivity_state_destroy(exec_ctx, &c->state_tracker);
273 3834 : grpc_connector_unref(exec_ctx, c->connector);
274 3834 : gpr_free(c);
275 3834 : }
276 :
277 10614 : void grpc_subchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
278 : grpc_subchannel *c,
279 : grpc_pollset *pollset) {
280 10614 : grpc_pollset_set_add_pollset(exec_ctx, c->pollset_set, pollset);
281 10614 : }
282 :
283 10029 : void grpc_subchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
284 : grpc_subchannel *c,
285 : grpc_pollset *pollset) {
286 10029 : grpc_pollset_set_del_pollset(exec_ctx, c->pollset_set, pollset);
287 10488 : }
288 :
289 3889 : static gpr_uint32 random_seed() {
290 3889 : return (gpr_uint32)(gpr_time_to_millis(gpr_now(GPR_CLOCK_MONOTONIC)));
291 : }
292 :
293 3889 : grpc_subchannel *grpc_subchannel_create(grpc_connector *connector,
294 : grpc_subchannel_args *args) {
295 3889 : grpc_subchannel *c = gpr_malloc(sizeof(*c));
296 3889 : grpc_channel_element *parent_elem = grpc_channel_stack_last_element(
297 : grpc_channel_get_channel_stack(args->master));
298 3889 : memset(c, 0, sizeof(*c));
299 3889 : c->refs = 1;
300 3889 : c->connector = connector;
301 3889 : grpc_connector_ref(c->connector);
302 3889 : c->num_filters = args->filter_count;
303 3889 : c->filters = gpr_malloc(sizeof(grpc_channel_filter *) * c->num_filters);
304 3889 : memcpy((void *)c->filters, args->filters,
305 3889 : sizeof(grpc_channel_filter *) * c->num_filters);
306 3889 : c->addr = gpr_malloc(args->addr_len);
307 3889 : memcpy(c->addr, args->addr, args->addr_len);
308 3889 : c->addr_len = args->addr_len;
309 3889 : grpc_set_initial_connect_string(&c->addr, &c->addr_len,
310 : &c->initial_connect_string);
311 3889 : c->args = grpc_channel_args_copy(args->args);
312 3889 : c->master = args->master;
313 3889 : c->pollset_set = grpc_client_channel_get_connecting_pollset_set(parent_elem);
314 3889 : c->random = random_seed();
315 3889 : grpc_closure_init(&c->connected, subchannel_connected, c);
316 3889 : grpc_connectivity_state_init(&c->state_tracker, GRPC_CHANNEL_IDLE,
317 : "subchannel");
318 3889 : gpr_mu_init(&c->mu);
319 3889 : return c;
320 : }
321 :
322 331 : static void cancel_waiting_calls(grpc_exec_ctx *exec_ctx,
323 : grpc_subchannel *subchannel,
324 : int iomgr_success) {
325 : waiting_for_connect *w4c;
326 331 : gpr_mu_lock(&subchannel->mu);
327 331 : w4c = subchannel->waiting;
328 331 : subchannel->waiting = NULL;
329 331 : gpr_mu_unlock(&subchannel->mu);
330 662 : while (w4c != NULL) {
331 0 : waiting_for_connect *next = w4c->next;
332 0 : grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
333 : w4c->pollset);
334 0 : if (w4c->notify) {
335 0 : w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
336 : }
337 :
338 0 : GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
339 0 : gpr_free(w4c);
340 :
341 0 : w4c = next;
342 : }
343 331 : }
344 :
345 22 : void grpc_subchannel_cancel_create_call(grpc_exec_ctx *exec_ctx,
346 : grpc_subchannel *subchannel,
347 : gpr_atm *target) {
348 : waiting_for_connect *w4c;
349 22 : int unref_count = 0;
350 22 : gpr_mu_lock(&subchannel->mu);
351 22 : w4c = subchannel->waiting;
352 22 : subchannel->waiting = NULL;
353 66 : while (w4c != NULL) {
354 22 : waiting_for_connect *next = w4c->next;
355 22 : if (w4c->target == target) {
356 22 : grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel,
357 : w4c->pollset);
358 22 : grpc_exec_ctx_enqueue(exec_ctx, w4c->notify, 0);
359 :
360 22 : unref_count++;
361 22 : gpr_free(w4c);
362 : } else {
363 0 : w4c->next = subchannel->waiting;
364 0 : subchannel->waiting = w4c;
365 : }
366 :
367 22 : w4c = next;
368 : }
369 22 : gpr_mu_unlock(&subchannel->mu);
370 :
371 66 : while (unref_count-- > 0) {
372 22 : GRPC_SUBCHANNEL_UNREF(exec_ctx, subchannel, "waiting_for_connect");
373 : }
374 22 : }
375 :
376 3114 : static void continue_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
377 : grpc_connect_in_args args;
378 :
379 3114 : args.interested_parties = c->pollset_set;
380 3114 : args.addr = c->addr;
381 3114 : args.addr_len = c->addr_len;
382 3114 : args.deadline = compute_connect_deadline(c);
383 3114 : args.channel_args = c->args;
384 3114 : args.initial_connect_string = c->initial_connect_string;
385 :
386 3114 : grpc_connector_connect(exec_ctx, c->connector, &args, &c->connecting_result,
387 : &c->connected);
388 3114 : }
389 :
390 2720 : static void start_connect(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
391 2720 : c->backoff_delta = gpr_time_from_seconds(
392 : GRPC_SUBCHANNEL_INITIAL_CONNECT_BACKOFF_SECONDS, GPR_TIMESPAN);
393 2720 : c->next_attempt =
394 2720 : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
395 2720 : continue_connect(exec_ctx, c);
396 2720 : }
397 :
398 326 : static void continue_creating_call(grpc_exec_ctx *exec_ctx, void *arg,
399 : int iomgr_success) {
400 : int call_creation_finished_ok;
401 326 : waiting_for_connect *w4c = arg;
402 326 : grpc_subchannel_del_interested_party(exec_ctx, w4c->subchannel, w4c->pollset);
403 326 : call_creation_finished_ok = grpc_subchannel_create_call(
404 : exec_ctx, w4c->subchannel, w4c->pollset, w4c->target, w4c->notify);
405 326 : GPR_ASSERT(call_creation_finished_ok == 1);
406 326 : w4c->notify->cb(exec_ctx, w4c->notify->cb_arg, iomgr_success);
407 326 : GRPC_SUBCHANNEL_UNREF(exec_ctx, w4c->subchannel, "waiting_for_connect");
408 326 : gpr_free(w4c);
409 326 : }
410 :
411 2167184 : int grpc_subchannel_create_call(grpc_exec_ctx *exec_ctx, grpc_subchannel *c,
412 : grpc_pollset *pollset, gpr_atm *target,
413 : grpc_closure *notify) {
414 : connection *con;
415 : grpc_subchannel_call *call;
416 : GPR_TIMER_BEGIN("grpc_subchannel_create_call", 0);
417 2167184 : gpr_mu_lock(&c->mu);
418 2167681 : if (c->active != NULL) {
419 2167243 : con = c->active;
420 2167243 : CONNECTION_REF_LOCKED(con, "call");
421 2167302 : gpr_mu_unlock(&c->mu);
422 :
423 2167282 : call = create_call(exec_ctx, con, pollset);
424 2167328 : if (!gpr_atm_rel_cas(target, 0, (gpr_atm)(gpr_uintptr)call)) {
425 1 : GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "failed to set");
426 : }
427 : GPR_TIMER_END("grpc_subchannel_create_call", 0);
428 2166889 : return 1;
429 : } else {
430 348 : waiting_for_connect *w4c = gpr_malloc(sizeof(*w4c));
431 348 : w4c->next = c->waiting;
432 348 : w4c->notify = notify;
433 348 : w4c->pollset = pollset;
434 348 : w4c->target = target;
435 348 : w4c->subchannel = c;
436 : /* released when clearing w4c */
437 348 : SUBCHANNEL_REF_LOCKED(c, "waiting_for_connect");
438 348 : grpc_closure_init(&w4c->continuation, continue_creating_call, w4c);
439 348 : c->waiting = w4c;
440 348 : grpc_subchannel_add_interested_party(exec_ctx, c, pollset);
441 348 : if (!c->connecting) {
442 39 : c->connecting = 1;
443 39 : connectivity_state_changed_locked(exec_ctx, c, "create_call");
444 : /* released by connection */
445 39 : SUBCHANNEL_REF_LOCKED(c, "connecting");
446 39 : GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
447 39 : gpr_mu_unlock(&c->mu);
448 :
449 39 : start_connect(exec_ctx, c);
450 : } else {
451 309 : gpr_mu_unlock(&c->mu);
452 : }
453 : GPR_TIMER_END("grpc_subchannel_create_call", 0);
454 348 : return 0;
455 : }
456 : }
457 :
458 271 : grpc_connectivity_state grpc_subchannel_check_connectivity(grpc_subchannel *c) {
459 : grpc_connectivity_state state;
460 271 : gpr_mu_lock(&c->mu);
461 271 : state = grpc_connectivity_state_check(&c->state_tracker);
462 271 : gpr_mu_unlock(&c->mu);
463 271 : return state;
464 : }
465 :
466 8874 : void grpc_subchannel_notify_on_state_change(grpc_exec_ctx *exec_ctx,
467 : grpc_subchannel *c,
468 : grpc_connectivity_state *state,
469 : grpc_closure *notify) {
470 8623 : int do_connect = 0;
471 8874 : gpr_mu_lock(&c->mu);
472 8874 : if (grpc_connectivity_state_notify_on_state_change(
473 : exec_ctx, &c->state_tracker, state, notify)) {
474 2596 : do_connect = 1;
475 2681 : c->connecting = 1;
476 : /* released by connection */
477 2596 : SUBCHANNEL_REF_LOCKED(c, "connecting");
478 2681 : GRPC_CHANNEL_INTERNAL_REF(c->master, "connecting");
479 2681 : connectivity_state_changed_locked(exec_ctx, c, "state_change");
480 : }
481 8874 : gpr_mu_unlock(&c->mu);
482 :
483 8874 : if (do_connect) {
484 2681 : start_connect(exec_ctx, c);
485 : }
486 8874 : }
487 :
488 246 : int grpc_subchannel_state_change_unsubscribe(grpc_exec_ctx *exec_ctx,
489 : grpc_subchannel *c,
490 : grpc_closure *subscribed_notify) {
491 : int success;
492 246 : gpr_mu_lock(&c->mu);
493 246 : success = grpc_connectivity_state_change_unsubscribe(
494 : exec_ctx, &c->state_tracker, subscribed_notify);
495 246 : gpr_mu_unlock(&c->mu);
496 246 : return success;
497 : }
498 :
499 3614 : void grpc_subchannel_process_transport_op(grpc_exec_ctx *exec_ctx,
500 : grpc_subchannel *c,
501 : grpc_transport_op *op) {
502 3565 : connection *con = NULL;
503 : grpc_subchannel *destroy;
504 3565 : int cancel_alarm = 0;
505 3614 : gpr_mu_lock(&c->mu);
506 3614 : if (c->active != NULL) {
507 1882 : con = c->active;
508 1882 : CONNECTION_REF_LOCKED(con, "transport-op");
509 : }
510 3614 : if (op->disconnect) {
511 3614 : c->disconnected = 1;
512 3614 : connectivity_state_changed_locked(exec_ctx, c, "disconnect");
513 3614 : if (c->have_alarm) {
514 45 : cancel_alarm = 1;
515 : }
516 : }
517 3614 : gpr_mu_unlock(&c->mu);
518 :
519 3614 : if (con != NULL) {
520 1882 : grpc_channel_stack *channel_stack = CHANNEL_STACK_FROM_CONNECTION(con);
521 1882 : grpc_channel_element *top_elem =
522 : grpc_channel_stack_element(channel_stack, 0);
523 1882 : top_elem->filter->start_transport_op(exec_ctx, top_elem, op);
524 :
525 1882 : gpr_mu_lock(&c->mu);
526 1882 : destroy = CONNECTION_UNREF_LOCKED(exec_ctx, con, "transport-op");
527 1882 : gpr_mu_unlock(&c->mu);
528 1882 : if (destroy) {
529 0 : subchannel_destroy(exec_ctx, destroy);
530 : }
531 : }
532 :
533 3614 : if (cancel_alarm) {
534 47 : grpc_timer_cancel(exec_ctx, &c->alarm);
535 : }
536 :
537 3614 : if (op->disconnect) {
538 3614 : grpc_connector_shutdown(exec_ctx, c->connector);
539 : }
540 3614 : }
541 :
542 2346 : static void on_state_changed(grpc_exec_ctx *exec_ctx, void *p,
543 : int iomgr_success) {
544 2306 : state_watcher *sw = p;
545 2346 : grpc_subchannel *c = sw->subchannel;
546 2346 : gpr_mu *mu = &c->mu;
547 : int destroy;
548 : grpc_transport_op op;
549 : grpc_channel_element *elem;
550 2306 : connection *destroy_connection = NULL;
551 :
552 2346 : gpr_mu_lock(mu);
553 :
554 : /* if we failed or there is a version number mismatch, just leave
555 : this closure */
556 2346 : if (!iomgr_success || sw->subchannel->active_version != sw->version) {
557 : goto done;
558 : }
559 :
560 2346 : switch (sw->connectivity_state) {
561 : case GRPC_CHANNEL_CONNECTING:
562 : case GRPC_CHANNEL_READY:
563 : case GRPC_CHANNEL_IDLE:
564 : /* all is still good: keep watching */
565 0 : memset(&op, 0, sizeof(op));
566 0 : op.connectivity_state = &sw->connectivity_state;
567 0 : op.on_connectivity_state_change = &sw->closure;
568 0 : elem = grpc_channel_stack_element(
569 0 : CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
570 0 : elem->filter->start_transport_op(exec_ctx, elem, &op);
571 : /* early out */
572 0 : gpr_mu_unlock(mu);
573 2346 : return;
574 : case GRPC_CHANNEL_FATAL_FAILURE:
575 : case GRPC_CHANNEL_TRANSIENT_FAILURE:
576 : /* things have gone wrong, deactivate and enter idle */
577 2346 : if (sw->subchannel->active->refs == 0) {
578 1881 : destroy_connection = sw->subchannel->active;
579 : }
580 2346 : sw->subchannel->active = NULL;
581 2346 : grpc_connectivity_state_set(exec_ctx, &c->state_tracker,
582 2346 : c->disconnected
583 : ? GRPC_CHANNEL_FATAL_FAILURE
584 : : GRPC_CHANNEL_TRANSIENT_FAILURE,
585 : "connection_failed");
586 2346 : break;
587 : }
588 :
589 : done:
590 2346 : connectivity_state_changed_locked(exec_ctx, c, "transport_state_changed");
591 2306 : destroy = SUBCHANNEL_UNREF_LOCKED(c, "state_watcher");
592 2346 : gpr_free(sw);
593 2346 : gpr_mu_unlock(mu);
594 2346 : if (destroy) {
595 1718 : subchannel_destroy(exec_ctx, c);
596 : }
597 2345 : if (destroy_connection != NULL) {
598 1884 : connection_destroy(exec_ctx, destroy_connection);
599 : }
600 : }
601 :
602 2347 : static void publish_transport(grpc_exec_ctx *exec_ctx, grpc_subchannel *c) {
603 : size_t channel_stack_size;
604 : connection *con;
605 : grpc_channel_stack *stk;
606 : size_t num_filters;
607 : const grpc_channel_filter **filters;
608 : waiting_for_connect *w4c;
609 : grpc_transport_op op;
610 : state_watcher *sw;
611 2306 : connection *destroy_connection = NULL;
612 : grpc_channel_element *elem;
613 :
614 : /* build final filter list */
615 2347 : num_filters = c->num_filters + c->connecting_result.num_filters + 1;
616 2347 : filters = gpr_malloc(sizeof(*filters) * num_filters);
617 2347 : memcpy((void *)filters, c->filters, sizeof(*filters) * c->num_filters);
618 2347 : memcpy((void *)(filters + c->num_filters), c->connecting_result.filters,
619 2347 : sizeof(*filters) * c->connecting_result.num_filters);
620 2347 : filters[num_filters - 1] = &grpc_connected_channel_filter;
621 :
622 : /* construct channel stack */
623 2347 : channel_stack_size = grpc_channel_stack_size(filters, num_filters);
624 2346 : con = gpr_malloc(sizeof(connection) + channel_stack_size);
625 2347 : stk = (grpc_channel_stack *)(con + 1);
626 2347 : con->refs = 0;
627 2347 : con->subchannel = c;
628 2347 : grpc_channel_stack_init(exec_ctx, filters, num_filters, c->master, c->args,
629 : stk);
630 2347 : grpc_connected_channel_bind_transport(stk, c->connecting_result.transport);
631 2347 : gpr_free((void *)c->connecting_result.filters);
632 2347 : memset(&c->connecting_result, 0, sizeof(c->connecting_result));
633 :
634 : /* initialize state watcher */
635 2347 : sw = gpr_malloc(sizeof(*sw));
636 2347 : grpc_closure_init(&sw->closure, on_state_changed, sw);
637 2347 : sw->subchannel = c;
638 2347 : sw->connectivity_state = GRPC_CHANNEL_READY;
639 :
640 2347 : gpr_mu_lock(&c->mu);
641 :
642 2347 : if (c->disconnected) {
643 0 : gpr_mu_unlock(&c->mu);
644 0 : gpr_free(sw);
645 0 : gpr_free((void *)filters);
646 0 : grpc_channel_stack_destroy(exec_ctx, stk);
647 0 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
648 0 : GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
649 2347 : return;
650 : }
651 :
652 : /* publish */
653 2347 : if (c->active != NULL && c->active->refs == 0) {
654 0 : destroy_connection = c->active;
655 : }
656 2347 : c->active = con;
657 2347 : c->active_version++;
658 2347 : sw->version = c->active_version;
659 2347 : c->connecting = 0;
660 :
661 : /* watch for changes; subchannel ref for connecting is donated
662 : to the state watcher */
663 2347 : memset(&op, 0, sizeof(op));
664 2347 : op.connectivity_state = &sw->connectivity_state;
665 2347 : op.on_connectivity_state_change = &sw->closure;
666 2347 : op.bind_pollset_set = c->pollset_set;
667 2306 : SUBCHANNEL_REF_LOCKED(c, "state_watcher");
668 2347 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
669 2347 : GPR_ASSERT(!SUBCHANNEL_UNREF_LOCKED(c, "connecting"));
670 2347 : elem =
671 2347 : grpc_channel_stack_element(CHANNEL_STACK_FROM_CONNECTION(c->active), 0);
672 2347 : elem->filter->start_transport_op(exec_ctx, elem, &op);
673 :
674 : /* signal completion */
675 2347 : connectivity_state_changed_locked(exec_ctx, c, "connected");
676 2347 : w4c = c->waiting;
677 2347 : c->waiting = NULL;
678 :
679 2347 : gpr_mu_unlock(&c->mu);
680 :
681 5020 : while (w4c != NULL) {
682 326 : waiting_for_connect *next = w4c->next;
683 326 : grpc_exec_ctx_enqueue(exec_ctx, &w4c->continuation, 1);
684 326 : w4c = next;
685 : }
686 :
687 2347 : gpr_free((void *)filters);
688 :
689 2347 : if (destroy_connection != NULL) {
690 0 : connection_destroy(exec_ctx, destroy_connection);
691 : }
692 : }
693 :
694 : /* Generate a random number between 0 and 1. */
695 264 : static double generate_uniform_random_number(grpc_subchannel *c) {
696 264 : c->random = (1103515245 * c->random + 12345) % ((gpr_uint32)1 << 31);
697 264 : return c->random / (double)((gpr_uint32)1 << 31);
698 : }
699 :
700 : /* Update backoff_delta and next_attempt in subchannel */
701 394 : static void update_reconnect_parameters(grpc_subchannel *c) {
702 : size_t i;
703 : gpr_int32 backoff_delta_millis, jitter;
704 394 : gpr_int32 max_backoff_millis =
705 : GRPC_SUBCHANNEL_RECONNECT_MAX_BACKOFF_SECONDS * 1000;
706 : double jitter_range;
707 :
708 394 : if (c->args) {
709 525 : for (i = 0; i < c->args->num_args; i++) {
710 261 : if (0 == strcmp(c->args->args[i].key,
711 : "grpc.testing.fixed_reconnect_backoff")) {
712 130 : GPR_ASSERT(c->args->args[i].type == GRPC_ARG_INTEGER);
713 130 : c->next_attempt = gpr_time_add(
714 : gpr_now(GPR_CLOCK_MONOTONIC),
715 130 : gpr_time_from_millis(c->args->args[i].value.integer, GPR_TIMESPAN));
716 524 : return;
717 : }
718 : }
719 : }
720 :
721 264 : backoff_delta_millis =
722 264 : (gpr_int32)(gpr_time_to_millis(c->backoff_delta) *
723 : GRPC_SUBCHANNEL_RECONNECT_BACKOFF_MULTIPLIER);
724 264 : if (backoff_delta_millis > max_backoff_millis) {
725 0 : backoff_delta_millis = max_backoff_millis;
726 : }
727 264 : c->backoff_delta = gpr_time_from_millis(backoff_delta_millis, GPR_TIMESPAN);
728 264 : c->next_attempt =
729 264 : gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC), c->backoff_delta);
730 :
731 264 : jitter_range = GRPC_SUBCHANNEL_RECONNECT_JITTER * backoff_delta_millis;
732 264 : jitter =
733 264 : (gpr_int32)((2 * generate_uniform_random_number(c) - 1) * jitter_range);
734 264 : c->next_attempt =
735 264 : gpr_time_add(c->next_attempt, gpr_time_from_millis(jitter, GPR_TIMESPAN));
736 : }
737 :
738 725 : static void on_alarm(grpc_exec_ctx *exec_ctx, void *arg, int iomgr_success) {
739 723 : grpc_subchannel *c = arg;
740 725 : gpr_mu_lock(&c->mu);
741 725 : c->have_alarm = 0;
742 725 : if (c->disconnected) {
743 329 : iomgr_success = 0;
744 : }
745 725 : connectivity_state_changed_locked(exec_ctx, c, "alarm");
746 725 : gpr_mu_unlock(&c->mu);
747 725 : if (iomgr_success) {
748 394 : update_reconnect_parameters(c);
749 394 : continue_connect(exec_ctx, c);
750 : } else {
751 331 : cancel_waiting_calls(exec_ctx, c, iomgr_success);
752 331 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->master, "connecting");
753 331 : GRPC_SUBCHANNEL_UNREF(exec_ctx, c, "connecting");
754 : }
755 725 : }
756 :
757 3072 : static void subchannel_connected(grpc_exec_ctx *exec_ctx, void *arg,
758 : int iomgr_success) {
759 3029 : grpc_subchannel *c = arg;
760 3072 : if (c->connecting_result.transport != NULL) {
761 2347 : publish_transport(exec_ctx, c);
762 : } else {
763 725 : gpr_timespec now = gpr_now(GPR_CLOCK_MONOTONIC);
764 725 : gpr_mu_lock(&c->mu);
765 725 : GPR_ASSERT(!c->have_alarm);
766 725 : c->have_alarm = 1;
767 725 : connectivity_state_changed_locked(exec_ctx, c, "connect_failed");
768 725 : grpc_timer_init(exec_ctx, &c->alarm, c->next_attempt, on_alarm, c, now);
769 725 : gpr_mu_unlock(&c->mu);
770 : }
771 3072 : }
772 :
773 3114 : static gpr_timespec compute_connect_deadline(grpc_subchannel *c) {
774 3114 : gpr_timespec current_deadline =
775 : gpr_time_add(c->next_attempt, c->backoff_delta);
776 3114 : gpr_timespec min_deadline = gpr_time_add(
777 : gpr_now(GPR_CLOCK_MONOTONIC),
778 : gpr_time_from_seconds(GRPC_SUBCHANNEL_MIN_CONNECT_TIMEOUT_SECONDS,
779 : GPR_TIMESPAN));
780 3114 : return gpr_time_cmp(current_deadline, min_deadline) > 0 ? current_deadline
781 : : min_deadline;
782 : }
783 :
784 12258 : static grpc_connectivity_state compute_connectivity_locked(grpc_subchannel *c) {
785 12477 : if (c->disconnected) {
786 6060 : return GRPC_CHANNEL_FATAL_FAILURE;
787 : }
788 6366 : if (c->connecting) {
789 3555 : if (c->have_alarm) {
790 439 : return GRPC_CHANNEL_TRANSIENT_FAILURE;
791 : }
792 3029 : return GRPC_CHANNEL_CONNECTING;
793 : }
794 2811 : if (c->active) {
795 2306 : return GRPC_CHANNEL_READY;
796 : }
797 424 : return GRPC_CHANNEL_IDLE;
798 : }
799 :
800 12477 : static void connectivity_state_changed_locked(grpc_exec_ctx *exec_ctx,
801 : grpc_subchannel *c,
802 : const char *reason) {
803 12258 : grpc_connectivity_state current = compute_connectivity_locked(c);
804 12477 : grpc_connectivity_state_set(exec_ctx, &c->state_tracker, current, reason);
805 12477 : }
806 :
807 : /*
808 : * grpc_subchannel_call implementation
809 : */
810 :
811 2166392 : static void subchannel_call_destroy(grpc_exec_ctx *exec_ctx, void *call,
812 : int success) {
813 2166373 : grpc_subchannel_call *c = call;
814 2166392 : gpr_mu *mu = &c->connection->subchannel->mu;
815 : grpc_subchannel *destroy;
816 : GPR_TIMER_BEGIN("grpc_subchannel_call_unref.destroy", 0);
817 2166392 : grpc_call_stack_destroy(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
818 2167222 : gpr_mu_lock(mu);
819 2167251 : destroy = CONNECTION_UNREF_LOCKED(exec_ctx, c->connection, "call");
820 2167251 : gpr_mu_unlock(mu);
821 2167249 : gpr_free(c);
822 2167206 : if (destroy != NULL) {
823 157 : subchannel_destroy(exec_ctx, destroy);
824 : }
825 : GPR_TIMER_END("grpc_subchannel_call_unref.destroy", 0);
826 2167206 : }
827 :
828 2167227 : void grpc_subchannel_call_ref(grpc_subchannel_call *c
829 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
830 : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
831 : grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
832 : #else
833 2167227 : grpc_call_stack_ref(SUBCHANNEL_CALL_TO_CALL_STACK(c));
834 : #endif
835 2167304 : }
836 :
837 4332322 : void grpc_subchannel_call_unref(grpc_exec_ctx *exec_ctx,
838 : grpc_subchannel_call *c
839 : GRPC_SUBCHANNEL_REF_EXTRA_ARGS) {
840 : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
841 : grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c), reason);
842 : #else
843 4332322 : grpc_call_stack_unref(exec_ctx, SUBCHANNEL_CALL_TO_CALL_STACK(c));
844 : #endif
845 4333313 : }
846 :
847 434 : char *grpc_subchannel_call_get_peer(grpc_exec_ctx *exec_ctx,
848 : grpc_subchannel_call *call) {
849 434 : grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
850 434 : grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
851 434 : return top_elem->filter->get_peer(exec_ctx, top_elem);
852 : }
853 :
854 4171414 : void grpc_subchannel_call_process_op(grpc_exec_ctx *exec_ctx,
855 : grpc_subchannel_call *call,
856 : grpc_transport_stream_op *op) {
857 4171414 : grpc_call_stack *call_stack = SUBCHANNEL_CALL_TO_CALL_STACK(call);
858 4171414 : grpc_call_element *top_elem = grpc_call_stack_element(call_stack, 0);
859 4171467 : top_elem->filter->start_transport_stream_op(exec_ctx, top_elem, op);
860 4173196 : }
861 :
862 2165267 : static grpc_subchannel_call *create_call(grpc_exec_ctx *exec_ctx,
863 : connection *con,
864 : grpc_pollset *pollset) {
865 2165267 : grpc_channel_stack *chanstk = CHANNEL_STACK_FROM_CONNECTION(con);
866 2165267 : grpc_subchannel_call *call =
867 2165267 : gpr_malloc(sizeof(grpc_subchannel_call) + chanstk->call_stack_size);
868 2166955 : grpc_call_stack *callstk = SUBCHANNEL_CALL_TO_CALL_STACK(call);
869 2166955 : call->connection = con;
870 2166955 : grpc_call_stack_init(exec_ctx, chanstk, 1, subchannel_call_destroy, call,
871 : NULL, NULL, callstk);
872 2167307 : grpc_call_stack_set_pollset(exec_ctx, callstk, pollset);
873 2167328 : return call;
874 : }
875 :
876 270 : grpc_channel *grpc_subchannel_get_master(grpc_subchannel *subchannel) {
877 270 : return subchannel->master;
878 : }
879 :
880 0 : grpc_call_stack *grpc_subchannel_call_get_call_stack(
881 : grpc_subchannel_call *subchannel_call) {
882 0 : return SUBCHANNEL_CALL_TO_CALL_STACK(subchannel_call);
883 : }
|