Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/channel/client_channel.h"
35 :
36 : #include <stdio.h>
37 : #include <string.h>
38 :
39 : #include <grpc/support/alloc.h>
40 : #include <grpc/support/log.h>
41 : #include <grpc/support/sync.h>
42 : #include <grpc/support/useful.h>
43 :
44 : #include "src/core/channel/channel_args.h"
45 : #include "src/core/channel/connected_channel.h"
46 : #include "src/core/channel/subchannel_call_holder.h"
47 : #include "src/core/iomgr/iomgr.h"
48 : #include "src/core/profiling/timers.h"
49 : #include "src/core/support/string.h"
50 : #include "src/core/surface/channel.h"
51 : #include "src/core/transport/connectivity_state.h"
52 :
53 : /* Client channel implementation */
54 :
55 : typedef grpc_subchannel_call_holder call_data;
56 :
57 : typedef struct client_channel_channel_data {
58 : /** resolver for this channel */
59 : grpc_resolver *resolver;
60 : /** have we started resolving this channel */
61 : int started_resolving;
62 : /** master channel - the grpc_channel instance that ultimately owns
63 : this channel_data via its channel stack.
64 : We occasionally use this to bump the refcount on the master channel
65 : to keep ourselves alive through an asynchronous operation. */
66 : grpc_channel *master;
67 :
68 : /** mutex protecting client configuration, including all
69 : variables below in this data structure */
70 : gpr_mu mu_config;
71 : /** currently active load balancer - guarded by mu_config */
72 : grpc_lb_policy *lb_policy;
73 : /** incoming configuration - set by resolver.next
74 : guarded by mu_config */
75 : grpc_client_config *incoming_configuration;
76 : /** a list of closures that are all waiting for config to come in */
77 : grpc_closure_list waiting_for_config_closures;
78 : /** resolver callback */
79 : grpc_closure on_config_changed;
80 : /** connectivity state being tracked */
81 : grpc_connectivity_state_tracker state_tracker;
82 : /** when an lb_policy arrives, should we try to exit idle */
83 : int exit_idle_when_lb_policy_arrives;
84 : /** pollset_set of interested parties in a new connection */
85 : grpc_pollset_set pollset_set;
86 : } channel_data;
87 :
88 : /** We create one watcher for each new lb_policy that is returned from a
89 : resolver,
90 : to watch for state changes from the lb_policy. When a state change is seen,
91 : we
92 : update the channel, and create a new watcher */
93 : typedef struct {
94 : channel_data *chand;
95 : grpc_closure on_changed;
96 : grpc_connectivity_state state;
97 : grpc_lb_policy *lb_policy;
98 : } lb_policy_connectivity_watcher;
99 :
100 : typedef struct {
101 : grpc_closure closure;
102 : grpc_call_element *elem;
103 : } waiting_call;
104 :
105 736 : static char *cc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
106 736 : channel_data *chand = elem->channel_data;
107 736 : return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
108 : chand->master);
109 : }
110 :
111 4117282 : static void cc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
112 : grpc_call_element *elem,
113 : grpc_transport_stream_op *op) {
114 4117282 : GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
115 4117282 : grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
116 4121456 : }
117 :
118 : static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
119 : grpc_lb_policy *lb_policy,
120 : grpc_connectivity_state current_state);
121 :
122 8568 : static void on_lb_policy_state_changed_locked(
123 : grpc_exec_ctx *exec_ctx, lb_policy_connectivity_watcher *w) {
124 : /* check if the notification is for a stale policy */
125 17136 : if (w->lb_policy != w->chand->lb_policy) return;
126 :
127 6381 : grpc_connectivity_state_set(exec_ctx, &w->chand->state_tracker, w->state,
128 : "lb_changed");
129 6381 : if (w->state != GRPC_CHANNEL_FATAL_FAILURE) {
130 6381 : watch_lb_policy(exec_ctx, w->chand, w->lb_policy, w->state);
131 : }
132 : }
133 :
134 8568 : static void on_lb_policy_state_changed(grpc_exec_ctx *exec_ctx, void *arg,
135 : int iomgr_success) {
136 8354 : lb_policy_connectivity_watcher *w = arg;
137 :
138 8568 : gpr_mu_lock(&w->chand->mu_config);
139 8568 : on_lb_policy_state_changed_locked(exec_ctx, w);
140 8568 : gpr_mu_unlock(&w->chand->mu_config);
141 :
142 8568 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->chand->master, "watch_lb_policy");
143 8568 : gpr_free(w);
144 8568 : }
145 :
146 8606 : static void watch_lb_policy(grpc_exec_ctx *exec_ctx, channel_data *chand,
147 : grpc_lb_policy *lb_policy,
148 : grpc_connectivity_state current_state) {
149 8606 : lb_policy_connectivity_watcher *w = gpr_malloc(sizeof(*w));
150 8606 : GRPC_CHANNEL_INTERNAL_REF(chand->master, "watch_lb_policy");
151 :
152 8606 : w->chand = chand;
153 8606 : grpc_closure_init(&w->on_changed, on_lb_policy_state_changed, w);
154 8606 : w->state = current_state;
155 8606 : w->lb_policy = lb_policy;
156 8606 : grpc_lb_policy_notify_on_state_change(exec_ctx, lb_policy, &w->state,
157 : &w->on_changed);
158 8606 : }
159 :
160 4536 : static void cc_on_config_changed(grpc_exec_ctx *exec_ctx, void *arg,
161 : int iomgr_success) {
162 4484 : channel_data *chand = arg;
163 4484 : grpc_lb_policy *lb_policy = NULL;
164 : grpc_lb_policy *old_lb_policy;
165 : grpc_resolver *old_resolver;
166 4484 : grpc_connectivity_state state = GRPC_CHANNEL_TRANSIENT_FAILURE;
167 4484 : int exit_idle = 0;
168 :
169 4536 : if (chand->incoming_configuration != NULL) {
170 2225 : lb_policy = grpc_client_config_get_lb_policy(chand->incoming_configuration);
171 2225 : if (lb_policy != NULL) {
172 2225 : GRPC_LB_POLICY_REF(lb_policy, "channel");
173 2225 : GRPC_LB_POLICY_REF(lb_policy, "config_change");
174 2225 : state = grpc_lb_policy_check_connectivity(exec_ctx, lb_policy);
175 : }
176 :
177 2225 : grpc_client_config_unref(exec_ctx, chand->incoming_configuration);
178 : }
179 :
180 4536 : chand->incoming_configuration = NULL;
181 :
182 4536 : gpr_mu_lock(&chand->mu_config);
183 4536 : old_lb_policy = chand->lb_policy;
184 4536 : chand->lb_policy = lb_policy;
185 4536 : if (lb_policy != NULL || chand->resolver == NULL /* disconnected */) {
186 4533 : grpc_exec_ctx_enqueue_list(exec_ctx, &chand->waiting_for_config_closures);
187 : }
188 4536 : if (lb_policy != NULL && chand->exit_idle_when_lb_policy_arrives) {
189 271 : GRPC_LB_POLICY_REF(lb_policy, "exit_idle");
190 265 : exit_idle = 1;
191 271 : chand->exit_idle_when_lb_policy_arrives = 0;
192 : }
193 :
194 6764 : if (iomgr_success && chand->resolver) {
195 2183 : grpc_resolver *resolver = chand->resolver;
196 2228 : GRPC_RESOLVER_REF(resolver, "channel-next");
197 2228 : grpc_connectivity_state_set(exec_ctx, &chand->state_tracker, state,
198 : "new_lb+resolver");
199 2228 : if (lb_policy != NULL) {
200 2225 : watch_lb_policy(exec_ctx, chand, lb_policy, state);
201 : }
202 2228 : gpr_mu_unlock(&chand->mu_config);
203 2228 : GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
204 2228 : grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
205 : &chand->on_config_changed);
206 2228 : GRPC_RESOLVER_UNREF(exec_ctx, resolver, "channel-next");
207 : } else {
208 2308 : old_resolver = chand->resolver;
209 2308 : chand->resolver = NULL;
210 2308 : grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
211 : GRPC_CHANNEL_FATAL_FAILURE, "resolver_gone");
212 2308 : gpr_mu_unlock(&chand->mu_config);
213 2308 : if (old_resolver != NULL) {
214 0 : grpc_resolver_shutdown(exec_ctx, old_resolver);
215 0 : GRPC_RESOLVER_UNREF(exec_ctx, old_resolver, "channel");
216 : }
217 : }
218 :
219 4536 : if (exit_idle) {
220 271 : grpc_lb_policy_exit_idle(exec_ctx, lb_policy);
221 271 : GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "exit_idle");
222 : }
223 :
224 4536 : if (old_lb_policy != NULL) {
225 0 : grpc_lb_policy_shutdown(exec_ctx, old_lb_policy);
226 0 : GRPC_LB_POLICY_UNREF(exec_ctx, old_lb_policy, "channel");
227 : }
228 :
229 4536 : if (lb_policy != NULL) {
230 2225 : GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "config_change");
231 : }
232 :
233 4536 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->master, "resolver");
234 4536 : }
235 :
236 2748 : static void cc_start_transport_op(grpc_exec_ctx *exec_ctx,
237 : grpc_channel_element *elem,
238 : grpc_transport_op *op) {
239 2731 : grpc_lb_policy *lb_policy = NULL;
240 2748 : channel_data *chand = elem->channel_data;
241 2731 : grpc_resolver *destroy_resolver = NULL;
242 :
243 2748 : grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
244 :
245 2748 : GPR_ASSERT(op->set_accept_stream == NULL);
246 2748 : GPR_ASSERT(op->bind_pollset == NULL);
247 :
248 2748 : gpr_mu_lock(&chand->mu_config);
249 2748 : if (op->on_connectivity_state_change != NULL) {
250 0 : grpc_connectivity_state_notify_on_state_change(
251 : exec_ctx, &chand->state_tracker, op->connectivity_state,
252 : op->on_connectivity_state_change);
253 0 : op->on_connectivity_state_change = NULL;
254 0 : op->connectivity_state = NULL;
255 : }
256 :
257 2748 : lb_policy = chand->lb_policy;
258 2748 : if (lb_policy) {
259 2187 : GRPC_LB_POLICY_REF(lb_policy, "broadcast");
260 : }
261 :
262 2748 : if (op->disconnect && chand->resolver != NULL) {
263 2748 : grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
264 : GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
265 2748 : destroy_resolver = chand->resolver;
266 2748 : chand->resolver = NULL;
267 2748 : if (chand->lb_policy != NULL) {
268 2187 : grpc_lb_policy_shutdown(exec_ctx, chand->lb_policy);
269 2187 : GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
270 2187 : chand->lb_policy = NULL;
271 : }
272 : }
273 2748 : gpr_mu_unlock(&chand->mu_config);
274 :
275 2748 : if (destroy_resolver) {
276 2748 : grpc_resolver_shutdown(exec_ctx, destroy_resolver);
277 2748 : GRPC_RESOLVER_UNREF(exec_ctx, destroy_resolver, "channel");
278 : }
279 :
280 2748 : if (lb_policy) {
281 2187 : grpc_lb_policy_broadcast(exec_ctx, lb_policy, op);
282 2187 : GRPC_LB_POLICY_UNREF(exec_ctx, lb_policy, "broadcast");
283 : }
284 2748 : }
285 :
286 : typedef struct {
287 : grpc_metadata_batch *initial_metadata;
288 : grpc_subchannel **subchannel;
289 : grpc_closure *on_ready;
290 : grpc_call_element *elem;
291 : grpc_closure closure;
292 : } continue_picking_args;
293 :
294 : static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
295 : grpc_metadata_batch *initial_metadata,
296 : grpc_subchannel **subchannel,
297 : grpc_closure *on_ready);
298 :
299 2252 : static void continue_picking(grpc_exec_ctx *exec_ctx, void *arg, int success) {
300 2212 : continue_picking_args *cpa = arg;
301 2252 : if (!success) {
302 0 : grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
303 2252 : } else if (cpa->subchannel == NULL) {
304 : /* cancelled, do nothing */
305 2124 : } else if (cc_pick_subchannel(exec_ctx, cpa->elem, cpa->initial_metadata,
306 : cpa->subchannel, cpa->on_ready)) {
307 0 : grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 1);
308 : }
309 2252 : gpr_free(cpa);
310 2252 : }
311 :
312 2115963 : static int cc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *elemp,
313 : grpc_metadata_batch *initial_metadata,
314 : grpc_subchannel **subchannel,
315 : grpc_closure *on_ready) {
316 2115826 : grpc_call_element *elem = elemp;
317 2115963 : channel_data *chand = elem->channel_data;
318 2115963 : call_data *calld = elem->call_data;
319 : continue_picking_args *cpa;
320 : grpc_closure *closure;
321 :
322 2115963 : GPR_ASSERT(subchannel);
323 :
324 2115963 : gpr_mu_lock(&chand->mu_config);
325 2117453 : if (initial_metadata == NULL) {
326 165 : if (chand->lb_policy != NULL) {
327 37 : grpc_lb_policy_cancel_pick(exec_ctx, chand->lb_policy, subchannel);
328 : }
329 458 : for (closure = chand->waiting_for_config_closures.head; closure != NULL;
330 128 : closure = grpc_closure_next(closure)) {
331 128 : cpa = closure->cb_arg;
332 128 : if (cpa->subchannel == subchannel) {
333 128 : cpa->subchannel = NULL;
334 128 : grpc_exec_ctx_enqueue(exec_ctx, cpa->on_ready, 0);
335 : }
336 : }
337 165 : gpr_mu_unlock(&chand->mu_config);
338 165 : return 1;
339 : }
340 2117288 : if (chand->lb_policy != NULL) {
341 2115036 : int r = grpc_lb_policy_pick(exec_ctx, chand->lb_policy, calld->pollset,
342 : initial_metadata, subchannel, on_ready);
343 2114966 : gpr_mu_unlock(&chand->mu_config);
344 2115008 : return r;
345 : }
346 2252 : if (chand->resolver != NULL && !chand->started_resolving) {
347 2074 : chand->started_resolving = 1;
348 2074 : GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
349 2074 : grpc_resolver_next(exec_ctx, chand->resolver,
350 : &chand->incoming_configuration,
351 : &chand->on_config_changed);
352 : }
353 2252 : cpa = gpr_malloc(sizeof(*cpa));
354 2252 : cpa->initial_metadata = initial_metadata;
355 2252 : cpa->subchannel = subchannel;
356 2252 : cpa->on_ready = on_ready;
357 2252 : cpa->elem = elem;
358 2252 : grpc_closure_init(&cpa->closure, continue_picking, cpa);
359 2252 : grpc_closure_list_add(&chand->waiting_for_config_closures, &cpa->closure, 1);
360 2252 : gpr_mu_unlock(&chand->mu_config);
361 2252 : return 0;
362 : }
363 :
364 : /* Constructor for call_data */
365 2115113 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
366 : grpc_call_element_args *args) {
367 2115113 : grpc_subchannel_call_holder_init(elem->call_data, cc_pick_subchannel, elem);
368 2115311 : }
369 :
370 : /* Destructor for call_data */
371 2115164 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
372 : grpc_call_element *elem) {
373 2115164 : grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
374 2115326 : }
375 :
376 : /* Constructor for channel_data */
377 2792 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
378 : grpc_channel_element *elem,
379 : grpc_channel_element_args *args) {
380 2792 : channel_data *chand = elem->channel_data;
381 :
382 2792 : memset(chand, 0, sizeof(*chand));
383 :
384 2792 : GPR_ASSERT(args->is_last);
385 2792 : GPR_ASSERT(elem->filter == &grpc_client_channel_filter);
386 :
387 2792 : gpr_mu_init(&chand->mu_config);
388 2792 : chand->master = args->master;
389 2792 : grpc_pollset_set_init(&chand->pollset_set);
390 2792 : grpc_closure_init(&chand->on_config_changed, cc_on_config_changed, chand);
391 :
392 2792 : grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
393 : "client_channel");
394 2792 : }
395 :
396 : /* Destructor for channel_data */
397 2732 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
398 : grpc_channel_element *elem) {
399 2732 : channel_data *chand = elem->channel_data;
400 :
401 2732 : if (chand->resolver != NULL) {
402 0 : grpc_resolver_shutdown(exec_ctx, chand->resolver);
403 0 : GRPC_RESOLVER_UNREF(exec_ctx, chand->resolver, "channel");
404 : }
405 2732 : if (chand->lb_policy != NULL) {
406 0 : GRPC_LB_POLICY_UNREF(exec_ctx, chand->lb_policy, "channel");
407 : }
408 2732 : grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
409 2732 : grpc_pollset_set_destroy(&chand->pollset_set);
410 2732 : gpr_mu_destroy(&chand->mu_config);
411 2732 : }
412 :
413 2114041 : static void cc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
414 : grpc_pollset *pollset) {
415 2114041 : call_data *calld = elem->call_data;
416 2114041 : calld->pollset = pollset;
417 2114041 : }
418 :
419 : const grpc_channel_filter grpc_client_channel_filter = {
420 : cc_start_transport_stream_op, cc_start_transport_op, sizeof(call_data),
421 : init_call_elem, cc_set_pollset, destroy_call_elem, sizeof(channel_data),
422 : init_channel_elem, destroy_channel_elem, cc_get_peer, "client-channel",
423 : };
424 :
425 2791 : void grpc_client_channel_set_resolver(grpc_exec_ctx *exec_ctx,
426 : grpc_channel_stack *channel_stack,
427 : grpc_resolver *resolver) {
428 : /* post construction initialization: set the transport setup pointer */
429 2791 : grpc_channel_element *elem = grpc_channel_stack_last_element(channel_stack);
430 2791 : channel_data *chand = elem->channel_data;
431 2791 : gpr_mu_lock(&chand->mu_config);
432 2791 : GPR_ASSERT(!chand->resolver);
433 2791 : chand->resolver = resolver;
434 2791 : GRPC_RESOLVER_REF(resolver, "channel");
435 5582 : if (!grpc_closure_list_empty(chand->waiting_for_config_closures) ||
436 2791 : chand->exit_idle_when_lb_policy_arrives) {
437 0 : chand->started_resolving = 1;
438 0 : GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
439 0 : grpc_resolver_next(exec_ctx, resolver, &chand->incoming_configuration,
440 : &chand->on_config_changed);
441 : }
442 2791 : gpr_mu_unlock(&chand->mu_config);
443 2791 : }
444 :
445 426 : grpc_connectivity_state grpc_client_channel_check_connectivity_state(
446 : grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
447 426 : channel_data *chand = elem->channel_data;
448 : grpc_connectivity_state out;
449 426 : gpr_mu_lock(&chand->mu_config);
450 426 : out = grpc_connectivity_state_check(&chand->state_tracker);
451 426 : if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
452 273 : if (chand->lb_policy != NULL) {
453 0 : grpc_lb_policy_exit_idle(exec_ctx, chand->lb_policy);
454 : } else {
455 273 : chand->exit_idle_when_lb_policy_arrives = 1;
456 273 : if (!chand->started_resolving && chand->resolver != NULL) {
457 272 : GRPC_CHANNEL_INTERNAL_REF(chand->master, "resolver");
458 272 : chand->started_resolving = 1;
459 272 : grpc_resolver_next(exec_ctx, chand->resolver,
460 : &chand->incoming_configuration,
461 : &chand->on_config_changed);
462 : }
463 : }
464 : }
465 426 : gpr_mu_unlock(&chand->mu_config);
466 426 : return out;
467 : }
468 :
469 128 : void grpc_client_channel_watch_connectivity_state(
470 : grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
471 : grpc_connectivity_state *state, grpc_closure *on_complete) {
472 128 : channel_data *chand = elem->channel_data;
473 128 : gpr_mu_lock(&chand->mu_config);
474 128 : grpc_connectivity_state_notify_on_state_change(
475 : exec_ctx, &chand->state_tracker, state, on_complete);
476 128 : gpr_mu_unlock(&chand->mu_config);
477 128 : }
478 :
479 3913 : grpc_pollset_set *grpc_client_channel_get_connecting_pollset_set(
480 : grpc_channel_element *elem) {
481 3913 : channel_data *chand = elem->channel_data;
482 3913 : return &chand->pollset_set;
483 : }
484 :
485 128 : void grpc_client_channel_add_interested_party(grpc_exec_ctx *exec_ctx,
486 : grpc_channel_element *elem,
487 : grpc_pollset *pollset) {
488 128 : channel_data *chand = elem->channel_data;
489 128 : grpc_pollset_set_add_pollset(exec_ctx, &chand->pollset_set, pollset);
490 128 : }
491 :
492 128 : void grpc_client_channel_del_interested_party(grpc_exec_ctx *exec_ctx,
493 : grpc_channel_element *elem,
494 : grpc_pollset *pollset) {
495 128 : channel_data *chand = elem->channel_data;
496 128 : grpc_pollset_set_del_pollset(exec_ctx, &chand->pollset_set, pollset);
497 128 : }
|