Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/transport/chttp2_transport.h"
35 :
36 : #include <math.h>
37 : #include <stdio.h>
38 : #include <string.h>
39 :
40 : #include <grpc/support/alloc.h>
41 : #include <grpc/support/log.h>
42 : #include <grpc/support/slice_buffer.h>
43 : #include <grpc/support/string_util.h>
44 : #include <grpc/support/useful.h>
45 :
46 : #include "src/core/profiling/timers.h"
47 : #include "src/core/support/string.h"
48 : #include "src/core/transport/chttp2/http2_errors.h"
49 : #include "src/core/transport/chttp2/internal.h"
50 : #include "src/core/transport/chttp2/status_conversion.h"
51 : #include "src/core/transport/chttp2/timeout_encoding.h"
52 : #include "src/core/transport/transport_impl.h"
53 :
/* Flow-control window that a new transport starts with in both directions;
   also the value pushed for SETTINGS_INITIAL_WINDOW_SIZE at init time. */
#define DEFAULT_WINDOW 65535
/* Initial value of the transport's connection_window_target. */
#define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
#define MAX_WINDOW 0x7fffffffu

/* New stream ids are only allocated while next_stream_id does not exceed
   this bound. */
#define MAX_CLIENT_STREAM_ID 0x7fffffffu

/* Trace switches; both start disabled.
   NOTE(review): presumably flipped by the tracing machinery elsewhere --
   confirm. */
int grpc_http_trace = 0;
int grpc_flowctl_trace = 0;

/* Given a pointer to the `member` field embedded in a `type`, yields a
   pointer to the enclosing `type` object. */
#define CHTTP2_CONTAINER_OF(type, member, ptr) \
  ((type *)((char *)(ptr)-offsetof(type, member)))

/* Transport that embeds the given writing sub-state. */
#define TRANSPORT_FROM_WRITING(tw) \
  CHTTP2_CONTAINER_OF(grpc_chttp2_transport, writing, tw)

/* Transport that embeds the given parsing sub-state. */
#define TRANSPORT_FROM_PARSING(tw) \
  CHTTP2_CONTAINER_OF(grpc_chttp2_transport, parsing, tw)

/* Transport that embeds the given global sub-state. */
#define TRANSPORT_FROM_GLOBAL(tg) \
  CHTTP2_CONTAINER_OF(grpc_chttp2_transport, global, tg)

/* Stream that embeds the given global stream sub-state. */
#define STREAM_FROM_GLOBAL(sg) \
  CHTTP2_CONTAINER_OF(grpc_chttp2_stream, global, sg)
77 :
78 : static const grpc_transport_vtable vtable;
79 :
80 : static void lock(grpc_chttp2_transport *t);
81 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
82 :
83 : static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx,
84 : grpc_chttp2_transport *t);
85 :
86 : /* forward declarations of various callbacks that we'll build closures around */
87 : static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
88 : int iomgr_success_ignored);
89 :
90 : /** Set a transport level setting, and push it to our peer */
91 : static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
92 : gpr_uint32 value);
93 :
94 : /** Endpoint callback to process incoming data */
95 : static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success);
96 :
97 : /** Start disconnection chain */
98 : static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
99 :
100 : /** Perform a transport_op */
101 : static void perform_stream_op_locked(
102 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
103 : grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
104 :
105 : /** Cancel a stream: coming from the transport API */
106 : static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
107 : grpc_chttp2_stream_global *stream_global,
108 : grpc_status_code status);
109 :
110 : static void close_from_api(grpc_chttp2_transport_global *transport_global,
111 : grpc_chttp2_stream_global *stream_global,
112 : grpc_status_code status,
113 : gpr_slice *optional_message);
114 :
115 : /** Add endpoint from this transport to pollset */
116 : static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
117 : grpc_chttp2_transport *t,
118 : grpc_pollset *pollset);
119 : static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
120 : grpc_chttp2_transport *t,
121 : grpc_pollset_set *pollset_set);
122 :
123 : /** Start new streams that have been created if we can */
124 : static void maybe_start_some_streams(
125 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
126 :
127 : static void connectivity_state_set(
128 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
129 : grpc_connectivity_state state, const char *reason);
130 :
131 : /*
132 : * CONSTRUCTION/DESTRUCTION/REFCOUNTING
133 : */
134 :
135 4014 : static void destruct_transport(grpc_exec_ctx *exec_ctx,
136 : grpc_chttp2_transport *t) {
137 : size_t i;
138 :
139 4014 : gpr_mu_lock(&t->mu);
140 :
141 4014 : GPR_ASSERT(t->ep == NULL);
142 :
143 4014 : gpr_slice_buffer_destroy(&t->global.qbuf);
144 :
145 4014 : gpr_slice_buffer_destroy(&t->writing.outbuf);
146 4014 : grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
147 :
148 4014 : gpr_slice_buffer_destroy(&t->parsing.qbuf);
149 4014 : gpr_slice_buffer_destroy(&t->read_buffer);
150 4014 : grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
151 4014 : grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
152 :
153 4014 : GRPC_MDSTR_UNREF(t->parsing.str_grpc_timeout);
154 :
155 44154 : for (i = 0; i < STREAM_LIST_COUNT; i++) {
156 40140 : GPR_ASSERT(t->lists[i].head == NULL);
157 40140 : GPR_ASSERT(t->lists[i].tail == NULL);
158 : }
159 :
160 4014 : GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0);
161 4014 : GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0);
162 :
163 4014 : grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
164 4014 : grpc_chttp2_stream_map_destroy(&t->new_stream_map);
165 4014 : grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
166 :
167 4014 : gpr_mu_unlock(&t->mu);
168 4014 : gpr_mu_destroy(&t->mu);
169 :
170 : /* callback remaining pings: they're not allowed to call into the transpot,
171 : and maybe they hold resources that need to be freed */
172 8028 : while (t->global.pings.next != &t->global.pings) {
173 0 : grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
174 0 : grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 0);
175 0 : ping->next->prev = ping->prev;
176 0 : ping->prev->next = ping->next;
177 0 : gpr_free(ping);
178 : }
179 :
180 4014 : grpc_mdctx_unref(t->metadata_context);
181 :
182 4014 : gpr_free(t->peer_string);
183 4014 : gpr_free(t);
184 4014 : }
185 :
186 : #ifdef REFCOUNTING_DEBUG
187 : #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
188 : #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__)
189 : static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
190 : const char *reason, const char *file, int line) {
191 : gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count,
192 : t->refs.count - 1, reason, file, line);
193 : if (!gpr_unref(&t->refs)) return;
194 : destruct_transport(exec_ctx, t);
195 : }
196 :
197 : static void ref_transport(grpc_chttp2_transport *t, const char *reason,
198 : const char *file, int line) {
199 : gpr_log(GPR_DEBUG, "chttp2: ref:%p %d->%d %s [%s:%d]", t, t->refs.count,
200 : t->refs.count + 1, reason, file, line);
201 : gpr_ref(&t->refs);
202 : }
203 : #else
204 : #define REF_TRANSPORT(t, r) ref_transport(t)
205 : #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t)
206 8615629 : static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
207 17239906 : if (!gpr_unref(&t->refs)) return;
208 4014 : destruct_transport(exec_ctx, t);
209 : }
210 :
211 8609310 : static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
212 : #endif
213 :
214 4014 : static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
215 : const grpc_channel_args *channel_args,
216 : grpc_endpoint *ep, grpc_mdctx *mdctx,
217 : gpr_uint8 is_client) {
218 : size_t i;
219 : int j;
220 :
221 : GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
222 : GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
223 :
224 4014 : memset(t, 0, sizeof(*t));
225 :
226 4014 : t->base.vtable = &vtable;
227 4014 : t->ep = ep;
228 : /* one ref is for destroy, the other for when ep becomes NULL */
229 4014 : gpr_ref_init(&t->refs, 2);
230 : /* ref is dropped at transport close() */
231 4014 : gpr_ref_init(&t->shutdown_ep_refs, 1);
232 4014 : gpr_mu_init(&t->mu);
233 4014 : grpc_mdctx_ref(mdctx);
234 4014 : t->peer_string = grpc_endpoint_get_peer(ep);
235 4014 : t->metadata_context = mdctx;
236 4014 : t->endpoint_reading = 1;
237 4014 : t->global.next_stream_id = is_client ? 1 : 2;
238 4014 : t->global.is_client = is_client;
239 4014 : t->global.outgoing_window = DEFAULT_WINDOW;
240 4014 : t->global.incoming_window = DEFAULT_WINDOW;
241 4014 : t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
242 4014 : t->global.ping_counter = 1;
243 4014 : t->global.pings.next = t->global.pings.prev = &t->global.pings;
244 4014 : t->parsing.is_client = is_client;
245 4014 : t->parsing.str_grpc_timeout =
246 4014 : grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
247 4014 : t->parsing.deframe_state =
248 : is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
249 4014 : t->writing.is_client = is_client;
250 4014 : grpc_connectivity_state_init(
251 : &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
252 : is_client ? "client_transport" : "server_transport");
253 :
254 4014 : gpr_slice_buffer_init(&t->global.qbuf);
255 :
256 4014 : gpr_slice_buffer_init(&t->writing.outbuf);
257 4014 : grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
258 4014 : grpc_closure_init(&t->writing_action, writing_action, t);
259 :
260 4014 : gpr_slice_buffer_init(&t->parsing.qbuf);
261 4014 : grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
262 4014 : grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
263 :
264 4014 : grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
265 4014 : &t->writing);
266 4014 : grpc_closure_init(&t->recv_data, recv_data, t);
267 4014 : gpr_slice_buffer_init(&t->read_buffer);
268 :
269 4014 : if (is_client) {
270 1992 : gpr_slice_buffer_add(
271 : &t->global.qbuf,
272 : gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
273 : }
274 : /* 8 is a random stab in the dark as to a good initial size: it's small enough
275 : that it shouldn't waste memory for infrequently used connections, yet
276 : large enough that the exponential growth should happen nicely when it's
277 : needed.
278 : TODO(ctiller): tune this */
279 4014 : grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8);
280 4014 : grpc_chttp2_stream_map_init(&t->new_stream_map, 8);
281 :
282 : /* copy in initial settings to all setting sets */
283 32103 : for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
284 28089 : t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value;
285 140447 : for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
286 112358 : t->global.settings[j][i] =
287 112358 : grpc_chttp2_settings_parameters[i].default_value;
288 : }
289 : }
290 4014 : t->global.dirtied_local_settings = 1;
291 : /* Hack: it's common for implementations to assume 65536 bytes initial send
292 : window -- this should by rights be 0 */
293 4014 : t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
294 4014 : t->global.sent_local_settings = 0;
295 :
296 : /* configure http2 the way we like it */
297 4014 : if (is_client) {
298 1992 : push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
299 1992 : push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
300 : }
301 4014 : push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
302 :
303 4014 : if (channel_args) {
304 5642 : for (i = 0; i < channel_args->num_args; i++) {
305 2458 : if (0 ==
306 2458 : strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
307 20 : if (is_client) {
308 0 : gpr_log(GPR_ERROR, "%s: is ignored on the client",
309 : GRPC_ARG_MAX_CONCURRENT_STREAMS);
310 20 : } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
311 0 : gpr_log(GPR_ERROR, "%s: must be an integer",
312 : GRPC_ARG_MAX_CONCURRENT_STREAMS);
313 : } else {
314 20 : push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
315 20 : (gpr_uint32)channel_args->args[i].value.integer);
316 : }
317 2438 : } else if (0 == strcmp(channel_args->args[i].key,
318 : GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
319 197 : if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
320 0 : gpr_log(GPR_ERROR, "%s: must be an integer",
321 : GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER);
322 394 : } else if ((t->global.next_stream_id & 1) !=
323 197 : (channel_args->args[i].value.integer & 1)) {
324 0 : gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
325 : GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
326 0 : t->global.next_stream_id & 1,
327 : is_client ? "client" : "server");
328 : } else {
329 197 : t->global.next_stream_id =
330 197 : (gpr_uint32)channel_args->args[i].value.integer;
331 : }
332 : }
333 : }
334 : }
335 4014 : }
336 :
337 4014 : static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
338 4014 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
339 :
340 4014 : lock(t);
341 4014 : t->destroying = 1;
342 4014 : drop_connection(exec_ctx, t);
343 4014 : unlock(exec_ctx, t);
344 :
345 4014 : UNREF_TRANSPORT(exec_ctx, t, "destroy");
346 4014 : }
347 :
348 : /** block grpc_endpoint_shutdown being called until a paired
349 : allow_endpoint_shutdown is made */
350 5909561 : static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
351 5909561 : GPR_ASSERT(t->ep);
352 5909561 : gpr_ref(&t->shutdown_ep_refs);
353 5910538 : }
354 :
355 3449387 : static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
356 : grpc_chttp2_transport *t) {
357 3449387 : if (gpr_unref(&t->shutdown_ep_refs)) {
358 4013 : if (t->ep) {
359 4013 : grpc_endpoint_shutdown(exec_ctx, t->ep);
360 : }
361 : }
362 3449642 : }
363 :
364 2465619 : static void allow_endpoint_shutdown_unlocked(grpc_exec_ctx *exec_ctx,
365 : grpc_chttp2_transport *t) {
366 2465619 : if (gpr_unref(&t->shutdown_ep_refs)) {
367 1 : gpr_mu_lock(&t->mu);
368 1 : if (t->ep) {
369 1 : grpc_endpoint_shutdown(exec_ctx, t->ep);
370 : }
371 1 : gpr_mu_unlock(&t->mu);
372 : }
373 2465605 : }
374 :
375 4014 : static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
376 : grpc_chttp2_transport *t) {
377 4014 : grpc_endpoint_destroy(exec_ctx, t->ep);
378 4014 : t->ep = NULL;
379 : /* safe because we'll still have the ref for write */
380 4014 : UNREF_TRANSPORT(exec_ctx, t, "disconnect");
381 4014 : }
382 :
383 11711 : static void close_transport_locked(grpc_exec_ctx *exec_ctx,
384 : grpc_chttp2_transport *t) {
385 11711 : if (!t->closed) {
386 4014 : t->closed = 1;
387 4014 : connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE,
388 : "close_transport");
389 4014 : if (t->ep) {
390 4014 : allow_endpoint_shutdown_locked(exec_ctx, t);
391 : }
392 : }
393 11711 : }
394 :
395 2700138 : static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
396 : grpc_stream *gs, const void *server_data,
397 : grpc_transport_stream_op *initial_op) {
398 2700138 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
399 2700138 : grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
400 :
401 2700138 : memset(s, 0, sizeof(*s));
402 :
403 2700138 : grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.incoming_metadata);
404 2704773 : grpc_chttp2_incoming_metadata_buffer_init(&s->global.incoming_metadata);
405 2704631 : grpc_sopb_init(&s->writing.sopb);
406 2704601 : grpc_sopb_init(&s->global.incoming_sopb);
407 2704454 : grpc_chttp2_data_parser_init(&s->parsing.data_parser);
408 :
409 2703832 : REF_TRANSPORT(t, "stream");
410 :
411 2704801 : lock(t);
412 2704807 : grpc_chttp2_register_stream(t, s);
413 2704702 : if (server_data) {
414 1301880 : GPR_ASSERT(t->parsing_active);
415 1301880 : s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
416 1301880 : s->global.outgoing_window =
417 : t->global.settings[GRPC_PEER_SETTINGS]
418 1301880 : [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
419 1301880 : s->global.max_recv_bytes = s->parsing.incoming_window =
420 1301880 : s->global.incoming_window =
421 : t->global.settings[GRPC_SENT_SETTINGS]
422 1301880 : [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
423 1301880 : *t->accepting_stream = s;
424 1301880 : grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
425 1301859 : s->global.in_stream_map = 1;
426 : }
427 :
428 2704681 : if (initial_op)
429 1301866 : perform_stream_op_locked(exec_ctx, &t->global, &s->global, initial_op);
430 2704630 : unlock(exec_ctx, t);
431 :
432 2704286 : return 0;
433 : }
434 :
435 2702166 : static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
436 : grpc_stream *gs) {
437 2702166 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
438 2702166 : grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
439 : int i;
440 :
441 2702166 : gpr_mu_lock(&t->mu);
442 :
443 2704584 : GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
444 : s->global.id == 0);
445 2704584 : GPR_ASSERT(!s->global.in_stream_map);
446 2704584 : if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
447 239 : close_transport_locked(exec_ctx, t);
448 : }
449 2704185 : if (!t->parsing_active && s->global.id) {
450 2656124 : GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
451 : s->global.id) == NULL);
452 : }
453 :
454 2704196 : grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global);
455 2703470 : grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
456 :
457 2703962 : gpr_mu_unlock(&t->mu);
458 :
459 29746774 : for (i = 0; i < STREAM_LIST_COUNT; i++) {
460 27043124 : if (s->included[i]) {
461 4684 : gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
462 2342 : t->global.is_client ? "client" : "server", s->global.id, i);
463 0 : abort();
464 : }
465 : }
466 :
467 2703650 : GPR_ASSERT(s->global.outgoing_sopb == NULL);
468 2703650 : GPR_ASSERT(s->global.publish_sopb == NULL);
469 2703650 : grpc_sopb_destroy(&s->writing.sopb);
470 2704395 : grpc_sopb_destroy(&s->global.incoming_sopb);
471 2703998 : grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
472 2703478 : grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.incoming_metadata);
473 2703066 : grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.incoming_metadata);
474 2702646 : grpc_chttp2_incoming_metadata_live_op_buffer_end(
475 : &s->global.outstanding_metadata);
476 :
477 2701433 : UNREF_TRANSPORT(exec_ctx, t, "stream");
478 2704760 : }
479 :
480 8989852 : grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
481 : grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
482 8989852 : grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
483 8989852 : grpc_chttp2_stream *s =
484 8989852 : grpc_chttp2_stream_map_find(&t->parsing_stream_map, id);
485 8998420 : return s ? &s->parsing : NULL;
486 : }
487 :
488 1301774 : grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
489 : grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
490 : grpc_chttp2_stream *accepting;
491 1301774 : grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
492 1301774 : GPR_ASSERT(t->accepting_stream == NULL);
493 1301774 : t->accepting_stream = &accepting;
494 2603548 : t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
495 1301774 : &t->base, (void *)(gpr_uintptr)id);
496 1301890 : t->accepting_stream = NULL;
497 1301890 : return &accepting->parsing;
498 : }
499 :
500 : /*
501 : * LOCK MANAGEMENT
502 : */
503 :
504 : /* We take a grpc_chttp2_transport-global lock in response to calls coming in
505 : from above,
506 : and in response to data being received from below. New data to be written
507 : is always queued, as are callbacks to process data. During unlock() we
508 : check our todo lists and initiate callbacks and flush writes. */
509 :
510 16803057 : static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
511 :
512 16811400 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
513 16811400 : unlock_check_read_write_state(exec_ctx, t);
514 29363533 : if (!t->writing_active && !t->closed &&
515 12555229 : grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
516 3444202 : t->writing_active = 1;
517 3444202 : REF_TRANSPORT(t, "writing");
518 3445372 : grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1);
519 3445324 : prevent_endpoint_shutdown(t);
520 : }
521 :
522 16809583 : gpr_mu_unlock(&t->mu);
523 16876602 : }
524 :
525 : /*
526 : * OUTPUT PROCESSING
527 : */
528 :
529 8016 : static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
530 : gpr_uint32 value) {
531 8016 : const grpc_chttp2_setting_parameters *sp =
532 : &grpc_chttp2_settings_parameters[id];
533 8016 : gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
534 8016 : if (use_value != value) {
535 0 : gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
536 : value, use_value);
537 : }
538 8018 : if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
539 4004 : t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
540 4004 : t->global.dirtied_local_settings = 1;
541 : }
542 8018 : }
543 :
544 3444143 : void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
545 : void *transport_writing_ptr, int success) {
546 3444143 : grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
547 3444143 : grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
548 :
549 3444143 : lock(t);
550 :
551 3445374 : allow_endpoint_shutdown_locked(exec_ctx, t);
552 :
553 3445635 : if (!success) {
554 25 : drop_connection(exec_ctx, t);
555 : }
556 :
557 : /* cleanup writing related jazz */
558 3445635 : grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
559 :
560 : /* leave the writing flag up on shutdown to prevent further writes in unlock()
561 : from starting */
562 3441330 : t->writing_active = 0;
563 3441330 : if (t->ep && !t->endpoint_reading) {
564 16 : destroy_endpoint(exec_ctx, t);
565 : }
566 :
567 3441330 : unlock(exec_ctx, t);
568 :
569 3444603 : UNREF_TRANSPORT(exec_ctx, t, "writing");
570 3445634 : }
571 :
572 3441817 : static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
573 : int iomgr_success_ignored) {
574 3441817 : grpc_chttp2_transport *t = gt;
575 3441817 : grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep);
576 3443410 : }
577 :
578 229 : void grpc_chttp2_add_incoming_goaway(
579 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
580 : gpr_uint32 goaway_error, gpr_slice goaway_text) {
581 229 : char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
582 229 : gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
583 229 : gpr_free(msg);
584 229 : gpr_slice_unref(goaway_text);
585 229 : transport_global->seen_goaway = 1;
586 229 : connectivity_state_set(exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE,
587 : "got_goaway");
588 229 : }
589 :
590 4103470 : static void maybe_start_some_streams(
591 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
592 : grpc_chttp2_stream_global *stream_global;
593 : /* start streams where we have free grpc_chttp2_stream ids and free
594 : * concurrency */
595 15113565 : while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
596 5503598 : transport_global->concurrent_stream_count <
597 : transport_global
598 : ->settings[GRPC_PEER_SETTINGS]
599 9709058 : [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
600 4205983 : grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
601 : &stream_global)) {
602 1402904 : GRPC_CHTTP2_IF_TRACING(gpr_log(
603 : GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
604 : transport_global->is_client ? "CLI" : "SVR", stream_global,
605 : transport_global->next_stream_id));
606 :
607 1402896 : GPR_ASSERT(stream_global->id == 0);
608 1402896 : stream_global->id = transport_global->next_stream_id;
609 1402896 : transport_global->next_stream_id += 2;
610 :
611 1402896 : if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
612 170 : connectivity_state_set(exec_ctx, transport_global,
613 : GRPC_CHANNEL_TRANSIENT_FAILURE,
614 : "no_more_stream_ids");
615 : }
616 :
617 2805792 : stream_global->outgoing_window =
618 : transport_global->settings[GRPC_PEER_SETTINGS]
619 1402896 : [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
620 2805792 : stream_global->incoming_window =
621 : transport_global->settings[GRPC_SENT_SETTINGS]
622 1402896 : [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
623 2805792 : stream_global->max_recv_bytes =
624 1402896 : GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes);
625 4208688 : grpc_chttp2_stream_map_add(
626 1402896 : &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
627 1402896 : stream_global->id, STREAM_FROM_GLOBAL(stream_global));
628 1402881 : stream_global->in_stream_map = 1;
629 1402881 : transport_global->concurrent_stream_count++;
630 1402881 : grpc_chttp2_list_add_incoming_window_updated(transport_global,
631 : stream_global);
632 1402846 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
633 : }
634 : /* cancel out streams that will never be started */
635 8206082 : while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
636 340 : grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
637 : &stream_global)) {
638 0 : cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
639 : }
640 4102672 : }
641 :
642 9554515 : static void perform_stream_op_locked(
643 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
644 : grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
645 9554515 : if (op->cancel_with_status != GRPC_STATUS_OK) {
646 278775 : cancel_from_api(transport_global, stream_global, op->cancel_with_status);
647 : }
648 :
649 9555650 : if (op->close_with_status != GRPC_STATUS_OK) {
650 7 : close_from_api(transport_global, stream_global, op->close_with_status,
651 : op->optional_close_message);
652 : }
653 :
654 9555650 : if (op->send_ops) {
655 3007349 : GPR_ASSERT(stream_global->outgoing_sopb == NULL);
656 3007349 : stream_global->send_done_closure = op->on_done_send;
657 3007349 : if (!stream_global->cancelled) {
658 3007257 : stream_global->written_anything = 1;
659 3007257 : stream_global->outgoing_sopb = op->send_ops;
660 5710707 : if (op->is_last_send &&
661 2703450 : stream_global->write_state == GRPC_WRITE_STATE_OPEN) {
662 2703625 : stream_global->write_state = GRPC_WRITE_STATE_QUEUED_CLOSE;
663 : }
664 3007257 : if (stream_global->id == 0) {
665 1402932 : GRPC_CHTTP2_IF_TRACING(gpr_log(
666 : GPR_DEBUG,
667 : "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
668 : transport_global->is_client ? "CLI" : "SVR", stream_global));
669 1402932 : grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
670 : stream_global);
671 1402891 : maybe_start_some_streams(exec_ctx, transport_global);
672 1604325 : } else if (stream_global->outgoing_window > 0) {
673 1604297 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
674 : }
675 : } else {
676 92 : grpc_sopb_reset(op->send_ops);
677 92 : grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 0);
678 : }
679 : }
680 :
681 9554160 : if (op->recv_ops) {
682 5344555 : GPR_ASSERT(stream_global->publish_sopb == NULL);
683 5344555 : GPR_ASSERT(stream_global->published_state != GRPC_STREAM_CLOSED);
684 5344555 : stream_global->recv_done_closure = op->on_done_recv;
685 5344555 : stream_global->publish_sopb = op->recv_ops;
686 5344555 : stream_global->publish_sopb->nops = 0;
687 5344555 : stream_global->publish_state = op->recv_state;
688 : /* clamp max recv bytes */
689 5344555 : op->max_recv_bytes = GPR_MIN(op->max_recv_bytes, GPR_UINT32_MAX);
690 5344555 : if (stream_global->max_recv_bytes < op->max_recv_bytes) {
691 3002211 : GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
692 : "op", transport_global, stream_global, max_recv_bytes,
693 : op->max_recv_bytes - stream_global->max_recv_bytes);
694 3001952 : GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
695 : "op", transport_global, stream_global, unannounced_incoming_window,
696 : op->max_recv_bytes - stream_global->max_recv_bytes);
697 6003904 : stream_global->unannounced_incoming_window +=
698 3001952 : (gpr_uint32)op->max_recv_bytes - stream_global->max_recv_bytes;
699 3001952 : stream_global->max_recv_bytes = (gpr_uint32)op->max_recv_bytes;
700 : }
701 5344296 : grpc_chttp2_incoming_metadata_live_op_buffer_end(
702 : &stream_global->outstanding_metadata);
703 5345145 : grpc_chttp2_list_add_read_write_state_changed(transport_global,
704 : stream_global);
705 5345201 : if (stream_global->id != 0) {
706 5345112 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
707 : }
708 : }
709 :
710 9554403 : if (op->bind_pollset) {
711 2703701 : add_to_pollset_locked(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
712 : op->bind_pollset);
713 : }
714 :
715 9553853 : grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
716 9546146 : }
717 :
718 8224266 : static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
719 : grpc_stream *gs, grpc_transport_stream_op *op) {
720 8224266 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
721 8224266 : grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
722 :
723 8224266 : lock(t);
724 8258219 : perform_stream_op_locked(exec_ctx, &t->global, &s->global, op);
725 8246653 : unlock(exec_ctx, t);
726 8256338 : }
727 :
728 0 : static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
729 0 : grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
730 0 : p->next = &t->global.pings;
731 0 : p->prev = p->next->prev;
732 0 : p->prev->next = p->next->prev = p;
733 0 : p->id[0] = (gpr_uint8)((t->global.ping_counter >> 56) & 0xff);
734 0 : p->id[1] = (gpr_uint8)((t->global.ping_counter >> 48) & 0xff);
735 0 : p->id[2] = (gpr_uint8)((t->global.ping_counter >> 40) & 0xff);
736 0 : p->id[3] = (gpr_uint8)((t->global.ping_counter >> 32) & 0xff);
737 0 : p->id[4] = (gpr_uint8)((t->global.ping_counter >> 24) & 0xff);
738 0 : p->id[5] = (gpr_uint8)((t->global.ping_counter >> 16) & 0xff);
739 0 : p->id[6] = (gpr_uint8)((t->global.ping_counter >> 8) & 0xff);
740 0 : p->id[7] = (gpr_uint8)(t->global.ping_counter & 0xff);
741 0 : p->on_recv = on_recv;
742 0 : gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
743 0 : }
744 :
745 11123 : static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
746 : grpc_transport_op *op) {
747 11123 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
748 11123 : int close_transport = 0;
749 :
750 11123 : lock(t);
751 :
752 11123 : grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
753 :
754 11124 : if (op->on_connectivity_state_change) {
755 5588 : grpc_connectivity_state_notify_on_state_change(
756 : exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
757 : op->on_connectivity_state_change);
758 : }
759 :
760 11126 : if (op->send_goaway) {
761 1772 : t->global.sent_goaway = 1;
762 5316 : grpc_chttp2_goaway_append(
763 : t->global.last_incoming_stream_id,
764 1772 : (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
765 1772 : gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
766 1772 : close_transport = !grpc_chttp2_has_streams(t);
767 : }
768 :
769 11126 : if (op->set_accept_stream != NULL) {
770 2022 : t->channel_callback.accept_stream = op->set_accept_stream;
771 2022 : t->channel_callback.accept_stream_user_data =
772 2022 : op->set_accept_stream_user_data;
773 : }
774 :
775 11126 : if (op->bind_pollset) {
776 2149 : add_to_pollset_locked(exec_ctx, t, op->bind_pollset);
777 : }
778 :
779 11126 : if (op->bind_pollset_set) {
780 1547 : add_to_pollset_set_locked(exec_ctx, t, op->bind_pollset_set);
781 : }
782 :
783 11127 : if (op->send_ping) {
784 0 : send_ping_locked(t, op->send_ping);
785 : }
786 :
787 11127 : if (op->disconnect) {
788 1616 : close_transport_locked(exec_ctx, t);
789 : }
790 :
791 11127 : unlock(exec_ctx, t);
792 :
793 11128 : if (close_transport) {
794 1648 : lock(t);
795 1648 : close_transport_locked(exec_ctx, t);
796 1648 : unlock(exec_ctx, t);
797 : }
798 11128 : }
799 :
800 : /*
801 : * INPUT PROCESSING
802 : */
803 :
804 12629478 : static grpc_stream_state compute_state(gpr_uint8 write_closed,
805 : gpr_uint8 read_closed) {
806 12629478 : if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
807 9926843 : if (write_closed) return GRPC_STREAM_SEND_CLOSED;
808 9926843 : if (read_closed) return GRPC_STREAM_RECV_CLOSED;
809 7086222 : return GRPC_STREAM_OPEN;
810 : }
811 :
812 2700826 : static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
813 : gpr_uint32 id) {
814 : size_t new_stream_count;
815 2700826 : grpc_chttp2_stream *s =
816 2700826 : grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
817 2704059 : if (!s) {
818 64 : s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
819 : }
820 2704059 : grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
821 2703331 : GPR_ASSERT(s);
822 2703331 : s->global.in_stream_map = 0;
823 2703331 : if (t->parsing.incoming_stream == &s->parsing) {
824 9 : t->parsing.incoming_stream = NULL;
825 9 : grpc_chttp2_parsing_become_skip_parser(&t->parsing);
826 : }
827 2703331 : if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
828 124 : close_transport_locked(exec_ctx, t);
829 : }
830 :
831 5406262 : new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
832 2703259 : grpc_chttp2_stream_map_size(&t->new_stream_map);
833 2702958 : GPR_ASSERT(new_stream_count <= GPR_UINT32_MAX);
834 2702958 : if (new_stream_count != t->global.concurrent_stream_count) {
835 2703076 : t->global.concurrent_stream_count = (gpr_uint32)new_stream_count;
836 2703076 : maybe_start_some_streams(exec_ctx, &t->global);
837 : }
838 2702297 : }
839 :
840 16813749 : static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx,
841 : grpc_chttp2_transport *t) {
842 16813749 : grpc_chttp2_transport_global *transport_global = &t->global;
843 : grpc_chttp2_stream_global *stream_global;
844 : grpc_stream_state state;
845 :
846 16813749 : if (!t->parsing_active) {
847 : /* if a stream is in the stream map, and gets cancelled, we need to ensure
848 : we are not parsing before continuing the cancellation to keep things in
849 : a sane state */
850 30803735 : while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
851 : &stream_global)) {
852 47035 : GPR_ASSERT(stream_global->in_stream_map);
853 47035 : GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN);
854 47035 : GPR_ASSERT(stream_global->read_closed);
855 47035 : remove_stream(exec_ctx, t, stream_global->id);
856 47035 : grpc_chttp2_list_add_read_write_state_changed(transport_global,
857 : stream_global);
858 : }
859 : }
860 :
861 16817248 : if (!t->writing_active) {
862 25160935 : while (grpc_chttp2_list_pop_cancelled_waiting_for_writing(transport_global,
863 : &stream_global)) {
864 81 : grpc_chttp2_list_add_read_write_state_changed(transport_global,
865 : stream_global);
866 : }
867 : }
868 :
869 47815921 : while (grpc_chttp2_list_pop_read_write_state_changed(transport_global,
870 : &stream_global)) {
871 14145070 : if (stream_global->cancelled) {
872 575912 : if (t->writing_active &&
873 273594 : stream_global->write_state != GRPC_WRITE_STATE_SENT_CLOSE) {
874 84 : grpc_chttp2_list_add_cancelled_waiting_for_writing(transport_global,
875 : stream_global);
876 : } else {
877 302234 : stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
878 302234 : if (stream_global->outgoing_sopb != NULL) {
879 1 : grpc_sopb_reset(stream_global->outgoing_sopb);
880 1 : stream_global->outgoing_sopb = NULL;
881 1 : grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 1);
882 : }
883 302238 : stream_global->read_closed = 1;
884 302238 : if (!stream_global->published_cancelled) {
885 : char buffer[GPR_LTOA_MIN_BUFSIZE];
886 284248 : gpr_ltoa(stream_global->cancelled_status, buffer);
887 568508 : grpc_chttp2_incoming_metadata_buffer_add(
888 284257 : &stream_global->incoming_metadata,
889 : grpc_mdelem_from_strings(t->metadata_context, "grpc-status",
890 : buffer));
891 568506 : grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
892 568506 : &stream_global->incoming_metadata, &stream_global->incoming_sopb);
893 284255 : stream_global->published_cancelled = 1;
894 : }
895 : }
896 : }
897 21812232 : if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
898 10667184 : stream_global->read_closed && stream_global->in_stream_map) {
899 2718468 : if (t->parsing_active) {
900 64631 : grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
901 : stream_global);
902 : } else {
903 2653837 : remove_stream(exec_ctx, t, stream_global->id);
904 : }
905 : }
906 14183824 : if (!stream_global->publish_sopb) {
907 1540855 : continue;
908 : }
909 12642969 : if (stream_global->writing_now != 0) {
910 25803 : continue;
911 : }
912 : /* FIXME(ctiller): we include in_stream_map in our computation of
913 : whether the stream is write-closed. This is completely bogus,
914 : but has the effect of delaying stream-closed until the stream
915 : is indeed evicted from the stream map, making it safe to delete.
916 : To fix this will require having an edge after stream-closed
917 : indicating that the stream is closed AND safe to delete. */
918 25234332 : state = compute_state(
919 18877984 : stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
920 6260818 : !stream_global->in_stream_map,
921 12617166 : stream_global->read_closed);
922 21182600 : if (stream_global->incoming_sopb.nops == 0 &&
923 8579979 : state == stream_global->published_state) {
924 7283477 : continue;
925 : }
926 15957432 : grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
927 10638288 : &stream_global->incoming_metadata, &stream_global->incoming_sopb,
928 5319144 : &stream_global->outstanding_metadata);
929 5343511 : grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
930 5343784 : stream_global->published_state = *stream_global->publish_state = state;
931 5343784 : grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_done_closure, 1);
932 5342312 : stream_global->recv_done_closure = NULL;
933 5342312 : stream_global->publish_sopb = NULL;
934 5342312 : stream_global->publish_state = NULL;
935 : }
936 16785611 : }
937 :
938 278835 : static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
939 : grpc_chttp2_stream_global *stream_global,
940 : grpc_status_code status) {
941 278835 : stream_global->cancelled = 1;
942 278835 : stream_global->cancelled_status = status;
943 278835 : if (stream_global->id != 0) {
944 278742 : gpr_slice_buffer_add(
945 : &transport_global->qbuf,
946 : grpc_chttp2_rst_stream_create(
947 : stream_global->id,
948 278742 : (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(status)));
949 : }
950 278836 : grpc_chttp2_list_add_read_write_state_changed(transport_global,
951 : stream_global);
952 278834 : }
953 :
954 7 : static void close_from_api(grpc_chttp2_transport_global *transport_global,
955 : grpc_chttp2_stream_global *stream_global,
956 : grpc_status_code status,
957 : gpr_slice *optional_message) {
958 : gpr_slice hdr;
959 : gpr_slice status_hdr;
960 : gpr_slice message_pfx;
961 : gpr_uint8 *p;
962 7 : gpr_uint32 len = 0;
963 :
964 7 : GPR_ASSERT(status >= 0 && (int)status < 100);
965 :
966 7 : stream_global->cancelled = 1;
967 7 : stream_global->cancelled_status = status;
968 7 : GPR_ASSERT(stream_global->id != 0);
969 7 : GPR_ASSERT(!stream_global->written_anything);
970 :
971 : /* Hand roll a header block.
972 : This is unnecessarily ugly - at some point we should find a more elegant
973 : solution.
974 : It's complicated by the fact that our send machinery would be dead by the
975 : time we got around to sending this, so instead we ignore HPACK compression
976 : and just write the uncompressed bytes onto the wire. */
977 7 : status_hdr = gpr_slice_malloc(15 + (status >= 10));
978 7 : p = GPR_SLICE_START_PTR(status_hdr);
979 7 : *p++ = 0x40; /* literal header */
980 7 : *p++ = 11; /* len(grpc-status) */
981 7 : *p++ = 'g';
982 7 : *p++ = 'r';
983 7 : *p++ = 'p';
984 7 : *p++ = 'c';
985 7 : *p++ = '-';
986 7 : *p++ = 's';
987 7 : *p++ = 't';
988 7 : *p++ = 'a';
989 7 : *p++ = 't';
990 7 : *p++ = 'u';
991 7 : *p++ = 's';
992 7 : if (status < 10) {
993 0 : *p++ = 1;
994 0 : *p++ = (gpr_uint8)('0' + status);
995 : } else {
996 7 : *p++ = 2;
997 7 : *p++ = (gpr_uint8)('0' + (status / 10));
998 7 : *p++ = (gpr_uint8)('0' + (status % 10));
999 : }
1000 7 : GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
1001 7 : len += (gpr_uint32)GPR_SLICE_LENGTH(status_hdr);
1002 :
1003 7 : if (optional_message) {
1004 7 : GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
1005 7 : message_pfx = gpr_slice_malloc(15);
1006 7 : p = GPR_SLICE_START_PTR(message_pfx);
1007 7 : *p++ = 0x40;
1008 7 : *p++ = 12; /* len(grpc-message) */
1009 7 : *p++ = 'g';
1010 7 : *p++ = 'r';
1011 7 : *p++ = 'p';
1012 7 : *p++ = 'c';
1013 7 : *p++ = '-';
1014 7 : *p++ = 'm';
1015 7 : *p++ = 'e';
1016 7 : *p++ = 's';
1017 7 : *p++ = 's';
1018 7 : *p++ = 'a';
1019 7 : *p++ = 'g';
1020 7 : *p++ = 'e';
1021 7 : *p++ = (gpr_uint8)GPR_SLICE_LENGTH(*optional_message);
1022 7 : GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
1023 7 : len += (gpr_uint32)GPR_SLICE_LENGTH(message_pfx);
1024 7 : len += (gpr_uint32)GPR_SLICE_LENGTH(*optional_message);
1025 : }
1026 :
1027 7 : hdr = gpr_slice_malloc(9);
1028 7 : p = GPR_SLICE_START_PTR(hdr);
1029 7 : *p++ = (gpr_uint8)(len >> 16);
1030 7 : *p++ = (gpr_uint8)(len >> 8);
1031 7 : *p++ = (gpr_uint8)(len);
1032 7 : *p++ = GRPC_CHTTP2_FRAME_HEADER;
1033 7 : *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
1034 7 : *p++ = (gpr_uint8)(stream_global->id >> 24);
1035 7 : *p++ = (gpr_uint8)(stream_global->id >> 16);
1036 7 : *p++ = (gpr_uint8)(stream_global->id >> 8);
1037 7 : *p++ = (gpr_uint8)(stream_global->id);
1038 7 : GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
1039 :
1040 7 : gpr_slice_buffer_add(&transport_global->qbuf, hdr);
1041 7 : gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
1042 7 : if (optional_message) {
1043 7 : gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
1044 7 : gpr_slice_buffer_add(&transport_global->qbuf,
1045 : gpr_slice_ref(*optional_message));
1046 : }
1047 :
1048 7 : gpr_slice_buffer_add(
1049 : &transport_global->qbuf,
1050 : grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR));
1051 :
1052 7 : grpc_chttp2_list_add_read_write_state_changed(transport_global,
1053 : stream_global);
1054 7 : }
1055 :
1056 60 : static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
1057 : void *user_data,
1058 : grpc_chttp2_stream_global *stream_global) {
1059 60 : cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
1060 60 : }
1061 :
1062 8084 : static void end_all_the_calls(grpc_chttp2_transport *t) {
1063 8084 : grpc_chttp2_for_all_streams(&t->global, NULL, cancel_stream_cb);
1064 8084 : }
1065 :
1066 8084 : static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
1067 8084 : close_transport_locked(exec_ctx, t);
1068 8084 : end_all_the_calls(t);
1069 8084 : }
1070 :
1071 : /** update window from a settings change */
1072 0 : static void update_global_window(void *args, gpr_uint32 id, void *stream) {
1073 0 : grpc_chttp2_transport *t = args;
1074 0 : grpc_chttp2_stream *s = stream;
1075 0 : grpc_chttp2_transport_global *transport_global = &t->global;
1076 0 : grpc_chttp2_stream_global *stream_global = &s->global;
1077 : int was_zero;
1078 : int is_zero;
1079 :
1080 0 : GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global,
1081 : outgoing_window,
1082 : t->parsing.initial_window_update);
1083 0 : was_zero = stream_global->outgoing_window <= 0;
1084 0 : stream_global->outgoing_window += t->parsing.initial_window_update;
1085 0 : is_zero = stream_global->outgoing_window <= 0;
1086 :
1087 0 : if (was_zero && !is_zero) {
1088 0 : grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
1089 : }
1090 0 : }
1091 :
1092 4014 : static void read_error_locked(grpc_exec_ctx *exec_ctx,
1093 : grpc_chttp2_transport *t) {
1094 4014 : t->endpoint_reading = 0;
1095 4014 : if (!t->writing_active && t->ep) {
1096 3998 : destroy_endpoint(exec_ctx, t);
1097 : }
1098 4014 : }
1099 :
1100 : /* tcp read callback */
1101 2469546 : static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
1102 : size_t i;
1103 2469546 : int keep_reading = 0;
1104 2469546 : grpc_chttp2_transport *t = tp;
1105 :
1106 2469546 : lock(t);
1107 2469634 : i = 0;
1108 2469634 : GPR_ASSERT(!t->parsing_active);
1109 2469634 : if (!t->closed) {
1110 2466034 : t->parsing_active = 1;
1111 : /* merge stream lists */
1112 2466034 : grpc_chttp2_stream_map_move_into(&t->new_stream_map,
1113 : &t->parsing_stream_map);
1114 2466012 : grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
1115 2465830 : gpr_mu_unlock(&t->mu);
1116 16177566 : for (; i < t->read_buffer.count &&
1117 5622810 : grpc_chttp2_perform_read(exec_ctx, &t->parsing,
1118 5622810 : t->read_buffer.slices[i]);
1119 5622752 : i++)
1120 : ;
1121 2465988 : gpr_mu_lock(&t->mu);
1122 2466032 : if (i != t->read_buffer.count) {
1123 31 : drop_connection(exec_ctx, t);
1124 : }
1125 : /* merge stream lists */
1126 2466032 : grpc_chttp2_stream_map_move_into(&t->new_stream_map,
1127 : &t->parsing_stream_map);
1128 2466031 : t->global.concurrent_stream_count =
1129 2466031 : (gpr_uint32)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
1130 2466031 : if (t->parsing.initial_window_update != 0) {
1131 0 : grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
1132 : update_global_window, t);
1133 0 : t->parsing.initial_window_update = 0;
1134 : }
1135 : /* handle higher level things */
1136 2466031 : grpc_chttp2_publish_reads(exec_ctx, &t->global, &t->parsing);
1137 2466005 : t->parsing_active = 0;
1138 : }
1139 2469605 : if (!success || i != t->read_buffer.count || t->closed) {
1140 4014 : drop_connection(exec_ctx, t);
1141 4014 : read_error_locked(exec_ctx, t);
1142 2465591 : } else if (!t->closed) {
1143 2465592 : keep_reading = 1;
1144 2465592 : REF_TRANSPORT(t, "keep_reading");
1145 2465626 : prevent_endpoint_shutdown(t);
1146 : }
1147 2469638 : gpr_slice_buffer_reset_and_unref(&t->read_buffer);
1148 2469632 : unlock(exec_ctx, t);
1149 :
1150 2469635 : if (keep_reading) {
1151 2465621 : grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->recv_data);
1152 2465619 : allow_endpoint_shutdown_unlocked(exec_ctx, t);
1153 2465627 : UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
1154 : } else {
1155 4014 : UNREF_TRANSPORT(exec_ctx, t, "recv_data");
1156 : }
1157 2469641 : }
1158 :
1159 : /*
1160 : * CALLBACK LOOP
1161 : */
1162 :
1163 4413 : static void connectivity_state_set(
1164 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
1165 : grpc_connectivity_state state, const char *reason) {
1166 4413 : GRPC_CHTTP2_IF_TRACING(
1167 : gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
1168 4413 : grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global)
1169 : ->channel_callback.state_tracker,
1170 : state, reason);
1171 4413 : }
1172 :
1173 : /*
1174 : * POLLSET STUFF
1175 : */
1176 :
1177 2705377 : static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
1178 : grpc_chttp2_transport *t,
1179 : grpc_pollset *pollset) {
1180 2705377 : if (t->ep) {
1181 2705943 : grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
1182 : }
1183 2705615 : }
1184 :
1185 1547 : static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
1186 : grpc_chttp2_transport *t,
1187 : grpc_pollset_set *pollset_set) {
1188 1547 : if (t->ep) {
1189 1547 : grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
1190 : }
1191 1548 : }
1192 :
1193 : /*
1194 : * TRACING
1195 : */
1196 :
1197 11018 : void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
1198 : const char *context, const char *var,
1199 : int is_client, gpr_uint32 stream_id,
1200 : gpr_int64 current_value, gpr_int64 delta) {
1201 : char *identifier;
1202 : char *context_scope;
1203 : char *context_thread;
1204 11018 : char *underscore_pos = strchr(context, '_');
1205 11018 : GPR_ASSERT(underscore_pos);
1206 11018 : context_thread = gpr_strdup(underscore_pos + 1);
1207 11018 : context_scope = gpr_strdup(context);
1208 11018 : context_scope[underscore_pos - context] = 0;
1209 11018 : if (stream_id) {
1210 6564 : gpr_asprintf(&identifier, "%s[%d]", context_scope, stream_id);
1211 : } else {
1212 4454 : identifier = gpr_strdup(context_scope);
1213 : }
1214 11018 : gpr_log(GPR_INFO,
1215 : "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]",
1216 : is_client ? "client" : "server", identifier, context_thread, var,
1217 : current_value, delta < 0 ? '-' : '+', delta < 0 ? -delta : delta,
1218 : current_value + delta, reason, file, line);
1219 11018 : gpr_free(identifier);
1220 11018 : gpr_free(context_thread);
1221 11018 : gpr_free(context_scope);
1222 11018 : }
1223 :
1224 : /*
1225 : * INTEGRATION GLUE
1226 : */
1227 :
1228 1094 : static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
1229 1094 : return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
1230 : }
1231 :
1232 : static const grpc_transport_vtable vtable = {
1233 : sizeof(grpc_chttp2_stream), init_stream, perform_stream_op,
1234 : perform_transport_op, destroy_stream, destroy_transport, chttp2_get_peer};
1235 :
1236 4014 : grpc_transport *grpc_create_chttp2_transport(
1237 : grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
1238 : grpc_endpoint *ep, grpc_mdctx *mdctx, int is_client) {
1239 4014 : grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
1240 4014 : init_transport(exec_ctx, t, channel_args, ep, mdctx, is_client != 0);
1241 4012 : return &t->base;
1242 : }
1243 :
1244 4014 : void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
1245 : grpc_transport *transport,
1246 : gpr_slice *slices, size_t nslices) {
1247 4014 : grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
1248 4014 : REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
1249 4014 : gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
1250 4014 : recv_data(exec_ctx, t, 1);
1251 4014 : }
|