Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/channel/client_uchannel.h"
35 :
36 : #include <string.h>
37 :
38 : #include "src/core/census/grpc_filter.h"
39 : #include "src/core/channel/channel_args.h"
40 : #include "src/core/channel/client_channel.h"
41 : #include "src/core/channel/compress_filter.h"
42 : #include "src/core/channel/subchannel_call_holder.h"
43 : #include "src/core/iomgr/iomgr.h"
44 : #include "src/core/support/string.h"
45 : #include "src/core/surface/channel.h"
46 : #include "src/core/transport/connectivity_state.h"
47 :
48 : #include <grpc/support/alloc.h>
49 : #include <grpc/support/log.h>
50 : #include <grpc/support/sync.h>
51 : #include <grpc/support/useful.h>
52 :
53 : /** Microchannel (uchannel) implementation: a lightweight channel without any
54 : * load-balancing mechanisms meant for communication from within the core. */
55 :
56 : typedef struct client_uchannel_channel_data {
57 : /** master channel - the grpc_channel instance that ultimately owns
58 : this channel_data via its channel stack.
59 : We occasionally use this to bump the refcount on the master channel
60 : to keep ourselves alive through an asynchronous operation. */
61 : grpc_channel *master;
62 :
63 : /** connectivity state being tracked */
64 : grpc_connectivity_state_tracker state_tracker;
65 :
66 : /** the subchannel wrapped by the microchannel */
67 : grpc_subchannel *subchannel;
68 :
69 : /** the callback used to stay subscribed to subchannel connectivity
70 : * notifications */
71 : grpc_closure connectivity_cb;
72 :
73 : /** the current connectivity state of the wrapped subchannel */
74 : grpc_connectivity_state subchannel_connectivity;
75 :
76 : gpr_mu mu_state;
77 : } channel_data;
78 :
79 : typedef grpc_subchannel_call_holder call_data;
80 :
81 12 : static void monitor_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
82 : int iomgr_success) {
83 12 : channel_data *chand = arg;
84 12 : grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
85 : chand->subchannel_connectivity,
86 : "uchannel_monitor_subchannel");
87 12 : grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
88 : &chand->subchannel_connectivity,
89 : &chand->connectivity_cb);
90 12 : }
91 :
92 80 : static char *cuc_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
93 80 : channel_data *chand = elem->channel_data;
94 80 : return grpc_subchannel_call_holder_get_peer(exec_ctx, elem->call_data,
95 : chand->master);
96 : }
97 :
98 52566 : static void cuc_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
99 : grpc_call_element *elem,
100 : grpc_transport_stream_op *op) {
101 52566 : GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
102 52566 : grpc_subchannel_call_holder_perform_op(exec_ctx, elem->call_data, op);
103 52566 : }
104 :
105 246 : static void cuc_start_transport_op(grpc_exec_ctx *exec_ctx,
106 : grpc_channel_element *elem,
107 : grpc_transport_op *op) {
108 246 : channel_data *chand = elem->channel_data;
109 :
110 246 : grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
111 :
112 246 : GPR_ASSERT(op->set_accept_stream == NULL);
113 246 : GPR_ASSERT(op->bind_pollset == NULL);
114 :
115 246 : if (op->on_connectivity_state_change != NULL) {
116 0 : grpc_connectivity_state_notify_on_state_change(
117 : exec_ctx, &chand->state_tracker, op->connectivity_state,
118 : op->on_connectivity_state_change);
119 0 : op->on_connectivity_state_change = NULL;
120 0 : op->connectivity_state = NULL;
121 : }
122 :
123 246 : if (op->disconnect) {
124 246 : grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
125 : GRPC_CHANNEL_FATAL_FAILURE, "disconnect");
126 : }
127 246 : }
128 :
129 52358 : static int cuc_pick_subchannel(grpc_exec_ctx *exec_ctx, void *arg,
130 : grpc_metadata_batch *initial_metadata,
131 : grpc_subchannel **subchannel,
132 : grpc_closure *on_ready) {
133 52358 : channel_data *chand = arg;
134 52358 : GPR_ASSERT(initial_metadata != NULL);
135 52358 : *subchannel = chand->subchannel;
136 52358 : return 1;
137 : }
138 :
139 : /* Constructor for call_data */
140 52394 : static void cuc_init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
141 : grpc_call_element_args *args) {
142 52394 : grpc_subchannel_call_holder_init(elem->call_data, cuc_pick_subchannel,
143 : elem->channel_data);
144 52394 : }
145 :
146 : /* Destructor for call_data */
147 52394 : static void cuc_destroy_call_elem(grpc_exec_ctx *exec_ctx,
148 : grpc_call_element *elem) {
149 52394 : grpc_subchannel_call_holder_destroy(exec_ctx, elem->call_data);
150 52394 : }
151 :
152 : /* Constructor for channel_data */
153 246 : static void cuc_init_channel_elem(grpc_exec_ctx *exec_ctx,
154 : grpc_channel_element *elem,
155 : grpc_channel_element_args *args) {
156 246 : channel_data *chand = elem->channel_data;
157 246 : memset(chand, 0, sizeof(*chand));
158 246 : grpc_closure_init(&chand->connectivity_cb, monitor_subchannel, chand);
159 246 : GPR_ASSERT(args->is_last);
160 246 : GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
161 246 : chand->master = args->master;
162 246 : grpc_connectivity_state_init(&chand->state_tracker, GRPC_CHANNEL_IDLE,
163 : "client_uchannel");
164 246 : gpr_mu_init(&chand->mu_state);
165 246 : }
166 :
167 : /* Destructor for channel_data */
168 246 : static void cuc_destroy_channel_elem(grpc_exec_ctx *exec_ctx,
169 : grpc_channel_element *elem) {
170 246 : channel_data *chand = elem->channel_data;
171 246 : grpc_subchannel_state_change_unsubscribe(exec_ctx, chand->subchannel,
172 : &chand->connectivity_cb);
173 246 : grpc_connectivity_state_destroy(exec_ctx, &chand->state_tracker);
174 246 : gpr_mu_destroy(&chand->mu_state);
175 246 : }
176 :
177 52394 : static void cuc_set_pollset(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
178 : grpc_pollset *pollset) {
179 52394 : call_data *calld = elem->call_data;
180 52394 : calld->pollset = pollset;
181 52394 : }
182 :
183 : const grpc_channel_filter grpc_client_uchannel_filter = {
184 : cuc_start_transport_stream_op, cuc_start_transport_op, sizeof(call_data),
185 : cuc_init_call_elem, cuc_set_pollset, cuc_destroy_call_elem,
186 : sizeof(channel_data), cuc_init_channel_elem, cuc_destroy_channel_elem,
187 : cuc_get_peer, "client-uchannel",
188 : };
189 :
190 16 : grpc_connectivity_state grpc_client_uchannel_check_connectivity_state(
191 : grpc_exec_ctx *exec_ctx, grpc_channel_element *elem, int try_to_connect) {
192 16 : channel_data *chand = elem->channel_data;
193 : grpc_connectivity_state out;
194 16 : out = grpc_connectivity_state_check(&chand->state_tracker);
195 16 : gpr_mu_lock(&chand->mu_state);
196 16 : if (out == GRPC_CHANNEL_IDLE && try_to_connect) {
197 2 : grpc_connectivity_state_set(exec_ctx, &chand->state_tracker,
198 : GRPC_CHANNEL_CONNECTING,
199 : "uchannel_connecting_changed");
200 2 : chand->subchannel_connectivity = out;
201 2 : grpc_subchannel_notify_on_state_change(exec_ctx, chand->subchannel,
202 : &chand->subchannel_connectivity,
203 : &chand->connectivity_cb);
204 : }
205 16 : gpr_mu_unlock(&chand->mu_state);
206 16 : return out;
207 : }
208 :
209 12 : void grpc_client_uchannel_watch_connectivity_state(
210 : grpc_exec_ctx *exec_ctx, grpc_channel_element *elem,
211 : grpc_connectivity_state *state, grpc_closure *on_complete) {
212 12 : channel_data *chand = elem->channel_data;
213 12 : gpr_mu_lock(&chand->mu_state);
214 12 : grpc_connectivity_state_notify_on_state_change(
215 : exec_ctx, &chand->state_tracker, state, on_complete);
216 12 : gpr_mu_unlock(&chand->mu_state);
217 12 : }
218 :
219 24 : grpc_pollset_set *grpc_client_uchannel_get_connecting_pollset_set(
220 : grpc_channel_element *elem) {
221 24 : channel_data *chand = elem->channel_data;
222 : grpc_channel_element *parent_elem;
223 24 : gpr_mu_lock(&chand->mu_state);
224 24 : parent_elem = grpc_channel_stack_last_element(grpc_channel_get_channel_stack(
225 : grpc_subchannel_get_master(chand->subchannel)));
226 24 : gpr_mu_unlock(&chand->mu_state);
227 24 : return grpc_client_channel_get_connecting_pollset_set(parent_elem);
228 : }
229 :
230 12 : void grpc_client_uchannel_add_interested_party(grpc_exec_ctx *exec_ctx,
231 : grpc_channel_element *elem,
232 : grpc_pollset *pollset) {
233 12 : grpc_pollset_set *master_pollset_set =
234 : grpc_client_uchannel_get_connecting_pollset_set(elem);
235 12 : grpc_pollset_set_add_pollset(exec_ctx, master_pollset_set, pollset);
236 12 : }
237 :
238 12 : void grpc_client_uchannel_del_interested_party(grpc_exec_ctx *exec_ctx,
239 : grpc_channel_element *elem,
240 : grpc_pollset *pollset) {
241 12 : grpc_pollset_set *master_pollset_set =
242 : grpc_client_uchannel_get_connecting_pollset_set(elem);
243 12 : grpc_pollset_set_del_pollset(exec_ctx, master_pollset_set, pollset);
244 12 : }
245 :
246 246 : grpc_channel *grpc_client_uchannel_create(grpc_subchannel *subchannel,
247 : grpc_channel_args *args) {
248 246 : grpc_channel *channel = NULL;
249 : #define MAX_FILTERS 3
250 : const grpc_channel_filter *filters[MAX_FILTERS];
251 246 : grpc_channel *master = grpc_subchannel_get_master(subchannel);
252 246 : char *target = grpc_channel_get_target(master);
253 246 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
254 246 : size_t n = 0;
255 :
256 246 : if (grpc_channel_args_is_census_enabled(args)) {
257 2 : filters[n++] = &grpc_client_census_filter;
258 : }
259 246 : filters[n++] = &grpc_compress_filter;
260 246 : filters[n++] = &grpc_client_uchannel_filter;
261 246 : GPR_ASSERT(n <= MAX_FILTERS);
262 :
263 246 : channel =
264 : grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1);
265 :
266 246 : gpr_free(target);
267 246 : return channel;
268 : }
269 :
270 246 : void grpc_client_uchannel_set_subchannel(grpc_channel *uchannel,
271 : grpc_subchannel *subchannel) {
272 246 : grpc_channel_element *elem =
273 246 : grpc_channel_stack_last_element(grpc_channel_get_channel_stack(uchannel));
274 246 : channel_data *chand = elem->channel_data;
275 246 : GPR_ASSERT(elem->filter == &grpc_client_uchannel_filter);
276 246 : gpr_mu_lock(&chand->mu_state);
277 246 : chand->subchannel = subchannel;
278 246 : gpr_mu_unlock(&chand->mu_state);
279 246 : }
|