Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/transport/chttp2_transport.h"
35 :
36 : #include <math.h>
37 : #include <stdio.h>
38 : #include <string.h>
39 :
40 : #include <grpc/support/alloc.h>
41 : #include <grpc/support/log.h>
42 : #include <grpc/support/slice_buffer.h>
43 : #include <grpc/support/string_util.h>
44 : #include <grpc/support/useful.h>
45 :
46 : #include "src/core/profiling/timers.h"
47 : #include "src/core/support/string.h"
48 : #include "src/core/transport/chttp2/http2_errors.h"
49 : #include "src/core/transport/chttp2/internal.h"
50 : #include "src/core/transport/chttp2/status_conversion.h"
51 : #include "src/core/transport/chttp2/timeout_encoding.h"
52 : #include "src/core/transport/static_metadata.h"
53 : #include "src/core/transport/transport_impl.h"
54 :
55 : #define DEFAULT_WINDOW 65535
56 : #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
57 : #define MAX_WINDOW 0x7fffffffu
58 :
59 : #define MAX_CLIENT_STREAM_ID 0x7fffffffu
60 :
61 : int grpc_http_trace = 0;
62 : int grpc_flowctl_trace = 0;
63 :
64 : #define TRANSPORT_FROM_WRITING(tw) \
65 : ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
66 : writing)))
67 :
68 : #define TRANSPORT_FROM_PARSING(tw) \
69 : ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
70 : parsing)))
71 :
72 : #define TRANSPORT_FROM_GLOBAL(tg) \
73 : ((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
74 : global)))
75 :
76 : #define STREAM_FROM_GLOBAL(sg) \
77 : ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global)))
78 :
79 : #define STREAM_FROM_PARSING(sg) \
80 : ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, parsing)))
81 :
82 : static const grpc_transport_vtable vtable;
83 :
84 : static void lock(grpc_chttp2_transport *t);
85 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
86 :
87 : /* forward declarations of various callbacks that we'll build closures around */
88 : static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
89 : int iomgr_success_ignored);
90 :
91 : /** Set a transport level setting, and push it to our peer */
92 : static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
93 : gpr_uint32 value);
94 :
95 : /** Endpoint callback to process incoming data */
96 : static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success);
97 :
98 : /** Start disconnection chain */
99 : static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
100 :
101 : /** Perform a transport_op */
102 : static void perform_stream_op_locked(
103 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
104 : grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
105 :
106 : /** Cancel a stream: coming from the transport API */
107 : static void cancel_from_api(grpc_exec_ctx *exec_ctx,
108 : grpc_chttp2_transport_global *transport_global,
109 : grpc_chttp2_stream_global *stream_global,
110 : grpc_status_code status);
111 :
112 : static void close_from_api(grpc_exec_ctx *exec_ctx,
113 : grpc_chttp2_transport_global *transport_global,
114 : grpc_chttp2_stream_global *stream_global,
115 : grpc_status_code status,
116 : gpr_slice *optional_message);
117 :
118 : /** Add endpoint from this transport to pollset */
119 : static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
120 : grpc_chttp2_transport *t,
121 : grpc_pollset *pollset);
122 : static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
123 : grpc_chttp2_transport *t,
124 : grpc_pollset_set *pollset_set);
125 :
126 : /** Start new streams that have been created if we can */
127 : static void maybe_start_some_streams(
128 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
129 :
130 : static void connectivity_state_set(
131 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
132 : grpc_connectivity_state state, const char *reason);
133 :
134 : static void check_read_ops(grpc_exec_ctx *exec_ctx,
135 : grpc_chttp2_transport_global *transport_global);
136 :
137 : /*
138 : * CONSTRUCTION/DESTRUCTION/REFCOUNTING
139 : */
140 :
141 5964 : static void destruct_transport(grpc_exec_ctx *exec_ctx,
142 : grpc_chttp2_transport *t) {
143 : size_t i;
144 :
145 5964 : gpr_mu_lock(&t->mu);
146 :
147 5964 : GPR_ASSERT(t->ep == NULL);
148 :
149 5964 : gpr_slice_buffer_destroy(&t->global.qbuf);
150 :
151 5964 : gpr_slice_buffer_destroy(&t->writing.outbuf);
152 5964 : grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
153 :
154 5964 : gpr_slice_buffer_destroy(&t->parsing.qbuf);
155 5964 : gpr_slice_buffer_destroy(&t->read_buffer);
156 5964 : grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
157 5964 : grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
158 :
159 65604 : for (i = 0; i < STREAM_LIST_COUNT; i++) {
160 59640 : GPR_ASSERT(t->lists[i].head == NULL);
161 59640 : GPR_ASSERT(t->lists[i].tail == NULL);
162 : }
163 :
164 5964 : GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0);
165 5964 : GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0);
166 :
167 5964 : grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
168 5964 : grpc_chttp2_stream_map_destroy(&t->new_stream_map);
169 5964 : grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
170 :
171 5964 : gpr_mu_unlock(&t->mu);
172 5964 : gpr_mu_destroy(&t->mu);
173 :
174 : /* callback remaining pings: they're not allowed to call into the transpot,
175 : and maybe they hold resources that need to be freed */
176 11928 : while (t->global.pings.next != &t->global.pings) {
177 0 : grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
178 0 : grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 0);
179 0 : ping->next->prev = ping->prev;
180 0 : ping->prev->next = ping->next;
181 0 : gpr_free(ping);
182 : }
183 :
184 5964 : gpr_free(t->peer_string);
185 5964 : gpr_free(t);
186 5964 : }
187 :
188 : #ifdef REFCOUNTING_DEBUG
189 : #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
190 : #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__)
191 : static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
192 : const char *reason, const char *file, int line) {
193 : gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count,
194 : t->refs.count - 1, reason, file, line);
195 : if (!gpr_unref(&t->refs)) return;
196 : destruct_transport(exec_ctx, t);
197 : }
198 :
199 : static void ref_transport(grpc_chttp2_transport *t, const char *reason,
200 : const char *file, int line) {
201 : gpr_log(GPR_DEBUG, "chttp2: ref:%p %d->%d %s [%s:%d]", t, t->refs.count,
202 : t->refs.count + 1, reason, file, line);
203 : gpr_ref(&t->refs);
204 : }
205 : #else
206 : #define REF_TRANSPORT(t, r) ref_transport(t)
207 : #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t)
208 15591432 : static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
209 31192473 : if (!gpr_unref(&t->refs)) return;
210 5964 : destruct_transport(exec_ctx, t);
211 : }
212 :
213 15578445 : static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
214 : #endif
215 :
216 6038 : static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
217 : const grpc_channel_args *channel_args,
218 : grpc_endpoint *ep, gpr_uint8 is_client) {
219 : size_t i;
220 : int j;
221 :
222 : GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
223 : GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
224 :
225 6038 : memset(t, 0, sizeof(*t));
226 :
227 6038 : t->base.vtable = &vtable;
228 6038 : t->ep = ep;
229 : /* one ref is for destroy, the other for when ep becomes NULL */
230 6038 : gpr_ref_init(&t->refs, 2);
231 : /* ref is dropped at transport close() */
232 6039 : gpr_ref_init(&t->shutdown_ep_refs, 1);
233 6039 : gpr_mu_init(&t->mu);
234 6038 : t->peer_string = grpc_endpoint_get_peer(ep);
235 6039 : t->endpoint_reading = 1;
236 6039 : t->global.next_stream_id = is_client ? 1 : 2;
237 6039 : t->global.is_client = is_client;
238 6039 : t->writing.outgoing_window = DEFAULT_WINDOW;
239 6039 : t->parsing.incoming_window = DEFAULT_WINDOW;
240 6039 : t->global.stream_lookahead = DEFAULT_WINDOW;
241 6039 : t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
242 6039 : t->global.ping_counter = 1;
243 6039 : t->global.pings.next = t->global.pings.prev = &t->global.pings;
244 6039 : t->parsing.is_client = is_client;
245 6039 : t->parsing.deframe_state =
246 : is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
247 6039 : t->writing.is_client = is_client;
248 6039 : grpc_connectivity_state_init(
249 : &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
250 : is_client ? "client_transport" : "server_transport");
251 :
252 6037 : gpr_slice_buffer_init(&t->global.qbuf);
253 :
254 6037 : gpr_slice_buffer_init(&t->writing.outbuf);
255 6037 : grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
256 6037 : grpc_closure_init(&t->writing_action, writing_action, t);
257 :
258 6037 : gpr_slice_buffer_init(&t->parsing.qbuf);
259 6037 : grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
260 6035 : grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser);
261 :
262 6039 : grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
263 6039 : &t->writing);
264 6039 : grpc_closure_init(&t->recv_data, recv_data, t);
265 6039 : gpr_slice_buffer_init(&t->read_buffer);
266 :
267 6039 : if (is_client) {
268 2983 : gpr_slice_buffer_add(
269 : &t->global.qbuf,
270 : gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
271 : }
272 : /* 8 is a random stab in the dark as to a good initial size: it's small enough
273 : that it shouldn't waste memory for infrequently used connections, yet
274 : large enough that the exponential growth should happen nicely when it's
275 : needed.
276 : TODO(ctiller): tune this */
277 6039 : grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8);
278 6037 : grpc_chttp2_stream_map_init(&t->new_stream_map, 8);
279 :
280 : /* copy in initial settings to all setting sets */
281 48281 : for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
282 42244 : t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value;
283 211216 : for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
284 168972 : t->global.settings[j][i] =
285 166676 : grpc_chttp2_settings_parameters[i].default_value;
286 : }
287 : }
288 6037 : t->global.dirtied_local_settings = 1;
289 : /* Hack: it's common for implementations to assume 65536 bytes initial send
290 : window -- this should by rights be 0 */
291 6037 : t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
292 6037 : t->global.sent_local_settings = 0;
293 :
294 : /* configure http2 the way we like it */
295 6037 : if (is_client) {
296 2983 : push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
297 2983 : push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
298 : }
299 6037 : push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
300 :
301 6039 : if (channel_args) {
302 9236 : for (i = 0; i < channel_args->num_args; i++) {
303 4324 : if (0 ==
304 4324 : strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
305 22 : if (is_client) {
306 0 : gpr_log(GPR_ERROR, "%s: is ignored on the client",
307 : GRPC_ARG_MAX_CONCURRENT_STREAMS);
308 22 : } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
309 0 : gpr_log(GPR_ERROR, "%s: must be an integer",
310 : GRPC_ARG_MAX_CONCURRENT_STREAMS);
311 : } else {
312 22 : push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
313 22 : (gpr_uint32)channel_args->args[i].value.integer);
314 : }
315 4302 : } else if (0 == strcmp(channel_args->args[i].key,
316 : GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
317 208 : if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
318 0 : gpr_log(GPR_ERROR, "%s: must be an integer",
319 : GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER);
320 416 : } else if ((t->global.next_stream_id & 1) !=
321 208 : (channel_args->args[i].value.integer & 1)) {
322 0 : gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
323 : GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
324 0 : t->global.next_stream_id & 1,
325 : is_client ? "client" : "server");
326 : } else {
327 208 : t->global.next_stream_id =
328 208 : (gpr_uint32)channel_args->args[i].value.integer;
329 : }
330 4094 : } else if (0 == strcmp(channel_args->args[i].key,
331 : GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) {
332 0 : if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
333 0 : gpr_log(GPR_ERROR, "%s: must be an integer",
334 : GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES);
335 0 : } else if (channel_args->args[i].value.integer <= 5) {
336 0 : gpr_log(GPR_ERROR, "%s: must be at least 5",
337 : GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES);
338 : } else {
339 0 : t->global.stream_lookahead =
340 0 : (gpr_uint32)channel_args->args[i].value.integer;
341 : }
342 4094 : } else if (0 == strcmp(channel_args->args[i].key,
343 : GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER)) {
344 600 : if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
345 0 : gpr_log(GPR_ERROR, "%s: must be an integer",
346 : GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
347 600 : } else if (channel_args->args[i].value.integer < 0) {
348 0 : gpr_log(GPR_DEBUG, "%s: must be non-negative",
349 : GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
350 : } else {
351 600 : push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
352 600 : (gpr_uint32)channel_args->args[i].value.integer);
353 : }
354 3494 : } else if (0 == strcmp(channel_args->args[i].key,
355 : GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
356 600 : if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
357 0 : gpr_log(GPR_ERROR, "%s: must be an integer",
358 : GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER);
359 600 : } else if (channel_args->args[i].value.integer < 0) {
360 0 : gpr_log(GPR_DEBUG, "%s: must be non-negative",
361 : GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER);
362 : } else {
363 600 : grpc_chttp2_hpack_compressor_set_max_usable_size(
364 : &t->writing.hpack_compressor,
365 600 : (gpr_uint32)channel_args->args[i].value.integer);
366 : }
367 : }
368 : }
369 : }
370 6038 : }
371 :
372 5963 : static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
373 5940 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
374 :
375 5940 : lock(t);
376 5964 : t->destroying = 1;
377 5964 : drop_connection(exec_ctx, t);
378 5964 : unlock(exec_ctx, t);
379 :
380 5964 : UNREF_TRANSPORT(exec_ctx, t, "destroy");
381 5964 : }
382 :
383 : /** block grpc_endpoint_shutdown being called until a paired
384 : allow_endpoint_shutdown is made */
385 11140667 : static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
386 11140667 : GPR_ASSERT(t->ep);
387 11140667 : gpr_ref(&t->shutdown_ep_refs);
388 11141090 : }
389 :
390 4631617 : static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
391 : grpc_chttp2_transport *t) {
392 4631617 : if (gpr_unref(&t->shutdown_ep_refs)) {
393 6038 : if (t->ep) {
394 6038 : grpc_endpoint_shutdown(exec_ctx, t->ep);
395 : }
396 : }
397 4631875 : }
398 :
399 6515525 : static void allow_endpoint_shutdown_unlocked(grpc_exec_ctx *exec_ctx,
400 : grpc_chttp2_transport *t) {
401 6515525 : if (gpr_unref(&t->shutdown_ep_refs)) {
402 0 : gpr_mu_lock(&t->mu);
403 0 : if (t->ep) {
404 0 : grpc_endpoint_shutdown(exec_ctx, t->ep);
405 : }
406 0 : gpr_mu_unlock(&t->mu);
407 : }
408 6515528 : }
409 :
410 6038 : static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
411 : grpc_chttp2_transport *t) {
412 6038 : grpc_endpoint_destroy(exec_ctx, t->ep);
413 6038 : t->ep = NULL;
414 : /* safe because we'll still have the ref for write */
415 6038 : UNREF_TRANSPORT(exec_ctx, t, "disconnect");
416 6038 : }
417 :
418 17716 : static void close_transport_locked(grpc_exec_ctx *exec_ctx,
419 : grpc_chttp2_transport *t) {
420 17716 : if (!t->closed) {
421 6037 : t->closed = 1;
422 6037 : connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE,
423 : "close_transport");
424 6038 : if (t->ep) {
425 6038 : allow_endpoint_shutdown_locked(exec_ctx, t);
426 : }
427 : }
428 17717 : }
429 :
430 : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
431 : void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global,
432 : const char *reason) {
433 : grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount, reason);
434 : }
435 : void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
436 : grpc_chttp2_stream_global *stream_global,
437 : const char *reason) {
438 : grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount,
439 : reason);
440 : }
441 : #else
442 9463357 : void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global) {
443 9463357 : grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount);
444 9479824 : }
445 9469302 : void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
446 : grpc_chttp2_stream_global *stream_global) {
447 9469302 : grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount);
448 9484070 : }
449 : #endif
450 :
451 4440526 : static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
452 : grpc_stream *gs, grpc_stream_refcount *refcount,
453 : const void *server_data) {
454 4440347 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
455 4440347 : grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
456 :
457 4440526 : memset(s, 0, sizeof(*s));
458 :
459 4440526 : s->refcount = refcount;
460 4440526 : GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2");
461 :
462 4442814 : grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]);
463 4442997 : grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[1]);
464 4442976 : grpc_chttp2_incoming_metadata_buffer_init(
465 : &s->global.received_initial_metadata);
466 4442585 : grpc_chttp2_incoming_metadata_buffer_init(
467 : &s->global.received_trailing_metadata);
468 4441665 : grpc_chttp2_data_parser_init(&s->parsing.data_parser);
469 4442295 : gpr_slice_buffer_init(&s->writing.flow_controlled_buffer);
470 :
471 4442110 : REF_TRANSPORT(t, "stream");
472 :
473 4442731 : lock(t);
474 4443280 : grpc_chttp2_register_stream(t, s);
475 4443235 : if (server_data) {
476 2171139 : GPR_ASSERT(t->parsing_active);
477 2171139 : s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
478 2171139 : s->parsing.id = s->global.id;
479 2171139 : s->global.outgoing_window =
480 : t->global.settings[GRPC_PEER_SETTINGS]
481 2171139 : [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
482 2171139 : s->parsing.incoming_window = s->global.max_recv_bytes =
483 : t->global.settings[GRPC_SENT_SETTINGS]
484 2171139 : [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
485 2171139 : *t->accepting_stream = s;
486 2171139 : grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
487 2171116 : s->global.in_stream_map = 1;
488 : }
489 4443212 : unlock(exec_ctx, t);
490 :
491 4443345 : return 0;
492 : }
493 :
494 4438369 : static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
495 : grpc_stream *gs) {
496 4438319 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
497 4438319 : grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
498 : int i;
499 : grpc_byte_stream *bs;
500 :
501 : GPR_TIMER_BEGIN("destroy_stream", 0);
502 :
503 4438369 : gpr_mu_lock(&t->mu);
504 :
505 4442828 : GPR_ASSERT((s->global.write_closed && s->global.read_closed) ||
506 : s->global.id == 0);
507 4442828 : GPR_ASSERT(!s->global.in_stream_map);
508 4442828 : if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
509 315 : close_transport_locked(exec_ctx, t);
510 : }
511 4442708 : if (!t->parsing_active && s->global.id) {
512 4352237 : GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
513 : s->global.id) == NULL);
514 : }
515 :
516 4442539 : grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
517 4442119 : grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
518 : &s->global);
519 :
520 4441717 : gpr_mu_unlock(&t->mu);
521 :
522 48868255 : for (i = 0; i < STREAM_LIST_COUNT; i++) {
523 44425431 : if (s->included[i]) {
524 0 : gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
525 0 : t->global.is_client ? "client" : "server", s->global.id, i);
526 0 : abort();
527 : }
528 : }
529 :
530 8885717 : while (
531 4442943 : (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
532 119 : grpc_byte_stream_destroy(bs);
533 : }
534 :
535 4441288 : GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
536 4441288 : GPR_ASSERT(s->global.send_message_finished == NULL);
537 4441288 : GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
538 4441288 : GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
539 4441288 : GPR_ASSERT(s->global.recv_message_ready == NULL);
540 4441288 : GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
541 4441288 : grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
542 4441275 : grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]);
543 4440448 : grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]);
544 4439152 : grpc_chttp2_incoming_metadata_buffer_destroy(
545 : &s->global.received_initial_metadata);
546 4441574 : grpc_chttp2_incoming_metadata_buffer_destroy(
547 : &s->global.received_trailing_metadata);
548 4441663 : gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer);
549 :
550 4441393 : UNREF_TRANSPORT(exec_ctx, t, "stream");
551 :
552 : GPR_TIMER_END("destroy_stream", 0);
553 4443185 : }
554 :
555 11217555 : grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
556 : grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
557 11216955 : grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
558 11217555 : grpc_chttp2_stream *s =
559 11217555 : grpc_chttp2_stream_map_find(&t->parsing_stream_map, id);
560 11232326 : return s ? &s->parsing : NULL;
561 : }
562 :
563 2170874 : grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
564 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
565 : gpr_uint32 id) {
566 : grpc_chttp2_stream *accepting;
567 2170785 : grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
568 2170874 : GPR_ASSERT(t->accepting_stream == NULL);
569 2170874 : t->accepting_stream = &accepting;
570 4341748 : t->channel_callback.accept_stream(exec_ctx,
571 : t->channel_callback.accept_stream_user_data,
572 2170874 : &t->base, (void *)(gpr_uintptr)id);
573 2171119 : t->accepting_stream = NULL;
574 2171119 : return &accepting->parsing;
575 : }
576 :
577 : /*
578 : * LOCK MANAGEMENT
579 : */
580 :
581 : /* We take a grpc_chttp2_transport-global lock in response to calls coming in
582 : from above,
583 : and in response to data being received from below. New data to be written
584 : is always queued, as are callbacks to process data. During unlock() we
585 : check our todo lists and initiate callbacks and flush writes. */
586 :
587 38116484 : static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
588 :
589 38131591 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
590 : GPR_TIMER_BEGIN("unlock", 0);
591 73062162 : if (!t->writing_active && !t->closed &&
592 34923355 : grpc_chttp2_unlocking_check_writes(&t->global, &t->writing,
593 34923355 : t->parsing_active)) {
594 4624202 : t->writing_active = 1;
595 4623596 : REF_TRANSPORT(t, "writing");
596 4625778 : grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1);
597 4625677 : prevent_endpoint_shutdown(t);
598 : }
599 38140412 : check_read_ops(exec_ctx, &t->global);
600 :
601 38154166 : gpr_mu_unlock(&t->mu);
602 : GPR_TIMER_END("unlock", 0);
603 38332652 : }
604 :
605 : /*
606 : * OUTPUT PROCESSING
607 : */
608 :
609 12625 : static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
610 : gpr_uint32 value) {
611 12461 : const grpc_chttp2_setting_parameters *sp =
612 : &grpc_chttp2_settings_parameters[id];
613 12625 : gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
614 12625 : if (use_value != value) {
615 0 : gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
616 : value, use_value);
617 : }
618 12626 : if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
619 6588 : t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
620 6588 : t->global.dirtied_local_settings = 1;
621 : }
622 12626 : }
623 :
624 4623736 : void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
625 : void *transport_writing_ptr, int success) {
626 4623130 : grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
627 4623736 : grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
628 :
629 : GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
630 :
631 4623130 : lock(t);
632 :
633 4625567 : allow_endpoint_shutdown_locked(exec_ctx, t);
634 :
635 4625836 : if (!success) {
636 81 : drop_connection(exec_ctx, t);
637 : }
638 :
639 4625836 : grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
640 :
641 : /* leave the writing flag up on shutdown to prevent further writes in unlock()
642 : from starting */
643 4622380 : t->writing_active = 0;
644 4622380 : if (t->ep && !t->endpoint_reading) {
645 109 : destroy_endpoint(exec_ctx, t);
646 : }
647 :
648 4622380 : unlock(exec_ctx, t);
649 :
650 4625443 : UNREF_TRANSPORT(exec_ctx, t, "writing");
651 :
652 : GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
653 4625849 : }
654 :
655 4621553 : static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
656 : int iomgr_success_ignored) {
657 4620947 : grpc_chttp2_transport *t = gt;
658 : GPR_TIMER_BEGIN("writing_action", 0);
659 4621553 : grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep);
660 : GPR_TIMER_END("writing_action", 0);
661 4623809 : }
662 :
663 284 : void grpc_chttp2_add_incoming_goaway(
664 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
665 : gpr_uint32 goaway_error, gpr_slice goaway_text) {
666 284 : char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
667 284 : gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
668 284 : gpr_free(msg);
669 284 : gpr_slice_unref(goaway_text);
670 284 : transport_global->seen_goaway = 1;
671 284 : connectivity_state_set(exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE,
672 : "got_goaway");
673 284 : }
674 :
675 6706037 : static void maybe_start_some_streams(
676 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
677 : grpc_chttp2_stream_global *stream_global;
678 : gpr_uint32 stream_incoming_window;
679 : /* start streams where we have free grpc_chttp2_stream ids and free
680 : * concurrency */
681 24657227 : while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
682 8972716 : transport_global->concurrent_stream_count <
683 : transport_global
684 : ->settings[GRPC_PEER_SETTINGS]
685 15783745 : [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
686 6811002 : grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
687 : &stream_global)) {
688 : /* safe since we can't (legally) be parsing this stream yet */
689 2271985 : grpc_chttp2_stream_parsing *stream_parsing =
690 2272074 : &STREAM_FROM_GLOBAL(stream_global)->parsing;
691 2272074 : GRPC_CHTTP2_IF_TRACING(gpr_log(
692 : GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
693 : transport_global->is_client ? "CLI" : "SVR", stream_global,
694 : transport_global->next_stream_id));
695 :
696 2272062 : GPR_ASSERT(stream_global->id == 0);
697 2272062 : stream_global->id = stream_parsing->id = transport_global->next_stream_id;
698 2272062 : transport_global->next_stream_id += 2;
699 :
700 2272062 : if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
701 180 : connectivity_state_set(exec_ctx, transport_global,
702 : GRPC_CHANNEL_TRANSIENT_FAILURE,
703 : "no_more_stream_ids");
704 : }
705 :
706 4544124 : stream_global->outgoing_window =
707 : transport_global->settings[GRPC_PEER_SETTINGS]
708 2272062 : [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
709 2272062 : stream_parsing->incoming_window = stream_incoming_window =
710 : transport_global->settings[GRPC_SENT_SETTINGS]
711 : [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
712 4544035 : stream_global->max_recv_bytes =
713 2272062 : GPR_MAX(stream_incoming_window, stream_global->max_recv_bytes);
714 6816008 : grpc_chttp2_stream_map_add(
715 2271973 : &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
716 4543946 : stream_global->id, STREAM_FROM_GLOBAL(stream_global));
717 2272084 : stream_global->in_stream_map = 1;
718 2272084 : transport_global->concurrent_stream_count++;
719 2272084 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
720 : }
721 : /* cancel out streams that will never be started */
722 13413256 : while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
723 360 : grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
724 : &stream_global)) {
725 0 : cancel_from_api(exec_ctx, transport_global, stream_global,
726 : GRPC_STATUS_UNAVAILABLE);
727 : }
728 6706736 : }
729 :
730 21649044 : static grpc_closure *add_closure_barrier(grpc_closure *closure) {
731 21649903 : closure->final_data += 2;
732 21649044 : return closure;
733 : }
734 :
735 34452235 : void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
736 : grpc_closure **pclosure, int success) {
737 34452235 : grpc_closure *closure = *pclosure;
738 34452235 : if (closure == NULL) {
739 35058459 : return;
740 : }
741 33853259 : closure->final_data -= 2;
742 33853259 : if (!success) {
743 134 : closure->final_data |= 1;
744 : }
745 33853259 : if (closure->final_data < 2) {
746 12343982 : grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0);
747 : }
748 33860681 : *pclosure = NULL;
749 : }
750 :
751 8851082 : static int contains_non_ok_status(
752 : grpc_chttp2_transport_global *transport_global,
753 : grpc_metadata_batch *batch) {
754 : grpc_linked_mdelem *l;
755 40992064 : for (l = batch->list.head; l; l = l->next) {
756 34811053 : if (l->md->key == GRPC_MDSTR_GRPC_STATUS &&
757 2170395 : l->md != GRPC_MDELEM_GRPC_STATUS_0) {
758 499653 : return 1;
759 : }
760 : }
761 8351429 : return 0;
762 : }
763 :
764 48 : static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {}
765 :
766 12363135 : static void perform_stream_op_locked(
767 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
768 : grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
769 : grpc_closure *on_complete;
770 :
771 : GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
772 :
773 12363135 : on_complete = op->on_complete;
774 12363135 : if (on_complete == NULL) {
775 48 : on_complete = grpc_closure_create(do_nothing, NULL);
776 : }
777 : /* use final_data as a barrier until enqueue time; the inital counter is
778 : dropped at the end of this function */
779 12363135 : on_complete->final_data = 2;
780 :
781 12363135 : if (op->cancel_with_status != GRPC_STATUS_OK) {
782 1212 : cancel_from_api(exec_ctx, transport_global, stream_global,
783 : op->cancel_with_status);
784 : }
785 :
786 12370009 : if (op->close_with_status != GRPC_STATUS_OK) {
787 5 : close_from_api(exec_ctx, transport_global, stream_global,
788 : op->close_with_status, op->optional_close_message);
789 : }
790 :
791 12370009 : if (op->send_initial_metadata != NULL) {
792 4441751 : GPR_ASSERT(stream_global->send_initial_metadata_finished == NULL);
793 4442967 : stream_global->send_initial_metadata_finished =
794 4441751 : add_closure_barrier(on_complete);
795 4442967 : stream_global->send_initial_metadata = op->send_initial_metadata;
796 4442967 : if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
797 0 : stream_global->seen_error = 1;
798 0 : grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
799 : }
800 4442534 : if (!stream_global->write_closed) {
801 4442477 : if (transport_global->is_client) {
802 2272177 : GPR_ASSERT(stream_global->id == 0);
803 2272177 : grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
804 : stream_global);
805 2272157 : maybe_start_some_streams(exec_ctx, transport_global);
806 : } else {
807 2170300 : GPR_ASSERT(stream_global->id != 0);
808 2170300 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
809 : }
810 : } else {
811 57 : grpc_chttp2_complete_closure_step(
812 : exec_ctx, &stream_global->send_initial_metadata_finished, 0);
813 : }
814 : }
815 :
816 12367326 : if (op->send_message != NULL) {
817 3959322 : GPR_ASSERT(stream_global->send_message_finished == NULL);
818 3959322 : GPR_ASSERT(stream_global->send_message == NULL);
819 3959509 : stream_global->send_message_finished = add_closure_barrier(on_complete);
820 3957535 : if (stream_global->write_closed) {
821 32 : grpc_chttp2_complete_closure_step(
822 : exec_ctx, &stream_global->send_message_finished, 0);
823 3957503 : } else if (stream_global->id != 0) {
824 3960534 : stream_global->send_message = op->send_message;
825 3960534 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
826 : }
827 : }
828 :
829 12365119 : if (op->send_trailing_metadata != NULL) {
830 4439174 : GPR_ASSERT(stream_global->send_trailing_metadata_finished == NULL);
831 4438278 : stream_global->send_trailing_metadata_finished =
832 4439174 : add_closure_barrier(on_complete);
833 4438278 : stream_global->send_trailing_metadata = op->send_trailing_metadata;
834 4438278 : if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) {
835 523309 : stream_global->seen_error = 1;
836 523309 : grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
837 : }
838 4438076 : if (stream_global->write_closed) {
839 85 : grpc_chttp2_complete_closure_step(
840 : exec_ctx, &stream_global->send_trailing_metadata_finished,
841 : grpc_metadata_batch_is_empty(op->send_trailing_metadata));
842 4437991 : } else if (stream_global->id != 0) {
843 : /* TODO(ctiller): check if there's flow control for any outstanding
844 : bytes before going writable */
845 4439188 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
846 : }
847 : }
848 :
849 12364360 : if (op->recv_initial_metadata != NULL) {
850 4442588 : GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL);
851 4442800 : stream_global->recv_initial_metadata_finished =
852 4442588 : add_closure_barrier(on_complete);
853 4442800 : stream_global->recv_initial_metadata = op->recv_initial_metadata;
854 4442800 : grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
855 : }
856 :
857 12364369 : if (op->recv_message != NULL) {
858 3959391 : GPR_ASSERT(stream_global->recv_message_ready == NULL);
859 3959391 : stream_global->recv_message_ready = op->recv_message_ready;
860 3959391 : stream_global->recv_message = op->recv_message;
861 3959391 : grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
862 : }
863 :
864 12365759 : if (op->recv_trailing_metadata != NULL) {
865 4441114 : GPR_ASSERT(stream_global->recv_trailing_metadata_finished == NULL);
866 4441055 : stream_global->recv_trailing_metadata_finished =
867 4441114 : add_closure_barrier(on_complete);
868 4441055 : stream_global->recv_trailing_metadata = op->recv_trailing_metadata;
869 4441055 : grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
870 : }
871 :
872 12363431 : grpc_chttp2_complete_closure_step(exec_ctx, &on_complete, 1);
873 :
874 : GPR_TIMER_END("perform_stream_op_locked", 0);
875 12333691 : }
876 :
877 12332843 : static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
878 : grpc_stream *gs, grpc_transport_stream_op *op) {
879 12332082 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
880 12332082 : grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
881 :
882 12332082 : lock(t);
883 12377947 : perform_stream_op_locked(exec_ctx, &t->global, &s->global, op);
884 12335482 : unlock(exec_ctx, t);
885 12380487 : }
886 :
887 0 : static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
888 0 : grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
889 0 : p->next = &t->global.pings;
890 0 : p->prev = p->next->prev;
891 0 : p->prev->next = p->next->prev = p;
892 0 : p->id[0] = (gpr_uint8)((t->global.ping_counter >> 56) & 0xff);
893 0 : p->id[1] = (gpr_uint8)((t->global.ping_counter >> 48) & 0xff);
894 0 : p->id[2] = (gpr_uint8)((t->global.ping_counter >> 40) & 0xff);
895 0 : p->id[3] = (gpr_uint8)((t->global.ping_counter >> 32) & 0xff);
896 0 : p->id[4] = (gpr_uint8)((t->global.ping_counter >> 24) & 0xff);
897 0 : p->id[5] = (gpr_uint8)((t->global.ping_counter >> 16) & 0xff);
898 0 : p->id[6] = (gpr_uint8)((t->global.ping_counter >> 8) & 0xff);
899 0 : p->id[7] = (gpr_uint8)(t->global.ping_counter & 0xff);
900 0 : p->on_recv = on_recv;
901 0 : gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
902 0 : }
903 :
904 16909 : static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
905 : grpc_transport_op *op) {
906 16663 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
907 16663 : int close_transport = 0;
908 :
909 16663 : lock(t);
910 :
911 16910 : grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
912 :
913 16910 : if (op->on_connectivity_state_change) {
914 8459 : grpc_connectivity_state_notify_on_state_change(
915 : exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
916 : op->on_connectivity_state_change);
917 : }
918 :
919 16911 : if (op->send_goaway) {
920 2716 : t->global.sent_goaway = 1;
921 8148 : grpc_chttp2_goaway_append(
922 : t->global.last_incoming_stream_id,
923 2716 : (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
924 2716 : gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
925 2716 : close_transport = !grpc_chttp2_has_streams(t);
926 : }
927 :
928 16911 : if (op->set_accept_stream != NULL) {
929 3056 : t->channel_callback.accept_stream = op->set_accept_stream;
930 3056 : t->channel_callback.accept_stream_user_data =
931 3056 : op->set_accept_stream_user_data;
932 : }
933 :
934 16911 : if (op->bind_pollset) {
935 3110 : add_to_pollset_locked(exec_ctx, t, op->bind_pollset);
936 : }
937 :
938 16911 : if (op->bind_pollset_set) {
939 2306 : add_to_pollset_set_locked(exec_ctx, t, op->bind_pollset_set);
940 : }
941 :
942 16911 : if (op->send_ping) {
943 0 : send_ping_locked(t, op->send_ping);
944 : }
945 :
946 16911 : if (op->disconnect) {
947 2544 : close_transport_locked(exec_ctx, t);
948 : }
949 :
950 16911 : unlock(exec_ctx, t);
951 :
952 16911 : if (close_transport) {
953 2499 : lock(t);
954 2539 : close_transport_locked(exec_ctx, t);
955 2539 : unlock(exec_ctx, t);
956 : }
957 16911 : }
958 :
959 : /*
960 : * INPUT PROCESSING
961 : */
962 :
963 38132321 : static void check_read_ops(grpc_exec_ctx *exec_ctx,
964 : grpc_chttp2_transport_global *transport_global) {
965 : grpc_chttp2_stream_global *stream_global;
966 : grpc_byte_stream *bs;
967 94571488 : while (
968 56498255 : grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
969 28034058 : if (stream_global->recv_initial_metadata_finished != NULL &&
970 9713942 : stream_global->published_initial_metadata) {
971 8881173 : grpc_chttp2_incoming_metadata_buffer_publish(
972 4440499 : &stream_global->received_initial_metadata,
973 4440499 : stream_global->recv_initial_metadata);
974 4440077 : grpc_chttp2_complete_closure_step(
975 4440077 : exec_ctx, &stream_global->recv_initial_metadata_finished, 1);
976 : }
977 18364930 : if (stream_global->recv_message_ready != NULL) {
978 6590774 : if (stream_global->incoming_frames.head != NULL) {
979 7920329 : *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
980 3960072 : &stream_global->incoming_frames);
981 3959804 : GPR_ASSERT(*stream_global->recv_message != NULL);
982 3959804 : grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
983 3959379 : stream_global->recv_message_ready = NULL;
984 2630517 : } else if (stream_global->published_trailing_metadata) {
985 834 : *stream_global->recv_message = NULL;
986 834 : grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
987 834 : stream_global->recv_message_ready = NULL;
988 : }
989 : }
990 28582346 : if (stream_global->recv_trailing_metadata_finished != NULL &&
991 16818280 : stream_global->read_closed && stream_global->write_closed) {
992 9919204 : while (stream_global->seen_error &&
993 1047312 : (bs = grpc_chttp2_incoming_frame_queue_pop(
994 1047241 : &stream_global->incoming_frames)) != NULL) {
995 31 : grpc_byte_stream_destroy(bs);
996 : }
997 4436279 : if (stream_global->incoming_frames.head == NULL) {
998 8872372 : grpc_chttp2_incoming_metadata_buffer_publish(
999 4436099 : &stream_global->received_trailing_metadata,
1000 4436099 : stream_global->recv_trailing_metadata);
1001 4437215 : grpc_chttp2_complete_closure_step(
1002 4437215 : exec_ctx, &stream_global->recv_trailing_metadata_finished, 1);
1003 : }
1004 : }
1005 : }
1006 38119051 : }
1007 :
1008 4437040 : static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
1009 : gpr_uint32 id) {
1010 : size_t new_stream_count;
1011 4437040 : grpc_chttp2_stream *s =
1012 4437040 : grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
1013 4441408 : if (!s) {
1014 158 : s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
1015 : }
1016 4441408 : grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
1017 4441399 : GPR_ASSERT(s);
1018 4441399 : s->global.in_stream_map = 0;
1019 4441399 : if (t->parsing.incoming_stream == &s->parsing) {
1020 34 : t->parsing.incoming_stream = NULL;
1021 34 : grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing);
1022 : }
1023 4441399 : if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
1024 177 : close_transport_locked(exec_ctx, t);
1025 : }
1026 :
1027 8881699 : new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
1028 4440056 : grpc_chttp2_stream_map_size(&t->new_stream_map);
1029 4440491 : GPR_ASSERT(new_stream_count <= GPR_UINT32_MAX);
1030 4440491 : if (new_stream_count != t->global.concurrent_stream_count) {
1031 4439655 : t->global.concurrent_stream_count = (gpr_uint32)new_stream_count;
1032 4439655 : maybe_start_some_streams(exec_ctx, &t->global);
1033 : }
1034 4440176 : }
1035 :
1036 1330 : static void cancel_from_api(grpc_exec_ctx *exec_ctx,
1037 : grpc_chttp2_transport_global *transport_global,
1038 : grpc_chttp2_stream_global *stream_global,
1039 : grpc_status_code status) {
1040 1330 : if (stream_global->id != 0) {
1041 1213 : gpr_slice_buffer_add(
1042 : &transport_global->qbuf,
1043 : grpc_chttp2_rst_stream_create(
1044 : stream_global->id,
1045 1213 : (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(status)));
1046 : }
1047 1330 : grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
1048 : NULL);
1049 1330 : grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1050 : 1);
1051 1330 : }
1052 :
1053 1489 : void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
1054 : grpc_chttp2_transport_global *transport_global,
1055 : grpc_chttp2_stream_global *stream_global,
1056 : grpc_status_code status, gpr_slice *slice) {
1057 1489 : if (status != GRPC_STATUS_OK) {
1058 1489 : stream_global->seen_error = 1;
1059 1489 : grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
1060 : }
1061 : /* stream_global->recv_trailing_metadata_finished gives us a
1062 : last chance replacement: we've received trailing metadata,
1063 : but something more important has become available to signal
1064 : to the upper layers - drop what we've got, and then publish
1065 : what we want - which is safe because we haven't told anyone
1066 : about the metadata yet */
1067 2209 : if (!stream_global->published_trailing_metadata ||
1068 720 : stream_global->recv_trailing_metadata_finished != NULL) {
1069 : char status_string[GPR_LTOA_MIN_BUFSIZE];
1070 912 : gpr_ltoa(status, status_string);
1071 912 : grpc_chttp2_incoming_metadata_buffer_add(
1072 : &stream_global->received_trailing_metadata,
1073 : grpc_mdelem_from_metadata_strings(
1074 : GRPC_MDSTR_GRPC_STATUS, grpc_mdstr_from_string(status_string)));
1075 912 : if (slice) {
1076 152 : grpc_chttp2_incoming_metadata_buffer_add(
1077 : &stream_global->received_trailing_metadata,
1078 : grpc_mdelem_from_metadata_strings(
1079 : GRPC_MDSTR_GRPC_MESSAGE,
1080 : grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
1081 : }
1082 912 : stream_global->published_trailing_metadata = 1;
1083 912 : grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
1084 : }
1085 1489 : if (slice) {
1086 159 : gpr_slice_unref(*slice);
1087 : }
1088 1489 : }
1089 :
1090 8867439 : void grpc_chttp2_mark_stream_closed(
1091 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
1092 : grpc_chttp2_stream_global *stream_global, int close_reads,
1093 : int close_writes) {
1094 8867439 : if (stream_global->read_closed && stream_global->write_closed) {
1095 : /* already closed */
1096 8878261 : return;
1097 : }
1098 8866538 : grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
1099 8872672 : if (close_reads && !stream_global->read_closed) {
1100 4440759 : stream_global->read_closed = 1;
1101 4440759 : stream_global->published_initial_metadata = 1;
1102 4440759 : stream_global->published_trailing_metadata = 1;
1103 : }
1104 8872672 : if (close_writes && !stream_global->write_closed) {
1105 4440462 : stream_global->write_closed = 1;
1106 : }
1107 8872672 : if (stream_global->read_closed && stream_global->write_closed) {
1108 8877494 : if (stream_global->id != 0 &&
1109 4438730 : TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) {
1110 2398165 : grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
1111 : stream_global);
1112 : } else {
1113 2040599 : if (stream_global->id != 0) {
1114 2044075 : remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
1115 : stream_global->id);
1116 : }
1117 2040574 : stream_global->finished_close = 1;
1118 2040574 : GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
1119 : }
1120 : }
1121 : }
1122 :
1123 5 : static void close_from_api(grpc_exec_ctx *exec_ctx,
1124 : grpc_chttp2_transport_global *transport_global,
1125 : grpc_chttp2_stream_global *stream_global,
1126 : grpc_status_code status,
1127 : gpr_slice *optional_message) {
1128 : gpr_slice hdr;
1129 : gpr_slice status_hdr;
1130 : gpr_slice message_pfx;
1131 : gpr_uint8 *p;
1132 5 : gpr_uint32 len = 0;
1133 :
1134 5 : GPR_ASSERT(status >= 0 && (int)status < 100);
1135 :
1136 5 : GPR_ASSERT(stream_global->id != 0);
1137 :
1138 : /* Hand roll a header block.
1139 : This is unnecessarily ugly - at some point we should find a more elegant
1140 : solution.
1141 : It's complicated by the fact that our send machinery would be dead by the
1142 : time we got around to sending this, so instead we ignore HPACK compression
1143 : and just write the uncompressed bytes onto the wire. */
1144 5 : status_hdr = gpr_slice_malloc(15 + (status >= 10));
1145 5 : p = GPR_SLICE_START_PTR(status_hdr);
1146 5 : *p++ = 0x40; /* literal header */
1147 5 : *p++ = 11; /* len(grpc-status) */
1148 5 : *p++ = 'g';
1149 5 : *p++ = 'r';
1150 5 : *p++ = 'p';
1151 5 : *p++ = 'c';
1152 5 : *p++ = '-';
1153 5 : *p++ = 's';
1154 5 : *p++ = 't';
1155 5 : *p++ = 'a';
1156 5 : *p++ = 't';
1157 5 : *p++ = 'u';
1158 5 : *p++ = 's';
1159 5 : if (status < 10) {
1160 0 : *p++ = 1;
1161 0 : *p++ = (gpr_uint8)('0' + status);
1162 : } else {
1163 5 : *p++ = 2;
1164 5 : *p++ = (gpr_uint8)('0' + (status / 10));
1165 5 : *p++ = (gpr_uint8)('0' + (status % 10));
1166 : }
1167 5 : GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
1168 5 : len += (gpr_uint32)GPR_SLICE_LENGTH(status_hdr);
1169 :
1170 5 : if (optional_message) {
1171 5 : GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
1172 5 : message_pfx = gpr_slice_malloc(15);
1173 5 : p = GPR_SLICE_START_PTR(message_pfx);
1174 5 : *p++ = 0x40;
1175 5 : *p++ = 12; /* len(grpc-message) */
1176 5 : *p++ = 'g';
1177 5 : *p++ = 'r';
1178 5 : *p++ = 'p';
1179 5 : *p++ = 'c';
1180 5 : *p++ = '-';
1181 5 : *p++ = 'm';
1182 5 : *p++ = 'e';
1183 5 : *p++ = 's';
1184 5 : *p++ = 's';
1185 5 : *p++ = 'a';
1186 5 : *p++ = 'g';
1187 5 : *p++ = 'e';
1188 5 : *p++ = (gpr_uint8)GPR_SLICE_LENGTH(*optional_message);
1189 5 : GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
1190 5 : len += (gpr_uint32)GPR_SLICE_LENGTH(message_pfx);
1191 5 : len += (gpr_uint32)GPR_SLICE_LENGTH(*optional_message);
1192 : }
1193 :
1194 5 : hdr = gpr_slice_malloc(9);
1195 5 : p = GPR_SLICE_START_PTR(hdr);
1196 5 : *p++ = (gpr_uint8)(len >> 16);
1197 5 : *p++ = (gpr_uint8)(len >> 8);
1198 5 : *p++ = (gpr_uint8)(len);
1199 5 : *p++ = GRPC_CHTTP2_FRAME_HEADER;
1200 5 : *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
1201 5 : *p++ = (gpr_uint8)(stream_global->id >> 24);
1202 5 : *p++ = (gpr_uint8)(stream_global->id >> 16);
1203 5 : *p++ = (gpr_uint8)(stream_global->id >> 8);
1204 5 : *p++ = (gpr_uint8)(stream_global->id);
1205 5 : GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
1206 :
1207 5 : gpr_slice_buffer_add(&transport_global->qbuf, hdr);
1208 5 : gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
1209 5 : if (optional_message) {
1210 5 : gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
1211 5 : gpr_slice_buffer_add(&transport_global->qbuf,
1212 : gpr_slice_ref(*optional_message));
1213 : }
1214 :
1215 5 : gpr_slice_buffer_add(
1216 : &transport_global->qbuf,
1217 : grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR));
1218 :
1219 5 : if (optional_message) {
1220 5 : gpr_slice_ref(*optional_message);
1221 : }
1222 5 : grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
1223 : optional_message);
1224 5 : grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
1225 : 1);
1226 5 : }
1227 :
1228 118 : static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
1229 : void *user_data,
1230 : grpc_chttp2_stream_global *stream_global) {
1231 118 : cancel_from_api(user_data, transport_global, stream_global,
1232 : GRPC_STATUS_UNAVAILABLE);
1233 118 : }
1234 :
1235 12038 : static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
1236 : grpc_chttp2_transport *t) {
1237 12142 : grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb);
1238 12038 : }
1239 :
1240 12142 : static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
1241 12142 : close_transport_locked(exec_ctx, t);
1242 12038 : end_all_the_calls(exec_ctx, t);
1243 12142 : }
1244 :
1245 : /** update window from a settings change */
1246 0 : static void update_global_window(void *args, gpr_uint32 id, void *stream) {
1247 0 : grpc_chttp2_transport *t = args;
1248 0 : grpc_chttp2_stream *s = stream;
1249 0 : grpc_chttp2_transport_global *transport_global = &t->global;
1250 0 : grpc_chttp2_stream_global *stream_global = &s->global;
1251 : int was_zero;
1252 : int is_zero;
1253 0 : gpr_int64 initial_window_update = t->parsing.initial_window_update;
1254 :
1255 0 : was_zero = stream_global->outgoing_window <= 0;
1256 0 : GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", transport_global, stream_global,
1257 : outgoing_window, initial_window_update);
1258 0 : is_zero = stream_global->outgoing_window <= 0;
1259 :
1260 0 : if (was_zero && !is_zero) {
1261 0 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
1262 : }
1263 0 : }
1264 :
1265 6038 : static void read_error_locked(grpc_exec_ctx *exec_ctx,
1266 : grpc_chttp2_transport *t) {
1267 6038 : t->endpoint_reading = 0;
1268 6038 : if (!t->writing_active && t->ep) {
1269 5929 : destroy_endpoint(exec_ctx, t);
1270 : }
1271 6038 : }
1272 :
1273 : /* tcp read callback */
1274 6521510 : static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
1275 : size_t i;
1276 6520741 : int keep_reading = 0;
1277 6520741 : grpc_chttp2_transport *t = tp;
1278 6521510 : grpc_chttp2_transport_global *transport_global = &t->global;
1279 6521510 : grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
1280 : grpc_chttp2_stream_global *stream_global;
1281 :
1282 : GPR_TIMER_BEGIN("recv_data", 0);
1283 :
1284 6520741 : lock(t);
1285 6520796 : i = 0;
1286 6521565 : GPR_ASSERT(!t->parsing_active);
1287 6521565 : if (!t->closed) {
1288 6516056 : t->parsing_active = 1;
1289 : /* merge stream lists */
1290 6516056 : grpc_chttp2_stream_map_move_into(&t->new_stream_map,
1291 : &t->parsing_stream_map);
1292 6516035 : grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
1293 6516026 : gpr_mu_unlock(&t->mu);
1294 : GPR_TIMER_BEGIN("recv_data.parse", 0);
1295 50051314 : for (; i < t->read_buffer.count &&
1296 18509685 : grpc_chttp2_perform_read(exec_ctx, transport_parsing,
1297 18509685 : t->read_buffer.slices[i]);
1298 18509574 : i++)
1299 : ;
1300 : GPR_TIMER_END("recv_data.parse", 0);
1301 6516000 : gpr_mu_lock(&t->mu);
1302 : /* copy parsing qbuf to global qbuf */
1303 6516050 : gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
1304 6516050 : if (i != t->read_buffer.count) {
1305 59 : unlock(exec_ctx, t);
1306 59 : lock(t);
1307 59 : drop_connection(exec_ctx, t);
1308 : }
1309 : /* merge stream lists */
1310 6516050 : grpc_chttp2_stream_map_move_into(&t->new_stream_map,
1311 : &t->parsing_stream_map);
1312 6516049 : transport_global->concurrent_stream_count =
1313 6515995 : (gpr_uint32)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
1314 6516049 : if (transport_parsing->initial_window_update != 0) {
1315 1 : grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
1316 : update_global_window, t);
1317 1 : transport_parsing->initial_window_update = 0;
1318 : }
1319 : /* handle higher level things */
1320 6516049 : grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
1321 6515990 : t->parsing_active = 0;
1322 : /* if a stream is in the stream map, and gets cancelled, we need to ensure
1323 : * we are not parsing before continuing the cancellation to keep things in
1324 : * a sane state */
1325 15431024 : while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
1326 : &stream_global)) {
1327 2398879 : GPR_ASSERT(stream_global->in_stream_map);
1328 2398878 : GPR_ASSERT(stream_global->write_closed);
1329 2398878 : GPR_ASSERT(stream_global->read_closed);
1330 2398878 : remove_stream(exec_ctx, t, stream_global->id);
1331 2398644 : stream_global->finished_close = 1;
1332 2398644 : GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
1333 : }
1334 : }
1335 6521384 : if (!success || i != t->read_buffer.count || t->closed) {
1336 5900 : drop_connection(exec_ctx, t);
1337 6038 : read_error_locked(exec_ctx, t);
1338 6515484 : } else if (!t->closed) {
1339 6514795 : keep_reading = 1;
1340 6514795 : REF_TRANSPORT(t, "keep_reading");
1341 6515519 : prevent_endpoint_shutdown(t);
1342 : }
1343 6521562 : gpr_slice_buffer_reset_and_unref(&t->read_buffer);
1344 6521564 : unlock(exec_ctx, t);
1345 :
1346 6521564 : if (keep_reading) {
1347 6515526 : grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->recv_data);
1348 6515525 : allow_endpoint_shutdown_unlocked(exec_ctx, t);
1349 6515528 : UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
1350 : } else {
1351 6038 : UNREF_TRANSPORT(exec_ctx, t, "recv_data");
1352 : }
1353 :
1354 : GPR_TIMER_END("recv_data", 0);
1355 6521566 : }
1356 :
1357 : /*
1358 : * CALLBACK LOOP
1359 : */
1360 :
1361 6502 : static void connectivity_state_set(
1362 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
1363 : grpc_connectivity_state state, const char *reason) {
1364 6502 : GRPC_CHTTP2_IF_TRACING(
1365 : gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
1366 6502 : grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global)
1367 : ->channel_callback.state_tracker,
1368 : state, reason);
1369 6502 : }
1370 :
1371 : /*
1372 : * POLLSET STUFF
1373 : */
1374 :
1375 4445898 : static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
1376 : grpc_chttp2_transport *t,
1377 : grpc_pollset *pollset) {
1378 4446157 : if (t->ep) {
1379 4446304 : grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
1380 : }
1381 4445886 : }
1382 :
1383 2306 : static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
1384 : grpc_chttp2_transport *t,
1385 : grpc_pollset_set *pollset_set) {
1386 2347 : if (t->ep) {
1387 2347 : grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
1388 : }
1389 2306 : }
1390 :
1391 4437763 : static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
1392 : grpc_stream *gs, grpc_pollset *pollset) {
1393 4437586 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
1394 4437586 : lock(t);
1395 4442921 : add_to_pollset_locked(exec_ctx, t, pollset);
1396 4443161 : unlock(exec_ctx, t);
1397 4443183 : }
1398 :
1399 : /*
1400 : * BYTE STREAM
1401 : */
1402 :
1403 5913747 : static void incoming_byte_stream_update_flow_control(
1404 : grpc_chttp2_transport_global *transport_global,
1405 : grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
1406 : size_t have_already) {
1407 : gpr_uint32 max_recv_bytes;
1408 :
1409 : /* clamp max recv hint to an allowable size */
1410 5913747 : if (max_size_hint >= GPR_UINT32_MAX - transport_global->stream_lookahead) {
1411 0 : max_recv_bytes = GPR_UINT32_MAX - transport_global->stream_lookahead;
1412 : } else {
1413 5913761 : max_recv_bytes = (gpr_uint32)max_size_hint;
1414 : }
1415 :
1416 : /* account for bytes already received but unknown to higher layers */
1417 5913747 : if (max_recv_bytes >= have_already) {
1418 5913747 : max_recv_bytes -= (gpr_uint32)have_already;
1419 : } else {
1420 0 : max_recv_bytes = 0;
1421 : }
1422 :
1423 : /* add some small lookahead to keep pipelines flowing */
1424 5913747 : GPR_ASSERT(max_recv_bytes <=
1425 : GPR_UINT32_MAX - transport_global->stream_lookahead);
1426 5913747 : max_recv_bytes += transport_global->stream_lookahead;
1427 5913747 : if (stream_global->max_recv_bytes < max_recv_bytes) {
1428 2409491 : gpr_uint32 add_max_recv_bytes =
1429 2409336 : max_recv_bytes - stream_global->max_recv_bytes;
1430 2409491 : GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
1431 : max_recv_bytes, add_max_recv_bytes);
1432 2409489 : GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
1433 : unannounced_incoming_window_for_parse,
1434 : add_max_recv_bytes);
1435 2409489 : GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
1436 : unannounced_incoming_window_for_writing,
1437 : add_max_recv_bytes);
1438 2409489 : grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
1439 : stream_global);
1440 2409356 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
1441 : }
1442 5913584 : }
1443 :
1444 4166547 : static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
1445 : grpc_byte_stream *byte_stream,
1446 : gpr_slice *slice, size_t max_size_hint,
1447 : grpc_closure *on_complete) {
1448 4166037 : grpc_chttp2_incoming_byte_stream *bs =
1449 : (grpc_chttp2_incoming_byte_stream *)byte_stream;
1450 4166547 : grpc_chttp2_transport_global *transport_global = &bs->transport->global;
1451 4166547 : grpc_chttp2_stream_global *stream_global = &bs->stream->global;
1452 :
1453 4166037 : lock(bs->transport);
1454 4167000 : if (bs->is_tail) {
1455 4155845 : incoming_byte_stream_update_flow_control(transport_global, stream_global,
1456 : max_size_hint, bs->slices.length);
1457 : }
1458 4167494 : if (bs->slices.count > 0) {
1459 3192020 : *slice = gpr_slice_buffer_take_first(&bs->slices);
1460 3192045 : unlock(exec_ctx, bs->transport);
1461 3192163 : return 1;
1462 : } else {
1463 975474 : bs->on_next = on_complete;
1464 975474 : bs->next = slice;
1465 975474 : unlock(exec_ctx, bs->transport);
1466 975474 : return 0;
1467 : }
1468 : }
1469 :
1470 7903008 : static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) {
1471 7903008 : if (gpr_unref(&bs->refs)) {
1472 3963645 : gpr_slice_buffer_destroy(&bs->slices);
1473 3963551 : gpr_free(bs);
1474 : }
1475 7926646 : }
1476 :
1477 3957442 : static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) {
1478 3957442 : incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream);
1479 3962875 : }
1480 :
1481 6444833 : void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
1482 : grpc_chttp2_incoming_byte_stream *bs,
1483 : gpr_slice slice) {
1484 6444833 : gpr_mu_lock(&bs->transport->mu);
1485 6444807 : if (bs->on_next != NULL) {
1486 975474 : *bs->next = slice;
1487 975474 : grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1);
1488 975474 : bs->on_next = NULL;
1489 : } else {
1490 5469333 : gpr_slice_buffer_add(&bs->slices, slice);
1491 : }
1492 6444828 : gpr_mu_unlock(&bs->transport->mu);
1493 6444824 : }
1494 :
1495 3960618 : void grpc_chttp2_incoming_byte_stream_finished(
1496 : grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) {
1497 3960618 : incoming_byte_stream_unref(bs);
1498 3962349 : }
1499 :
1500 3963172 : grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
1501 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
1502 : grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
1503 : gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) {
1504 3963172 : grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
1505 : gpr_malloc(sizeof(*incoming_byte_stream));
1506 3963411 : incoming_byte_stream->base.length = frame_size;
1507 3963411 : incoming_byte_stream->base.flags = flags;
1508 3963411 : incoming_byte_stream->base.next = incoming_byte_stream_next;
1509 3963411 : incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
1510 3963411 : gpr_ref_init(&incoming_byte_stream->refs, 2);
1511 3963135 : incoming_byte_stream->next_message = NULL;
1512 3963135 : incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
1513 3963135 : incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing);
1514 3963135 : gpr_slice_buffer_init(&incoming_byte_stream->slices);
1515 3961816 : incoming_byte_stream->on_next = NULL;
1516 3961816 : incoming_byte_stream->is_tail = 1;
1517 3961816 : if (add_to_queue->head == NULL) {
1518 3955207 : add_to_queue->head = incoming_byte_stream;
1519 : } else {
1520 6609 : add_to_queue->tail->is_tail = 0;
1521 6609 : add_to_queue->tail->next_message = incoming_byte_stream;
1522 : }
1523 3961816 : add_to_queue->tail = incoming_byte_stream;
1524 3961816 : if (frame_size == 0) {
1525 1758016 : lock(TRANSPORT_FROM_PARSING(transport_parsing));
1526 3517843 : incoming_byte_stream_update_flow_control(
1527 1758890 : &TRANSPORT_FROM_PARSING(transport_parsing)->global,
1528 1758890 : &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0);
1529 1758896 : unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing));
1530 : }
1531 3960931 : return incoming_byte_stream;
1532 : }
1533 :
1534 : /*
1535 : * TRACING
1536 : */
1537 :
1538 25440 : static char *format_flowctl_context_var(const char *context, const char *var,
1539 : gpr_int64 val, gpr_uint32 id,
1540 : char **scope) {
1541 : char *underscore_pos;
1542 : char *result;
1543 25440 : if (context == NULL) {
1544 4442 : *scope = NULL;
1545 4442 : gpr_asprintf(&result, "%s(%lld)", var, val);
1546 4442 : return result;
1547 : }
1548 20998 : underscore_pos = strchr(context, '_');
1549 20998 : *scope = gpr_strdup(context);
1550 20998 : (*scope)[underscore_pos - context] = 0;
1551 20998 : if (id != 0) {
1552 5608 : char *tmp = *scope;
1553 5608 : gpr_asprintf(scope, "%s[%d]", tmp, id);
1554 5608 : gpr_free(tmp);
1555 : }
1556 20998 : gpr_asprintf(&result, "%s.%s(%lld)", underscore_pos + 1, var, val);
1557 20998 : return result;
1558 : }
1559 :
1560 8278 : static int samestr(char *a, char *b) {
1561 8278 : if (a == NULL) {
1562 0 : return b == NULL;
1563 : }
1564 8278 : if (b == NULL) {
1565 0 : return 0;
1566 : }
1567 8278 : return 0 == strcmp(a, b);
1568 : }
1569 :
1570 12720 : void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
1571 : grpc_chttp2_flowctl_op op, const char *context1,
1572 : const char *var1, const char *context2,
1573 : const char *var2, int is_client,
1574 : gpr_uint32 stream_id, gpr_int64 val1,
1575 : gpr_int64 val2) {
1576 : char *scope1;
1577 : char *scope2;
1578 12720 : char *label1 =
1579 : format_flowctl_context_var(context1, var1, val1, stream_id, &scope1);
1580 12720 : char *label2 =
1581 : format_flowctl_context_var(context2, var2, val2, stream_id, &scope2);
1582 12720 : char *clisvr = is_client ? "client" : "server";
1583 : char *prefix;
1584 :
1585 12720 : gpr_asprintf(&prefix, "FLOW % 8s: %s % 11s ", phase, clisvr, scope1);
1586 :
1587 12720 : switch (op) {
1588 : case GRPC_CHTTP2_FLOWCTL_MOVE:
1589 8278 : GPR_ASSERT(samestr(scope1, scope2));
1590 8278 : if (val2 != 0) {
1591 1070 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
1592 : "%sMOVE % 40s <- % 40s giving %d", prefix, label1, label2,
1593 : val1 + val2);
1594 : }
1595 8278 : break;
1596 : case GRPC_CHTTP2_FLOWCTL_CREDIT:
1597 1762 : GPR_ASSERT(val2 >= 0);
1598 1762 : if (val2 != 0) {
1599 1762 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
1600 : "%sCREDIT % 40s by % 40s giving %d", prefix, label1, label2,
1601 : val1 + val2);
1602 : }
1603 1762 : break;
1604 : case GRPC_CHTTP2_FLOWCTL_DEBIT:
1605 2680 : GPR_ASSERT(val2 >= 0);
1606 2680 : if (val2 != 0) {
1607 2488 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
1608 : "%sDEBIT % 40s by % 40s giving %d", prefix, label1, label2,
1609 : val1 - val2);
1610 : }
1611 2680 : break;
1612 : }
1613 :
1614 12720 : gpr_free(scope1);
1615 12720 : gpr_free(scope2);
1616 12720 : gpr_free(label1);
1617 12720 : gpr_free(label2);
1618 12720 : gpr_free(prefix);
1619 12720 : }
1620 :
1621 : /*
1622 : * INTEGRATION GLUE
1623 : */
1624 :
1625 1245 : static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
1626 1245 : return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
1627 : }
1628 :
1629 : static const grpc_transport_vtable vtable = {
1630 : sizeof(grpc_chttp2_stream), init_stream, set_pollset, perform_stream_op,
1631 : perform_transport_op, destroy_stream, destroy_transport, chttp2_get_peer};
1632 :
1633 6039 : grpc_transport *grpc_create_chttp2_transport(
1634 : grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
1635 : grpc_endpoint *ep, int is_client) {
1636 6039 : grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
1637 6039 : init_transport(exec_ctx, t, channel_args, ep, is_client != 0);
1638 6036 : return &t->base;
1639 : }
1640 :
1641 6039 : void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
1642 : grpc_transport *transport,
1643 : gpr_slice *slices, size_t nslices) {
1644 5957 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
1645 5957 : REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
1646 6039 : gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
1647 6039 : recv_data(exec_ctx, t, 1);
1648 6039 : }
|