Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/channel/client_channel.h"
35 :
36 : #include <stdio.h>
37 : #include <string.h>
38 :
39 : #include "src/core/channel/channel_args.h"
40 : #include "src/core/channel/connected_channel.h"
41 : #include "src/core/surface/channel.h"
42 : #include "src/core/iomgr/iomgr.h"
43 : #include "src/core/support/string.h"
44 : #include "src/core/transport/connectivity_state.h"
45 : #include <grpc/support/alloc.h>
46 : #include <grpc/support/log.h>
47 : #include <grpc/support/sync.h>
48 : #include <grpc/support/useful.h>
49 :
50 : /* Client channel implementation */
51 :
/* Forward declaration; full definition below. */
typedef struct call_data call_data;

typedef struct client_channel_channel_data {
  /** metadata context for this channel */
  grpc_mdctx *mdctx;
  /** resolver for this channel */
  grpc_resolver *resolver;
  /** have we started resolving this channel */
  int started_resolving;
  /** master channel - the grpc_channel instance that ultimately owns
      this channel_data via its channel stack.
      We occasionally use this to bump the refcount on the master channel
      to keep ourselves alive through an asynchronous operation. */
  grpc_channel *master;

  /** mutex protecting client configuration, including all
      variables below in this data structure */
  gpr_mu mu_config;
  /** currently active load balancer - guarded by mu_config */
  grpc_lb_policy *lb_policy;
  /** incoming configuration - set by resolver.next
      guarded by mu_config */
  grpc_client_config *incoming_configuration;
  /** a list of closures that are all waiting for config to come in */
  grpc_closure_list waiting_for_config_closures;
  /** resolver callback */
  grpc_closure on_config_changed;
  /** connectivity state being tracked */
  grpc_connectivity_state_tracker state_tracker;
  /** when an lb_policy arrives, should we try to exit idle */
  int exit_idle_when_lb_policy_arrives;
  /** pollset_set of interested parties in a new connection */
  grpc_pollset_set pollset_set;
} channel_data;

/** We create one watcher for each new lb_policy that is returned from a
    resolver, to watch for state changes from the lb_policy. When a state
    change is seen, we update the channel, and create a new watcher. */
typedef struct {
  /* owning channel data (kept alive by a "watch_lb_policy" channel ref
     taken in watch_lb_policy) */
  channel_data *chand;
  /* closure invoked when the lb policy reports a state change */
  grpc_closure on_changed;
  /* in: the state we are watching from; out: the new state */
  grpc_connectivity_state state;
  /* the policy being watched; compared against chand->lb_policy to
     detect stale notifications */
  grpc_lb_policy *lb_policy;
} lb_policy_connectivity_watcher;

/* Lifecycle states of a client call; guarded by call_data.mu_state. */
typedef enum {
  CALL_CREATED,            /* no op has been seen yet */
  CALL_WAITING_FOR_SEND,   /* op seen, but no send_ops to pick an lb target */
  CALL_WAITING_FOR_CONFIG, /* queued until the resolver delivers a config */
  CALL_WAITING_FOR_PICK,   /* lb policy pick is in flight */
  CALL_WAITING_FOR_CALL,   /* subchannel call creation is in flight */
  CALL_ACTIVE,             /* subchannel call exists; ops flow through */
  CALL_CANCELLED           /* terminal: the call was cancelled */
} call_state;
108 :
struct call_data {
  /* owning element */
  grpc_call_element *elem;

  /* protects state and the fields below */
  gpr_mu mu_state;

  /* current lifecycle state (see call_state) */
  call_state state;
  /* call deadline; initialized to +inf in init_call_elem */
  gpr_timespec deadline;
  /* subchannel chosen by the lb policy; NULL means the pick failed */
  grpc_subchannel *picked_channel;
  /* closure reused for the async pick and call-creation steps */
  grpc_closure async_setup_task;
  /* op buffered while the call is in one of the WAITING states */
  grpc_transport_stream_op waiting_op;
  /* our child call stack */
  grpc_subchannel_call *subchannel_call;
  /* inline storage for the synthesized cancellation trailing metadata
     (grpc-status / grpc-message) built in handle_op_after_cancellation */
  grpc_linked_mdelem status;
  grpc_linked_mdelem details;
};

/* Merge new_op into the op buffered on elem's call_data; returns a closure
   displaced by the merge (which the caller must schedule) or NULL. */
static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
                                           grpc_transport_stream_op *new_op)
    GRPC_MUST_USE_RESULT;
129 :
/* Fail an op that arrived after (or raced with) cancellation of the call:
   drop any buffered send data, synthesize CANCELLED trailing metadata for
   the receive path, and fire every completion callback on the op. */
static void handle_op_after_cancellation(grpc_exec_ctx *exec_ctx,
                                         grpc_call_element *elem,
                                         grpc_transport_stream_op *op) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  if (op->send_ops) {
    /* release the outgoing data and report failure (success=0) */
    grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
    op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
  }
  if (op->recv_ops) {
    char status[GPR_LTOA_MIN_BUFSIZE];
    grpc_metadata_batch mdb;
    gpr_ltoa(GRPC_STATUS_CANCELLED, status);
    /* build a two-element metadata list (grpc-status, grpc-message)
       stored inline in call_data; status is the head, details the tail */
    calld->status.md =
        grpc_mdelem_from_strings(chand->mdctx, "grpc-status", status);
    calld->details.md =
        grpc_mdelem_from_strings(chand->mdctx, "grpc-message", "Cancelled");
    calld->status.prev = calld->details.next = NULL;
    calld->status.next = &calld->details;
    calld->details.prev = &calld->status;
    mdb.list.head = &calld->status;
    mdb.list.tail = &calld->details;
    mdb.garbage.head = mdb.garbage.tail = NULL;
    mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
    grpc_sopb_add_metadata(op->recv_ops, mdb);
    /* close the stream and complete the receive with success=1: reading
       the synthesized status is a valid completion */
    *op->recv_state = GRPC_STREAM_CLOSED;
    op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
  }
  if (op->on_consumed) {
    op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
  }
}
162 :
/* A call queued on channel_data.waiting_for_config_closures, waiting for
   the resolver to deliver a configuration. */
typedef struct {
  grpc_closure closure;
  grpc_call_element *elem;
} waiting_call;

static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
                                        grpc_call_element *elem,
                                        grpc_transport_stream_op *op,
                                        int continuation);
172 :
173 2752 : static void continue_with_pick(grpc_exec_ctx *exec_ctx, void *arg,
174 : int iomgr_success) {
175 2752 : waiting_call *wc = arg;
176 2752 : call_data *calld = wc->elem->call_data;
177 2752 : perform_transport_stream_op(exec_ctx, wc->elem, &calld->waiting_op, 1);
178 2752 : gpr_free(wc);
179 2752 : }
180 :
/* Queue elem to be continued (via continue_with_pick) once an lb_policy
   becomes available. Caller must hold chand->mu_config — the sole caller
   (perform_transport_stream_op) acquires it before calling in. */
static void add_to_lb_policy_wait_queue_locked_state_config(
    grpc_call_element *elem) {
  channel_data *chand = elem->channel_data;
  waiting_call *wc = gpr_malloc(sizeof(*wc));
  grpc_closure_init(&wc->closure, continue_with_pick, wc);
  wc->elem = elem;
  grpc_closure_list_add(&chand->waiting_for_config_closures, &wc->closure, 1);
}
189 :
/* Return 1 iff every one of the len bytes starting at p is zero. */
static int is_empty(void *p, int len) {
  const char *bytes = p;
  int idx;
  for (idx = 0; idx < len; idx++) {
    if (bytes[idx] != 0) {
      return 0;
    }
  }
  return 1;
}
198 :
/* Closure run when grpc_subchannel_create_call completes: transitions
   WAITING_FOR_CALL -> ACTIVE and flushes the buffered op, or — if the call
   was cancelled or creation failed — propagates the cancellation. */
static void started_call(grpc_exec_ctx *exec_ctx, void *arg,
                         int iomgr_success) {
  call_data *calld = arg;
  grpc_transport_stream_op op;
  int have_waiting;

  gpr_mu_lock(&calld->mu_state);
  if (calld->state == CALL_CANCELLED && calld->subchannel_call != NULL) {
    /* cancelled while creation was in flight: cancel the new call */
    memset(&op, 0, sizeof(op));
    op.cancel_with_status = GRPC_STATUS_CANCELLED;
    gpr_mu_unlock(&calld->mu_state);
    grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call, &op);
  } else if (calld->state == CALL_WAITING_FOR_CALL) {
    /* a zeroed waiting_op means there is nothing buffered to flush */
    have_waiting = !is_empty(&calld->waiting_op, sizeof(calld->waiting_op));
    if (calld->subchannel_call != NULL) {
      calld->state = CALL_ACTIVE;
      gpr_mu_unlock(&calld->mu_state);
      if (have_waiting) {
        grpc_subchannel_call_process_op(exec_ctx, calld->subchannel_call,
                                        &calld->waiting_op);
      }
    } else {
      /* creation failed: fail any buffered op as a cancellation */
      calld->state = CALL_CANCELLED;
      gpr_mu_unlock(&calld->mu_state);
      if (have_waiting) {
        handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
      }
    }
  } else {
    GPR_ASSERT(calld->state == CALL_CANCELLED);
    gpr_mu_unlock(&calld->mu_state);
  }
}
232 :
/* Closure run when the lb policy finishes a pick: on success, start creating
   the subchannel call (started_call continues from there); on a failed pick,
   fail the buffered op as UNAVAILABLE. */
static void picked_target(grpc_exec_ctx *exec_ctx, void *arg,
                          int iomgr_success) {
  call_data *calld = arg;
  grpc_pollset *pollset;

  if (calld->picked_channel == NULL) {
    /* treat this like a cancellation */
    calld->waiting_op.cancel_with_status = GRPC_STATUS_UNAVAILABLE;
    perform_transport_stream_op(exec_ctx, calld->elem, &calld->waiting_op, 1);
  } else {
    gpr_mu_lock(&calld->mu_state);
    if (calld->state == CALL_CANCELLED) {
      gpr_mu_unlock(&calld->mu_state);
      handle_op_after_cancellation(exec_ctx, calld->elem, &calld->waiting_op);
    } else {
      GPR_ASSERT(calld->state == CALL_WAITING_FOR_PICK);
      calld->state = CALL_WAITING_FOR_CALL;
      /* grab the pollset before dropping the lock */
      pollset = calld->waiting_op.bind_pollset;
      gpr_mu_unlock(&calld->mu_state);
      grpc_closure_init(&calld->async_setup_task, started_call, calld);
      grpc_subchannel_create_call(exec_ctx, calld->picked_channel, pollset,
                                  &calld->subchannel_call,
                                  &calld->async_setup_task);
    }
  }
}
259 :
260 9617 : static grpc_closure *merge_into_waiting_op(grpc_call_element *elem,
261 : grpc_transport_stream_op *new_op) {
262 9617 : call_data *calld = elem->call_data;
263 9617 : grpc_closure *consumed_op = NULL;
264 9617 : grpc_transport_stream_op *waiting_op = &calld->waiting_op;
265 9617 : GPR_ASSERT((waiting_op->send_ops != NULL) + (new_op->send_ops != NULL) <= 1);
266 9617 : GPR_ASSERT((waiting_op->recv_ops != NULL) + (new_op->recv_ops != NULL) <= 1);
267 9617 : if (new_op->send_ops != NULL) {
268 286 : waiting_op->send_ops = new_op->send_ops;
269 286 : waiting_op->is_last_send = new_op->is_last_send;
270 286 : waiting_op->on_done_send = new_op->on_done_send;
271 : }
272 9617 : if (new_op->recv_ops != NULL) {
273 9222 : waiting_op->recv_ops = new_op->recv_ops;
274 9222 : waiting_op->recv_state = new_op->recv_state;
275 9222 : waiting_op->on_done_recv = new_op->on_done_recv;
276 : }
277 9617 : if (new_op->on_consumed != NULL) {
278 109 : if (waiting_op->on_consumed != NULL) {
279 109 : consumed_op = waiting_op->on_consumed;
280 : }
281 109 : waiting_op->on_consumed = new_op->on_consumed;
282 : }
283 9617 : if (new_op->cancel_with_status != GRPC_STATUS_OK) {
284 109 : waiting_op->cancel_with_status = new_op->cancel_with_status;
285 : }
286 9617 : return consumed_op;
287 : }
288 :
289 730 : static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
290 730 : call_data *calld = elem->call_data;
291 730 : channel_data *chand = elem->channel_data;
292 : grpc_subchannel_call *subchannel_call;
293 : char *result;
294 :
295 730 : gpr_mu_lock(&calld->mu_state);
296 730 : if (calld->state == CALL_ACTIVE) {
297 390 : subchannel_call = calld->subchannel_call;
298 390 : GRPC_SUBCHANNEL_CALL_REF(subchannel_call, "get_peer");
299 390 : gpr_mu_unlock(&calld->mu_state);
300 390 : result = grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
301 390 : GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "get_peer");
302 390 : return result;
303 : } else {
304 340 : gpr_mu_unlock(&calld->mu_state);
305 340 : return grpc_channel_get_target(chand->master);
306 : }
307 : }
308 :
/* Core op dispatch for the client channel filter, driven by the call's
   state machine (guarded by calld->mu_state):
   - ACTIVE: forward straight to the subchannel call.
   - CANCELLED: fail the op immediately.
   - WAITING_*: merge into the buffered op, or (on cancel) fail everything.
   - CREATED (or a continuation reaching here): buffer the op; once send_ops
     are present, either start an lb pick, queue for config, or fail.
   'continuation' is set when re-driving a previously buffered op. */
static void perform_transport_stream_op(grpc_exec_ctx *exec_ctx,
                                        grpc_call_element *elem,
                                        grpc_transport_stream_op *op,
                                        int continuation) {
  call_data *calld = elem->call_data;
  channel_data *chand = elem->channel_data;
  grpc_subchannel_call *subchannel_call;
  grpc_lb_policy *lb_policy;
  grpc_transport_stream_op op2;
  GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
  GRPC_CALL_LOG_OP(GPR_INFO, elem, op);

  gpr_mu_lock(&calld->mu_state);
  switch (calld->state) {
    case CALL_ACTIVE:
      GPR_ASSERT(!continuation);
      subchannel_call = calld->subchannel_call;
      /* drop the lock before calling down the stack */
      gpr_mu_unlock(&calld->mu_state);
      grpc_subchannel_call_process_op(exec_ctx, subchannel_call, op);
      break;
    case CALL_CANCELLED:
      gpr_mu_unlock(&calld->mu_state);
      handle_op_after_cancellation(exec_ctx, elem, op);
      break;
    case CALL_WAITING_FOR_SEND:
      GPR_ASSERT(!continuation);
      grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
      if (!calld->waiting_op.send_ops &&
          calld->waiting_op.cancel_with_status == GRPC_STATUS_OK) {
        /* still nothing to pick an lb target with: stay buffered */
        gpr_mu_unlock(&calld->mu_state);
        break;
      }
      /* send_ops (or a cancel) arrived: re-drive the merged op below */
      *op = calld->waiting_op;
      memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
      continuation = 1;
      /* fall through */
    case CALL_WAITING_FOR_CONFIG:
    case CALL_WAITING_FOR_PICK:
    case CALL_WAITING_FOR_CALL:
      if (!continuation) {
        if (op->cancel_with_status != GRPC_STATUS_OK) {
          /* cancel: fail both the new op and the buffered one, keeping
             exactly one on_consumed buffered so it fires only once */
          calld->state = CALL_CANCELLED;
          op2 = calld->waiting_op;
          memset(&calld->waiting_op, 0, sizeof(calld->waiting_op));
          if (op->on_consumed) {
            calld->waiting_op.on_consumed = op->on_consumed;
            op->on_consumed = NULL;
          } else if (op2.on_consumed) {
            calld->waiting_op.on_consumed = op2.on_consumed;
            op2.on_consumed = NULL;
          }
          gpr_mu_unlock(&calld->mu_state);
          handle_op_after_cancellation(exec_ctx, elem, op);
          handle_op_after_cancellation(exec_ctx, elem, &op2);
        } else {
          grpc_exec_ctx_enqueue(exec_ctx, merge_into_waiting_op(elem, op), 1);
          gpr_mu_unlock(&calld->mu_state);
        }
        break;
      }
      /* fall through */
    case CALL_CREATED:
      if (op->cancel_with_status != GRPC_STATUS_OK) {
        calld->state = CALL_CANCELLED;
        gpr_mu_unlock(&calld->mu_state);
        handle_op_after_cancellation(exec_ctx, elem, op);
      } else {
        calld->waiting_op = *op;

        if (op->send_ops == NULL) {
          /* need to have some send ops before we can select the
             lb target */
          calld->state = CALL_WAITING_FOR_SEND;
          gpr_mu_unlock(&calld->mu_state);
        } else {
          gpr_mu_lock(&chand->mu_config);
          lb_policy = chand->lb_policy;
          if (lb_policy) {
            /* start an lb pick using the call's initial metadata */
            grpc_transport_stream_op *waiting_op = &calld->waiting_op;
            grpc_pollset *bind_pollset = waiting_op->bind_pollset;
            grpc_metadata_batch *initial_metadata =
                &waiting_op->send_ops->ops[0].data.metadata;
            GRPC_LB_POLICY_REF(lb_policy, "pick");
            gpr_mu_unlock(&chand->mu_config);
            calld->state = CALL_WAITING_FOR_PICK;

            GPR_ASSERT(waiting_op->bind_pollset);
            GPR_ASSERT(waiting_op->send_ops);
            GPR_ASSERT(waiting_op->send_ops->nops >= 1);
            GPR_ASSERT(waiting_op->send_ops->ops[0].type == GRPC_OP_METADATA);
            gpr_mu_unlock(&calld->mu_state);

            grpc_closure_init(&calld->async_setup_task, picked_target, calld);
            grpc_lb_policy_pick(exec_ctx, lb_policy, bind_pollset,
                                initial_metadata, &calld->picked_channel,
                                &calld->async_setup_task);

            GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "pick");
          } else if (chand->resolver != NULL) {
            /* no policy yet: queue behind the resolver, kicking it off
               if this is the first interested call */
            calld->state = CALL_WAITING_FOR_CONFIG;
            add_to_lb_policy_wait_queue_locked_state_config(elem);
            if (!chand->started_resolving && chand->resolver != NULL) {
              GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
              chand->started_resolving = 1;
              grpc_resolver_next(exec_ctx, chand->resolver,
                                 &chand->incoming_configuration,
                                 &chand->on_config_changed);
            }
            gpr_mu_unlock(&chand->mu_config);
            gpr_mu_unlock(&calld->mu_state);
          } else {
            /* no policy and no resolver: the channel is dead */
            calld->state = CALL_CANCELLED;
            gpr_mu_unlock(&chand->mu_config);
            gpr_mu_unlock(&calld->mu_state);
            handle_op_after_cancellation(exec_ctx, elem, op);
          }
        }
      }
      break;
  }
}
430 :
/* Filter entry point for stream ops: always a fresh (non-continuation) op. */
static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
                                         grpc_call_element *elem,
                                         grpc_transport_stream_op *op) {
  perform_transport_stream_op(exec_ctx, elem, op, 0);
}
436 :
static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
                            grpc_lb_policy *lb_policy,
                            grpc_connectivity_state current_state);

/* Handle an lb policy connectivity change.  Caller (on_lb_policy_state_changed)
   holds chand->mu_config.  Notifications from a policy that has since been
   replaced are ignored; otherwise the channel state is updated and a new
   watcher is installed (unless the policy reached FATAL_FAILURE). */
static void on_lb_policy_state_changed_locked(
    grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
  /* check if the notification is for a stale policy */
  if (w->lb_policy != w->chand->lb_policy) return;

  grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
                              "lb_changed");
  if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
    watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
  }
}
452 :
453 5592 : static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
454 : int iomgr_success) {
455 5592 : lb_policy_connectivity_watcher *w = arg;
456 :
457 5592 : gpr_mu_lock(&w->chand->mu_config);
458 5592 : on_lb_policy_state_changed_locked(exec_ctx, w);
459 5592 : gpr_mu_unlock(&w->chand->mu_config);
460 :
461 5592 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
462 5592 : gpr_free(w);
463 5592 : }
464 :
465 5592 : static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
466 : grpc_lb_policy *lb_policy,
467 : grpc_connectivity_state current_state) {
468 5592 : lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
469 5592 : GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
470 :
471 5592 : w->chand = chand;
472 5592 : grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
473 5592 : w->state = current_state;
474 5592 : w->lb_policy = lb_policy;
475 5592 : grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
476 : &w->on_changed);
477 5592 : }
478 :
/* Resolver callback: a new client configuration (possibly carrying a new
   lb policy) has arrived, or the resolver has shut down.  Swaps in the new
   policy, releases queued picks, re-arms the resolver on success, or marks
   the channel FATAL_FAILURE when the resolver is gone. */
static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
                                 int iomgr_success) {
  channel_data *chand = arg;
  grpc_lb_policy *lb_policy = NULL;
  grpc_lb_policy *old_lb_policy;
  grpc_resolver *old_resolver;
  grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
  int exit_idle = 0;

  if (chand->incoming_configuration != NULL) {
    lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
    if (lb_policy != NULL) {
      /* "channel" ref will be owned by chand->lb_policy; "config_change"
         keeps the policy alive for the remainder of this function */
      GRPC_LB_POLICY_REF(lb_policy, "channel");
      GRPC_LB_POLICY_REF(lb_policy, "config_change");
      state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
    }

    grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
  }

  chand->incoming_configuration = NULL;

  gpr_mu_lock(&chand->mu_config);
  old_lb_policy = chand->lb_policy;
  chand->lb_policy = lb_policy;
  if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
    /* calls queued for config can now proceed (or fail, if disconnected) */
    grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
  }
  if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
    /* a connectivity check requested try_to_connect before a policy existed */
    GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
    exit_idle = 1;
    chand->exit_idle_when_lb_policy_arrives = 0;
  }

  if (iomgr_success && chand->resolver) {
    /* re-arm the resolver for the next configuration update */
    grpc_resolver *resolver = chand->resolver;
    GRPC_RESOLVER_REF(resolver, "channel-next");
    grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
                                "new_lb+resolver");
    if (lb_policy != NULL) {
      watch_lb_policy(exec_ctx, chand, lb_policy, state);
    }
    gpr_mu_unlock(&chand->mu_config);
    GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
    grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
                       &chand->on_config_changed);
    GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
  } else {
    /* resolver failed or was removed: the channel can never reconnect */
    old_resolver = chand->resolver;
    chand->resolver = NULL;
    grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
                                GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
    gpr_mu_unlock(&chand->mu_config);
    if (old_resolver != NULL) {
      grpc_resolver_shutdown(exec_ctx, old_resolver);
      GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel");
    }
  }

  /* everything below runs outside mu_config */
  if (exit_idle) {
    grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
  }

  if (old_lb_policy != NULL) {
    grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
    GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
  }

  if (lb_policy != NULL) {
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
  }

  GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
}
554 :
/* Handle a channel-level op: consume connectivity-state watches here,
   process disconnects, and broadcast whatever remains to the lb policy. */
static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
                                  grpc_channel_element *elem,
                                  grpc_transport_op *op) {
  grpc_lb_policy *lb_policy = NULL;
  channel_data *chand = elem->channel_data;
  grpc_resolver *destroy_resolver = NULL;

  grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);

  GPR_ASSERT(op->set_accept_stream == NULL);
  GPR_ASSERT(op->bind_pollset == NULL);

  gpr_mu_lock(&chand->mu_config);
  if (op->on_connectivity_state_change != NULL) {
    grpc_connectivity_state_notify_on_state_change(
        exec_ctx, &chand->state_tracker, op->connectivity_state,
        op->on_connectivity_state_change);
    /* consumed here; cleared so it is not also broadcast below */
    op->on_connectivity_state_change = NULL;
    op->connectivity_state = NULL;
  }

  if (!is_empty(op, sizeof(*op))) {
    /* fields remain in the op: forward it to the current lb policy */
    lb_policy = chand->lb_policy;
    if (lb_policy) {
      GRPC_LB_POLICY_REF(lb_policy, "broadcast");
    }
  }

  if (op->disconnect && chand->resolver != NULL) {
    grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
                                GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
    destroy_resolver = chand->resolver;
    chand->resolver = NULL;
    if (chand->lb_policy != NULL) {
      grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
      GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
      chand->lb_policy = NULL;
    }
  }
  gpr_mu_unlock(&chand->mu_config);

  /* shutdown / broadcast happen outside the config lock */
  if (destroy_resolver) {
    grpc_resolver_shutdown(exec_ctx, destroy_resolver);
    GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
  }

  if (lb_policy) {
    grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
    GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
  }
}
606 :
607 : /* Constructor for call_data */
608 1400980 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
609 : const void *server_transport_data,
610 : grpc_transport_stream_op *initial_op) {
611 1400980 : call_data *calld = elem->call_data;
612 :
613 : /* TODO(ctiller): is there something useful we can do here? */
614 1400980 : GPR_ASSERT(initial_op == NULL);
615 :
616 1400980 : GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
617 1400980 : GPR_ASSERT(server_transport_data == NULL);
618 1400980 : gpr_mu_init(&calld->mu_state);
619 1402469 : calld->elem = elem;
620 1402469 : calld->state = CALL_CREATED;
621 1402469 : calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
622 1402689 : }
623 :
624 : /* Destructor for call_data */
625 1402501 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
626 : grpc_call_element *elem) {
627 1402501 : call_data *calld = elem->call_data;
628 : grpc_subchannel_call *subchannel_call;
629 :
630 : /* if the call got activated, we need to destroy the child stack also, and
631 : remove it from the in-flight requests tracked by the child_entry we
632 : picked */
633 1402501 : gpr_mu_lock(&calld->mu_state);
634 1402673 : switch (calld->state) {
635 : case CALL_ACTIVE:
636 1402343 : subchannel_call = calld->subchannel_call;
637 1402343 : gpr_mu_unlock(&calld->mu_state);
638 1402320 : GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, subchannel_call, "client_channel");
639 1402315 : break;
640 : case CALL_CREATED:
641 : case CALL_CANCELLED:
642 330 : gpr_mu_unlock(&calld->mu_state);
643 330 : break;
644 : case CALL_WAITING_FOR_PICK:
645 : case CALL_WAITING_FOR_CONFIG:
646 : case CALL_WAITING_FOR_CALL:
647 : case CALL_WAITING_FOR_SEND:
648 0 : GPR_UNREACHABLE_CODE(return );
649 : }
650 1402645 : }
651 :
652 : /* Constructor for channel_data */
653 1761 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
654 : grpc_channel_element *elem, grpc_channel *master,
655 : const grpc_channel_args *args,
656 : grpc_mdctx *metadata_context, int is_first,
657 : int is_last) {
658 1761 : channel_data *chand = elem->channel_data;
659 :
660 1761 : memset(chand, 0, sizeof(*chand));
661 :
662 1761 : GPR_ASSERT(is_last);
663 1761 : GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
664 :
665 1761 : gpr_mu_init(&chand->mu_config);
666 1761 : chand->mdctx = metadata_context;
667 1761 : chand->master = master;
668 1761 : grpc_pollset_set_init(&chand->pollset_set);
669 1761 : grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
670 :
671 1761 : grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
672 : "client_channel");
673 1761 : }
674 :
675 : /* Destructor for channel_data */
676 1761 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
677 : grpc_channel_element *elem) {
678 1761 : channel_data *chand = elem->channel_data;
679 :
680 1761 : if (chand->resolver != NULL) {
681 0 : grpc_resolver_shutdown(exec_ctx, chand->resolver);
682 0 : GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
683 : }
684 1761 : if (chand->lb_policy != NULL) {
685 0 : GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
686 : }
687 1761 : grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
688 1761 : grpc_pollset_set_destroy(&chand->pollset_set);
689 1761 : gpr_mu_destroy(&chand->mu_config);
690 1761 : }
691 :
/* Filter vtable for the client channel; this filter terminates the client
   channel stack (see the is_last assert in init_channel_elem). */
const grpc_channel_filter grpc_client_channel_filter = {
    cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
    init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
    destroy_channel_elem, cc_get_peer, "client-channel",
};
697 :
/* Attach a resolver to the channel (takes a "channel" ref on it).  Must be
   called exactly once, after stack construction.  If calls or a
   try_to_connect request are already waiting, resolution starts now. */
void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
                                      grpc_channel_stack *channel_stack,
                                      grpc_resolver *resolver) {
  /* post construction initialization: set the transport setup pointer */
  grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
  channel_data *chand = elem->channel_data;
  gpr_mu_lock(&chand->mu_config);
  GPR_ASSERT(!chand->resolver);
  chand->resolver = resolver;
  GRPC_RESOLVER_REF(resolver, "channel");
  if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
      chand->exit_idle_when_lb_policy_arrives) {
    /* someone is already waiting on configuration: start resolving */
    chand->started_resolving = 1;
    GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
    grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
                       &chand->on_config_changed);
  }
  gpr_mu_unlock(&chand->mu_config);
}
717 :
/* Report the channel's connectivity state.  With try_to_connect set and the
   channel IDLE, nudge the lb policy out of idle — or, if no policy exists
   yet, start resolution and remember to exit idle once a policy arrives. */
grpc_connectivity_state grpc_client_channel_check_connectivity_state(
    grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
  channel_data *chand = elem->channel_data;
  grpc_connectivity_state out;
  gpr_mu_lock(&chand->mu_config);
  out = grpc_connectivity_state_check(&chand->state_tracker);
  if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
    if (chand->lb_policy != NULL) {
      grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
    } else {
      /* no policy yet: cc_on_config_changed will exit idle for us */
      chand->exit_idle_when_lb_policy_arrives = 1;
      if (!chand->started_resolving && chand->resolver != NULL) {
        GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
        chand->started_resolving = 1;
        grpc_resolver_next(exec_ctx, chand->resolver,
                           &chand->incoming_configuration,
                           &chand->on_config_changed);
      }
    }
  }
  gpr_mu_unlock(&chand->mu_config);
  return out;
}
741 :
742 94 : void grpc_client_channel_watch_connectivity_state(
743 : grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
744 : grpc_connectivity_state *state, grpc_closure *on_complete) {
745 94 : channel_data *chand = elem->channel_data;
746 94 : gpr_mu_lock(&chand->mu_config);
747 94 : grpc_connectivity_state_notify_on_state_change(
748 : exec_ctx, &chand->state_tracker, state, on_complete);
749 94 : gpr_mu_unlock(&chand->mu_config);
750 94 : }
751 :
752 1470 : grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
753 : grpc_channel_element *elem) {
754 1470 : channel_data *chand = elem->channel_data;
755 1470 : return &chand->pollset_set;
756 : }
757 :
758 94 : void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
759 : grpc_channel_element *elem,
760 : grpc_pollset *pollset) {
761 94 : channel_data *chand = elem->channel_data;
762 94 : grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
763 94 : }
764 :
765 94 : void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
766 : grpc_channel_element *elem,
767 : grpc_pollset *pollset) {
768 94 : channel_data *chand = elem->channel_data;
769 94 : grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
770 94 : }
|