LCOV - code coverage report
Current view: top level - core/transport - chttp2_transport.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 786 852 92.3 %
Date: 2015-12-10 22:15:08 Functions: 58 60 96.7 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/transport/chttp2_transport.h"
      35             : 
      36             : #include <math.h>
      37             : #include <stdio.h>
      38             : #include <string.h>
      39             : 
      40             : #include <grpc/support/alloc.h>
      41             : #include <grpc/support/log.h>
      42             : #include <grpc/support/slice_buffer.h>
      43             : #include <grpc/support/string_util.h>
      44             : #include <grpc/support/useful.h>
      45             : 
      46             : #include "src/core/profiling/timers.h"
      47             : #include "src/core/support/string.h"
      48             : #include "src/core/transport/chttp2/http2_errors.h"
      49             : #include "src/core/transport/chttp2/internal.h"
      50             : #include "src/core/transport/chttp2/status_conversion.h"
      51             : #include "src/core/transport/chttp2/timeout_encoding.h"
      52             : #include "src/core/transport/static_metadata.h"
      53             : #include "src/core/transport/transport_impl.h"
      54             : /* DEFAULT_WINDOW seeds the transport windows, stream_lookahead and the INITIAL_WINDOW_SIZE setting in init_transport() */
      55             : #define DEFAULT_WINDOW 65535
      56             : #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
      57             : #define MAX_WINDOW 0x7fffffffu
      58             : 
      59             : #define MAX_CLIENT_STREAM_ID 0x7fffffffu
      60             : /* trace flags, initialised to 0; presumably switched on elsewhere -- confirm */
      61             : int grpc_http_trace = 0;
      62             : int grpc_flowctl_trace = 0;
      63             : /* The *_FROM_* macros recover the enclosing struct from a pointer to one of its embedded members by subtracting offsetof() */
      64             : #define TRANSPORT_FROM_WRITING(tw)                                        \
      65             :   ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
      66             :                                                    writing)))
      67             : 
      68             : #define TRANSPORT_FROM_PARSING(tw)                                        \
      69             :   ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
      70             :                                                    parsing)))
      71             : 
      72             : #define TRANSPORT_FROM_GLOBAL(tg)                                         \
      73             :   ((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
      74             :                                                    global)))
      75             : 
      76             : #define STREAM_FROM_GLOBAL(sg) \
      77             :   ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global)))
      78             : 
      79             : #define STREAM_FROM_PARSING(sg) \
      80             :   ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, parsing)))
      81             : /* tentative definition of the transport vtable; init_transport() points t->base.vtable at it */
      82             : static const grpc_transport_vtable vtable;
      83             : 
      84             : static void lock(grpc_chttp2_transport *t);
      85             : static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
      86             : 
      87             : /* forward declarations of various callbacks that we'll build closures around */
      88             : static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
      89             :                            int iomgr_success_ignored);
      90             : 
      91             : /** Set a transport level setting, and push it to our peer */
      92             : static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
      93             :                          gpr_uint32 value);
      94             : 
      95             : /** Endpoint callback to process incoming data */
      96             : static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success);
      97             : 
      98             : /** Start disconnection chain */
      99             : static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
     100             : 
     101             : /** Perform a grpc_transport_stream_op on a single stream */
     102             : static void perform_stream_op_locked(
     103             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     104             :     grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
     105             : 
     106             : /** Cancel a stream: coming from the transport API */
     107             : static void cancel_from_api(grpc_exec_ctx *exec_ctx,
     108             :                             grpc_chttp2_transport_global *transport_global,
     109             :                             grpc_chttp2_stream_global *stream_global,
     110             :                             grpc_status_code status);
     111             : /* NOTE(review): undocumented; by its signature it closes a stream with a status and an optional message -- confirm */
     112             : static void close_from_api(grpc_exec_ctx *exec_ctx,
     113             :                            grpc_chttp2_transport_global *transport_global,
     114             :                            grpc_chttp2_stream_global *stream_global,
     115             :                            grpc_status_code status,
     116             :                            gpr_slice *optional_message);
     117             : 
     118             : /** Add endpoint from this transport to pollset */
     119             : static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
     120             :                                   grpc_chttp2_transport *t,
     121             :                                   grpc_pollset *pollset);
     122             : static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
     123             :                                       grpc_chttp2_transport *t,
     124             :                                       grpc_pollset_set *pollset_set);
     125             : 
     126             : /** Start new streams that have been created if we can */
     127             : static void maybe_start_some_streams(
     128             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
     129             : 
     130             : static void connectivity_state_set(
     131             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     132             :     grpc_connectivity_state state, const char *reason);
     133             : 
     134             : static void check_read_ops(grpc_exec_ctx *exec_ctx,
     135             :                            grpc_chttp2_transport_global *transport_global);
     136             : 
     137             : /*
     138             :  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
     139             :  */
     140             : /* Final teardown, reached from unref_transport() once gpr_unref() reports the last ref is gone: destroys owned state, then frees t. */
     141        5964 : static void destruct_transport(grpc_exec_ctx *exec_ctx,
     142             :                                grpc_chttp2_transport *t) {
     143             :   size_t i;
     144             : 
     145        5964 :   gpr_mu_lock(&t->mu);
     146             :   /* the endpoint must already be gone (destroy_endpoint() clears t->ep) */
     147        5964 :   GPR_ASSERT(t->ep == NULL);
     148             : 
     149        5964 :   gpr_slice_buffer_destroy(&t->global.qbuf);
     150             : 
     151        5964 :   gpr_slice_buffer_destroy(&t->writing.outbuf);
     152        5964 :   grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
     153             : 
     154        5964 :   gpr_slice_buffer_destroy(&t->parsing.qbuf);
     155        5964 :   gpr_slice_buffer_destroy(&t->read_buffer);
     156        5964 :   grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
     157        5964 :   grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
     158             :   /* every stream list is expected to be empty at this point */
     159       65604 :   for (i = 0; i < STREAM_LIST_COUNT; i++) {
     160       59640 :     GPR_ASSERT(t->lists[i].head == NULL);
     161       59640 :     GPR_ASSERT(t->lists[i].tail == NULL);
     162             :   }
     163             :   /* ...and so are both stream maps, which are then destroyed */
     164        5964 :   GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0);
     165        5964 :   GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0);
     166             : 
     167        5964 :   grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
     168        5964 :   grpc_chttp2_stream_map_destroy(&t->new_stream_map);
     169        5964 :   grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
     170             : 
     171        5964 :   gpr_mu_unlock(&t->mu);
     172        5964 :   gpr_mu_destroy(&t->mu);
     173             : 
     174             :   /* call back remaining pings (with success == 0): they're not allowed to call into the transport,
     175             :      and maybe they hold resources that need to be freed */
     176       11928 :   while (t->global.pings.next != &t->global.pings) {
     177           0 :     grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
     178           0 :     grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 0);
     179           0 :     ping->next->prev = ping->prev;
     180           0 :     ping->prev->next = ping->next;
     181           0 :     gpr_free(ping);
     182             :   }
     183             : 
     184        5964 :   gpr_free(t->peer_string);
     185        5964 :   gpr_free(t);
     186        5964 : }
     187             : /* Transport refcounting. With REFCOUNTING_DEBUG each ref/unref is logged with its reason and call site; otherwise the reason argument is dropped by the macros. */
     188             : #ifdef REFCOUNTING_DEBUG
     189             : #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
     190             : #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__)
     191             : static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
     192             :                             const char *reason, const char *file, int line) {
     193             :   gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count,
     194             :           t->refs.count - 1, reason, file, line);
     195             :   if (!gpr_unref(&t->refs)) return;
     196             :   destruct_transport(exec_ctx, t);
     197             : }
     198             : /* take one ref on the transport, logging the transition first */
     199             : static void ref_transport(grpc_chttp2_transport *t, const char *reason,
     200             :                           const char *file, int line) {
     201             :   gpr_log(GPR_DEBUG, "chttp2:  ref:%p %d->%d %s [%s:%d]", t, t->refs.count,
     202             :           t->refs.count + 1, reason, file, line);
     203             :   gpr_ref(&t->refs);
     204             : }
     205             : #else /* !REFCOUNTING_DEBUG */
     206             : #define REF_TRANSPORT(t, r) ref_transport(t)
     207             : #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t)
     208    15591432 : static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
     209    31192473 :   if (!gpr_unref(&t->refs)) return; /* not the last ref: nothing more to do */
     210        5964 :   destruct_transport(exec_ctx, t);
     211             : }
     212             : /* take one ref on the transport */
     213    15578445 : static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
     214             : #endif /* REFCOUNTING_DEBUG */
     215             : /* Initialise *t for endpoint ep: zero it, set defaults, queue the connect string (clients only), push initial settings, then apply channel_args overrides. */
     216        6038 : static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
     217             :                            const grpc_channel_args *channel_args,
     218             :                            grpc_endpoint *ep, gpr_uint8 is_client) {
     219             :   size_t i;
     220             :   int j;
     221             :   /* the length constant must agree with the connect-string literal */
     222             :   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
     223             :              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
     224             : 
     225        6038 :   memset(t, 0, sizeof(*t));
     226             : 
     227        6038 :   t->base.vtable = &vtable;
     228        6038 :   t->ep = ep;
     229             :   /* one ref is for destroy, the other for when ep becomes NULL */
     230        6038 :   gpr_ref_init(&t->refs, 2);
     231             :   /* ref is dropped at transport close() */
     232        6039 :   gpr_ref_init(&t->shutdown_ep_refs, 1);
     233        6039 :   gpr_mu_init(&t->mu);
     234        6038 :   t->peer_string = grpc_endpoint_get_peer(ep);
     235        6039 :   t->endpoint_reading = 1;
     236        6039 :   t->global.next_stream_id = is_client ? 1 : 2; /* clients start at an odd id, servers at an even one */
     237        6039 :   t->global.is_client = is_client;
     238        6039 :   t->writing.outgoing_window = DEFAULT_WINDOW;
     239        6039 :   t->parsing.incoming_window = DEFAULT_WINDOW;
     240        6039 :   t->global.stream_lookahead = DEFAULT_WINDOW;
     241        6039 :   t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
     242        6039 :   t->global.ping_counter = 1;
     243        6039 :   t->global.pings.next = t->global.pings.prev = &t->global.pings; /* empty circular list: sentinel points at itself */
     244        6039 :   t->parsing.is_client = is_client;
     245        6039 :   t->parsing.deframe_state =
     246             :       is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
     247        6039 :   t->writing.is_client = is_client;
     248        6039 :   grpc_connectivity_state_init(
     249             :       &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
     250             :       is_client ? "client_transport" : "server_transport");
     251             : 
     252        6037 :   gpr_slice_buffer_init(&t->global.qbuf);
     253             : 
     254        6037 :   gpr_slice_buffer_init(&t->writing.outbuf);
     255        6037 :   grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor);
     256        6037 :   grpc_closure_init(&t->writing_action, writing_action, t);
     257             : 
     258        6037 :   gpr_slice_buffer_init(&t->parsing.qbuf);
     259        6037 :   grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
     260        6035 :   grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser);
     261             : 
     262        6039 :   grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
     263        6039 :                     &t->writing);
     264        6039 :   grpc_closure_init(&t->recv_data, recv_data, t);
     265        6039 :   gpr_slice_buffer_init(&t->read_buffer);
     266             :   /* clients queue the connect string into the just-initialised global.qbuf */
     267        6039 :   if (is_client) {
     268        2983 :     gpr_slice_buffer_add(
     269             :         &t->global.qbuf,
     270             :         gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
     271             :   }
     272             :   /* 8 is a random stab in the dark as to a good initial size: it's small enough
     273             :      that it shouldn't waste memory for infrequently used connections, yet
     274             :      large enough that the exponential growth should happen nicely when it's
     275             :      needed.
     276             :      TODO(ctiller): tune this */
     277        6039 :   grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8);
     278        6037 :   grpc_chttp2_stream_map_init(&t->new_stream_map, 8);
     279             : 
     280             :   /* copy in initial settings to all setting sets */
     281       48281 :   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
     282       42244 :     t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value;
     283      211216 :     for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
     284      168972 :       t->global.settings[j][i] =
     285      166676 :           grpc_chttp2_settings_parameters[i].default_value;
     286             :     }
     287             :   }
     288        6037 :   t->global.dirtied_local_settings = 1;
     289             :   /* Hack: it's common for implementations to assume 65536 bytes initial send
     290             :      window -- this should by rights be 0 */
     291        6037 :   t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
     292        6037 :   t->global.sent_local_settings = 0;
     293             : 
     294             :   /* configure http2 the way we like it */
     295        6037 :   if (is_client) {
     296        2983 :     push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
     297        2983 :     push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
     298             :   }
     299        6037 :   push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
     300             :   /* per-channel overrides: unrecognised keys are ignored; invalid values are logged and skipped */
     301        6039 :   if (channel_args) {
     302        9236 :     for (i = 0; i < channel_args->num_args; i++) {
     303        4324 :       if (0 ==
     304        4324 :           strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
     305          22 :         if (is_client) {
     306           0 :           gpr_log(GPR_ERROR, "%s: is ignored on the client",
     307             :                   GRPC_ARG_MAX_CONCURRENT_STREAMS);
     308          22 :         } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
     309           0 :           gpr_log(GPR_ERROR, "%s: must be an integer",
     310             :                   GRPC_ARG_MAX_CONCURRENT_STREAMS);
     311             :         } else {
     312          22 :           push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
     313          22 :                        (gpr_uint32)channel_args->args[i].value.integer); /* NOTE(review): negative values are not rejected before this cast */
     314             :         }
     315        4302 :       } else if (0 == strcmp(channel_args->args[i].key,
     316             :                              GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
     317         208 :         if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
     318           0 :           gpr_log(GPR_ERROR, "%s: must be an integer",
     319             :                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER);
     320         416 :         } else if ((t->global.next_stream_id & 1) !=
     321         208 :                    (channel_args->args[i].value.integer & 1)) { /* parity must match the client/server default set above */
     322           0 :           gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
     323             :                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
     324           0 :                   t->global.next_stream_id & 1,
     325             :                   is_client ? "client" : "server");
     326             :         } else {
     327         208 :           t->global.next_stream_id =
     328         208 :               (gpr_uint32)channel_args->args[i].value.integer;
     329             :         }
     330        4094 :       } else if (0 == strcmp(channel_args->args[i].key,
     331             :                              GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES)) {
     332           0 :         if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
     333           0 :           gpr_log(GPR_ERROR, "%s: must be an integer",
     334             :                   GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES);
     335           0 :         } else if (channel_args->args[i].value.integer <= 5) { /* NOTE(review): rejects 5 although the message below says "at least 5" */
     336           0 :           gpr_log(GPR_ERROR, "%s: must be at least 5",
     337             :                   GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES);
     338             :         } else {
     339           0 :           t->global.stream_lookahead =
     340           0 :               (gpr_uint32)channel_args->args[i].value.integer;
     341             :         }
     342        4094 :       } else if (0 == strcmp(channel_args->args[i].key,
     343             :                              GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER)) {
     344         600 :         if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
     345           0 :           gpr_log(GPR_ERROR, "%s: must be an integer",
     346             :                   GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
     347         600 :         } else if (channel_args->args[i].value.integer < 0) {
     348           0 :           gpr_log(GPR_DEBUG, "%s: must be non-negative", /* NOTE(review): GPR_DEBUG here, sibling validation errors use GPR_ERROR */
     349             :                   GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_DECODER);
     350             :         } else {
     351         600 :           push_setting(t, GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE,
     352         600 :                        (gpr_uint32)channel_args->args[i].value.integer);
     353             :         }
     354        3494 :       } else if (0 == strcmp(channel_args->args[i].key,
     355             :                              GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER)) {
     356         600 :         if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
     357           0 :           gpr_log(GPR_ERROR, "%s: must be an integer",
     358             :                   GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER);
     359         600 :         } else if (channel_args->args[i].value.integer < 0) {
     360           0 :           gpr_log(GPR_DEBUG, "%s: must be non-negative", /* NOTE(review): GPR_DEBUG here, sibling validation errors use GPR_ERROR */
     361             :                   GRPC_ARG_HTTP2_HPACK_TABLE_SIZE_ENCODER);
     362             :         } else {
     363         600 :           grpc_chttp2_hpack_compressor_set_max_usable_size(
     364             :               &t->writing.hpack_compressor,
     365         600 :               (gpr_uint32)channel_args->args[i].value.integer);
     366             :         }
     367             :       }
     368             :     }
     369             :   }
     370        6038 : }
     371             : /* Mark the transport as destroying and drop the connection under the lock, then release the ref that init_transport() reserved for destroy. */
     372        5963 : static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
     373        5940 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     374             : 
     375        5940 :   lock(t);
     376        5964 :   t->destroying = 1;
     377        5964 :   drop_connection(exec_ctx, t);
     378        5964 :   unlock(exec_ctx, t);
     379             :   /* may run destruct_transport() if this was the last ref */
     380        5964 :   UNREF_TRANSPORT(exec_ctx, t, "destroy");
     381        5964 : }
     382             : 
     383             : /** block grpc_endpoint_shutdown being called until a paired
     384             :     allow_endpoint_shutdown_locked/_unlocked call is made; takes one ref on shutdown_ep_refs and asserts that t->ep is still set */
     385    11140667 : static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
     386    11140667 :   GPR_ASSERT(t->ep);
     387    11140667 :   gpr_ref(&t->shutdown_ep_refs);
     388    11141090 : }
     389             : /* Drop one shutdown_ep_refs ref; on the last one, shut the endpoint down if it still exists. Presumably the caller holds t->mu (this variant does not take it) -- confirm. */
     390     4631617 : static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
     391             :                                            grpc_chttp2_transport *t) {
     392     4631617 :   if (gpr_unref(&t->shutdown_ep_refs)) {
     393        6038 :     if (t->ep) {
     394        6038 :       grpc_endpoint_shutdown(exec_ctx, t->ep);
     395             :     }
     396             :   }
     397     4631875 : }
     398             : /* Drop one shutdown_ep_refs ref; on the last one, take t->mu and shut the endpoint down if it still exists. */
     399     6515525 : static void allow_endpoint_shutdown_unlocked(grpc_exec_ctx *exec_ctx,
     400             :                                              grpc_chttp2_transport *t) {
     401     6515525 :   if (gpr_unref(&t->shutdown_ep_refs)) {
     402           0 :     gpr_mu_lock(&t->mu); /* t->ep is checked and used only while holding the mutex */
     403           0 :     if (t->ep) {
     404           0 :       grpc_endpoint_shutdown(exec_ctx, t->ep);
     405             :     }
     406           0 :     gpr_mu_unlock(&t->mu);
     407             :   }
     408     6515528 : }
     409             : /* Destroy the endpoint, clear t->ep, and release the transport ref that init_transport() reserved for "when ep becomes NULL". */
     410        6038 : static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
     411             :                              grpc_chttp2_transport *t) {
     412        6038 :   grpc_endpoint_destroy(exec_ctx, t->ep);
     413        6038 :   t->ep = NULL;
     414             :   /* safe because we'll still have the ref for write */
     415        6038 :   UNREF_TRANSPORT(exec_ctx, t, "disconnect");
     416        6038 : }
     417             : /* First call marks t closed, reports GRPC_CHANNEL_FATAL_FAILURE and, if the endpoint still exists, drops one shutdown_ep_refs ref; later calls do nothing. */
     418       17716 : static void close_transport_locked(grpc_exec_ctx *exec_ctx,
     419             :                                    grpc_chttp2_transport *t) {
     420       17716 :   if (!t->closed) {
     421        6037 :     t->closed = 1;
     422        6037 :     connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE,
     423             :                            "close_transport");
     424        6038 :     if (t->ep) {
     425        6038 :       allow_endpoint_shutdown_locked(exec_ctx, t);
     426             :     }
     427             :   }
     428       17717 : }
     429             : /* Stream refcount wrappers: map stream_global back to its grpc_chttp2_stream and forward to grpc_stream_ref/unref on its refcount (with a reason string in debug builds). */
     430             : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
     431             : void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global,
     432             :                             const char *reason) {
     433             :   grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount, reason);
     434             : }
     435             : void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
     436             :                               grpc_chttp2_stream_global *stream_global,
     437             :                               const char *reason) {
     438             :   grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount,
     439             :                     reason);
     440             : }
     441             : #else /* !GRPC_STREAM_REFCOUNT_DEBUG */
     442     9463357 : void grpc_chttp2_stream_ref(grpc_chttp2_stream_global *stream_global) {
     443     9463357 :   grpc_stream_ref(STREAM_FROM_GLOBAL(stream_global)->refcount);
     444     9479824 : }
     445     9469302 : void grpc_chttp2_stream_unref(grpc_exec_ctx *exec_ctx,
     446             :                               grpc_chttp2_stream_global *stream_global) {
     447     9469302 :   grpc_stream_unref(exec_ctx, STREAM_FROM_GLOBAL(stream_global)->refcount);
     448     9484070 : }
     449             : #endif /* GRPC_STREAM_REFCOUNT_DEBUG */
     450             : /* Initialise stream gs on transport gt: zero it, take the "chttp2" stream ref and a transport ref, and register it. A non-NULL server_data is decoded as the stream id. Always returns 0. */
     451     4440526 : static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     452             :                        grpc_stream *gs, grpc_stream_refcount *refcount,
     453             :                        const void *server_data) {
     454     4440347 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     455     4440347 :   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
     456             : 
     457     4440526 :   memset(s, 0, sizeof(*s));
     458             : 
     459     4440526 :   s->refcount = refcount;
     460     4440526 :   GRPC_CHTTP2_STREAM_REF(&s->global, "chttp2");
     461             : 
     462     4442814 :   grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[0]);
     463     4442997 :   grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.metadata_buffer[1]);
     464     4442976 :   grpc_chttp2_incoming_metadata_buffer_init(
     465             :       &s->global.received_initial_metadata);
     466     4442585 :   grpc_chttp2_incoming_metadata_buffer_init(
     467             :       &s->global.received_trailing_metadata);
     468     4441665 :   grpc_chttp2_data_parser_init(&s->parsing.data_parser);
     469     4442295 :   gpr_slice_buffer_init(&s->writing.flow_controlled_buffer);
     470             :   /* each stream holds one ref on its transport */
     471     4442110 :   REF_TRANSPORT(t, "stream");
     472             :   /* registration and the server_data path run between lock() and unlock() */
     473     4442731 :   lock(t);
     474     4443280 :   grpc_chttp2_register_stream(t, s);
     475     4443235 :   if (server_data) { /* the stream id is carried in the pointer value itself */
     476     2171139 :     GPR_ASSERT(t->parsing_active);
     477     2171139 :     s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
     478     2171139 :     s->parsing.id = s->global.id;
     479     2171139 :     s->global.outgoing_window =
     480             :         t->global.settings[GRPC_PEER_SETTINGS]
     481     2171139 :                           [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     482     2171139 :     s->parsing.incoming_window = s->global.max_recv_bytes =
     483             :         t->global.settings[GRPC_SENT_SETTINGS]
     484     2171139 :                           [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     485     2171139 :     *t->accepting_stream = s; /* hand the new stream back through the transport's accepting_stream slot */
     486     2171139 :     grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
     487     2171116 :     s->global.in_stream_map = 1;
     488             :   }
     489     4443212 :   unlock(exec_ctx, t);
     490             : 
     491     4443345 :   return 0;
     492             : }
     493             : 
     494     4438369 : static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     495             :                            grpc_stream *gs) {
     496     4438319 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     497     4438319 :   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
     498             :   int i;
     499             :   grpc_byte_stream *bs;
     500             : 
     501             :   GPR_TIMER_BEGIN("destroy_stream", 0);
     502             : 
     503     4438369 :   gpr_mu_lock(&t->mu);
     504             : 
     505     4442828 :   GPR_ASSERT((s->global.write_closed && s->global.read_closed) ||
     506             :              s->global.id == 0);
     507     4442828 :   GPR_ASSERT(!s->global.in_stream_map);
     508     4442828 :   if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
     509         315 :     close_transport_locked(exec_ctx, t);
     510             :   }
     511     4442708 :   if (!t->parsing_active && s->global.id) {
     512     4352237 :     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
     513             :                                            s->global.id) == NULL);
     514             :   }
     515             : 
     516     4442539 :   grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
     517     4442119 :   grpc_chttp2_list_remove_unannounced_incoming_window_available(&t->global,
     518             :                                                                 &s->global);
     519             : 
     520     4441717 :   gpr_mu_unlock(&t->mu);
     521             : 
     522    48868255 :   for (i = 0; i < STREAM_LIST_COUNT; i++) {
     523    44425431 :     if (s->included[i]) {
     524           0 :       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
     525           0 :               t->global.is_client ? "client" : "server", s->global.id, i);
     526           0 :       abort();
     527             :     }
     528             :   }
     529             : 
     530     8885717 :   while (
     531     4442943 :       (bs = grpc_chttp2_incoming_frame_queue_pop(&s->global.incoming_frames))) {
     532         119 :     grpc_byte_stream_destroy(bs);
     533             :   }
     534             : 
     535     4441288 :   GPR_ASSERT(s->global.send_initial_metadata_finished == NULL);
     536     4441288 :   GPR_ASSERT(s->global.send_message_finished == NULL);
     537     4441288 :   GPR_ASSERT(s->global.send_trailing_metadata_finished == NULL);
     538     4441288 :   GPR_ASSERT(s->global.recv_initial_metadata_finished == NULL);
     539     4441288 :   GPR_ASSERT(s->global.recv_message_ready == NULL);
     540     4441288 :   GPR_ASSERT(s->global.recv_trailing_metadata_finished == NULL);
     541     4441288 :   grpc_chttp2_data_parser_destroy(exec_ctx, &s->parsing.data_parser);
     542     4441275 :   grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[0]);
     543     4440448 :   grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.metadata_buffer[1]);
     544     4439152 :   grpc_chttp2_incoming_metadata_buffer_destroy(
     545             :       &s->global.received_initial_metadata);
     546     4441574 :   grpc_chttp2_incoming_metadata_buffer_destroy(
     547             :       &s->global.received_trailing_metadata);
     548     4441663 :   gpr_slice_buffer_destroy(&s->writing.flow_controlled_buffer);
     549             : 
     550     4441393 :   UNREF_TRANSPORT(exec_ctx, t, "stream");
     551             : 
     552             :   GPR_TIMER_END("destroy_stream", 0);
     553     4443185 : }
     554             : 
     555    11217555 : grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
     556             :     grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
     557    11216955 :   grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
     558    11217555 :   grpc_chttp2_stream *s =
     559    11217555 :       grpc_chttp2_stream_map_find(&t->parsing_stream_map, id);
     560    11232326 :   return s ? &s->parsing : NULL;
     561             : }
     562             : 
     563     2170874 : grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
     564             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
     565             :     gpr_uint32 id) {
     566             :   grpc_chttp2_stream *accepting;
     567     2170785 :   grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
     568     2170874 :   GPR_ASSERT(t->accepting_stream == NULL);
     569     2170874 :   t->accepting_stream = &accepting;
     570     4341748 :   t->channel_callback.accept_stream(exec_ctx,
     571             :                                     t->channel_callback.accept_stream_user_data,
     572     2170874 :                                     &t->base, (void *)(gpr_uintptr)id);
     573     2171119 :   t->accepting_stream = NULL;
     574     2171119 :   return &accepting->parsing;
     575             : }
     576             : 
     577             : /*
     578             :  * LOCK MANAGEMENT
     579             :  */
     580             : 
     581             : /* We take a grpc_chttp2_transport-global lock in response to calls coming in
     582             :    from above,
     583             :    and in response to data being received from below. New data to be written
     584             :    is always queued, as are callbacks to process data. During unlock() we
     585             :    check our todo lists and initiate callbacks and flush writes. */
     586             : 
     587    38116484 : static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
     588             : 
     589    38131591 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
     590             :   GPR_TIMER_BEGIN("unlock", 0);
     591    73062162 :   if (!t->writing_active && !t->closed &&
     592    34923355 :       grpc_chttp2_unlocking_check_writes(&t->global, &t->writing,
     593    34923355 :                                          t->parsing_active)) {
     594     4624202 :     t->writing_active = 1;
     595     4623596 :     REF_TRANSPORT(t, "writing");
     596     4625778 :     grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1);
     597     4625677 :     prevent_endpoint_shutdown(t);
     598             :   }
     599    38140412 :   check_read_ops(exec_ctx, &t->global);
     600             : 
     601    38154166 :   gpr_mu_unlock(&t->mu);
     602             :   GPR_TIMER_END("unlock", 0);
     603    38332652 : }
     604             : 
     605             : /*
     606             :  * OUTPUT PROCESSING
     607             :  */
     608             : 
     609       12625 : static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
     610             :                          gpr_uint32 value) {
     611       12461 :   const grpc_chttp2_setting_parameters *sp =
     612             :       &grpc_chttp2_settings_parameters[id];
     613       12625 :   gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
     614       12625 :   if (use_value != value) {
     615           0 :     gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
     616             :             value, use_value);
     617             :   }
     618       12626 :   if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
     619        6588 :     t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
     620        6588 :     t->global.dirtied_local_settings = 1;
     621             :   }
     622       12626 : }
     623             : 
     624     4623736 : void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
     625             :                                    void *transport_writing_ptr, int success) {
     626     4623130 :   grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
     627     4623736 :   grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
     628             : 
     629             :   GPR_TIMER_BEGIN("grpc_chttp2_terminate_writing", 0);
     630             : 
     631     4623130 :   lock(t);
     632             : 
     633     4625567 :   allow_endpoint_shutdown_locked(exec_ctx, t);
     634             : 
     635     4625836 :   if (!success) {
     636          81 :     drop_connection(exec_ctx, t);
     637             :   }
     638             : 
     639     4625836 :   grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
     640             : 
     641             :   /* leave the writing flag up on shutdown to prevent further writes in unlock()
     642             :      from starting */
     643     4622380 :   t->writing_active = 0;
     644     4622380 :   if (t->ep && !t->endpoint_reading) {
     645         109 :     destroy_endpoint(exec_ctx, t);
     646             :   }
     647             : 
     648     4622380 :   unlock(exec_ctx, t);
     649             : 
     650     4625443 :   UNREF_TRANSPORT(exec_ctx, t, "writing");
     651             : 
     652             :   GPR_TIMER_END("grpc_chttp2_terminate_writing", 0);
     653     4625849 : }
     654             : 
     655     4621553 : static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
     656             :                            int iomgr_success_ignored) {
     657     4620947 :   grpc_chttp2_transport *t = gt;
     658             :   GPR_TIMER_BEGIN("writing_action", 0);
     659     4621553 :   grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep);
     660             :   GPR_TIMER_END("writing_action", 0);
     661     4623809 : }
     662             : 
     663         284 : void grpc_chttp2_add_incoming_goaway(
     664             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     665             :     gpr_uint32 goaway_error, gpr_slice goaway_text) {
     666         284 :   char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
     667         284 :   gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
     668         284 :   gpr_free(msg);
     669         284 :   gpr_slice_unref(goaway_text);
     670         284 :   transport_global->seen_goaway = 1;
     671         284 :   connectivity_state_set(exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE,
     672             :                          "got_goaway");
     673         284 : }
     674             : 
     675     6706037 : static void maybe_start_some_streams(
     676             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
     677             :   grpc_chttp2_stream_global *stream_global;
     678             :   gpr_uint32 stream_incoming_window;
     679             :   /* start streams where we have free grpc_chttp2_stream ids and free
     680             :    * concurrency */
     681    24657227 :   while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
     682     8972716 :          transport_global->concurrent_stream_count <
     683             :              transport_global
     684             :                  ->settings[GRPC_PEER_SETTINGS]
     685    15783745 :                            [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
     686     6811002 :          grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
     687             :                                                       &stream_global)) {
     688             :     /* safe since we can't (legally) be parsing this stream yet */
     689     2271985 :     grpc_chttp2_stream_parsing *stream_parsing =
     690     2272074 :         &STREAM_FROM_GLOBAL(stream_global)->parsing;
     691     2272074 :     GRPC_CHTTP2_IF_TRACING(gpr_log(
     692             :         GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
     693             :         transport_global->is_client ? "CLI" : "SVR", stream_global,
     694             :         transport_global->next_stream_id));
     695             : 
     696     2272062 :     GPR_ASSERT(stream_global->id == 0);
     697     2272062 :     stream_global->id = stream_parsing->id = transport_global->next_stream_id;
     698     2272062 :     transport_global->next_stream_id += 2;
     699             : 
     700     2272062 :     if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
     701         180 :       connectivity_state_set(exec_ctx, transport_global,
     702             :                              GRPC_CHANNEL_TRANSIENT_FAILURE,
     703             :                              "no_more_stream_ids");
     704             :     }
     705             : 
     706     4544124 :     stream_global->outgoing_window =
     707             :         transport_global->settings[GRPC_PEER_SETTINGS]
     708     2272062 :                                   [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     709     2272062 :     stream_parsing->incoming_window = stream_incoming_window =
     710             :         transport_global->settings[GRPC_SENT_SETTINGS]
     711             :                                   [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     712     4544035 :     stream_global->max_recv_bytes =
     713     2272062 :         GPR_MAX(stream_incoming_window, stream_global->max_recv_bytes);
     714     6816008 :     grpc_chttp2_stream_map_add(
     715     2271973 :         &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
     716     4543946 :         stream_global->id, STREAM_FROM_GLOBAL(stream_global));
     717     2272084 :     stream_global->in_stream_map = 1;
     718     2272084 :     transport_global->concurrent_stream_count++;
     719     2272084 :     grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
     720             :   }
     721             :   /* cancel out streams that will never be started */
     722    13413256 :   while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
     723         360 :          grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
     724             :                                                       &stream_global)) {
     725           0 :     cancel_from_api(exec_ctx, transport_global, stream_global,
     726             :                     GRPC_STATUS_UNAVAILABLE);
     727             :   }
     728     6706736 : }
     729             : 
     730    21649044 : static grpc_closure *add_closure_barrier(grpc_closure *closure) {
     731    21649903 :   closure->final_data += 2;
     732    21649044 :   return closure;
     733             : }
     734             : 
     735    34452235 : void grpc_chttp2_complete_closure_step(grpc_exec_ctx *exec_ctx,
     736             :                                        grpc_closure **pclosure, int success) {
     737    34452235 :   grpc_closure *closure = *pclosure;
     738    34452235 :   if (closure == NULL) {
     739    35058459 :     return;
     740             :   }
     741    33853259 :   closure->final_data -= 2;
     742    33853259 :   if (!success) {
     743         134 :     closure->final_data |= 1;
     744             :   }
     745    33853259 :   if (closure->final_data < 2) {
     746    12343982 :     grpc_exec_ctx_enqueue(exec_ctx, closure, closure->final_data == 0);
     747             :   }
     748    33860681 :   *pclosure = NULL;
     749             : }
     750             : 
     751     8851082 : static int contains_non_ok_status(
     752             :     grpc_chttp2_transport_global *transport_global,
     753             :     grpc_metadata_batch *batch) {
     754             :   grpc_linked_mdelem *l;
     755    40992064 :   for (l = batch->list.head; l; l = l->next) {
     756    34811053 :     if (l->md->key == GRPC_MDSTR_GRPC_STATUS &&
     757     2170395 :         l->md != GRPC_MDELEM_GRPC_STATUS_0) {
     758      499653 :       return 1;
     759             :     }
     760             :   }
     761     8351429 :   return 0;
     762             : }
     763             : 
     764          48 : static void do_nothing(grpc_exec_ctx *exec_ctx, void *arg, int success) {}
     765             : 
     766    12363135 : static void perform_stream_op_locked(
     767             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     768             :     grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
     769             :   grpc_closure *on_complete;
     770             : 
     771             :   GPR_TIMER_BEGIN("perform_stream_op_locked", 0);
     772             : 
     773    12363135 :   on_complete = op->on_complete;
     774    12363135 :   if (on_complete == NULL) {
     775          48 :     on_complete = grpc_closure_create(do_nothing, NULL);
     776             :   }
     777             :   /* use final_data as a barrier until enqueue time; the inital counter is
     778             :      dropped at the end of this function */
     779    12363135 :   on_complete->final_data = 2;
     780             : 
     781    12363135 :   if (op->cancel_with_status != GRPC_STATUS_OK) {
     782        1212 :     cancel_from_api(exec_ctx, transport_global, stream_global,
     783             :                     op->cancel_with_status);
     784             :   }
     785             : 
     786    12370009 :   if (op->close_with_status != GRPC_STATUS_OK) {
     787           5 :     close_from_api(exec_ctx, transport_global, stream_global,
     788             :                    op->close_with_status, op->optional_close_message);
     789             :   }
     790             : 
     791    12370009 :   if (op->send_initial_metadata != NULL) {
     792     4441751 :     GPR_ASSERT(stream_global->send_initial_metadata_finished == NULL);
     793     4442967 :     stream_global->send_initial_metadata_finished =
     794     4441751 :         add_closure_barrier(on_complete);
     795     4442967 :     stream_global->send_initial_metadata = op->send_initial_metadata;
     796     4442967 :     if (contains_non_ok_status(transport_global, op->send_initial_metadata)) {
     797           0 :       stream_global->seen_error = 1;
     798           0 :       grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
     799             :     }
     800     4442534 :     if (!stream_global->write_closed) {
     801     4442477 :       if (transport_global->is_client) {
     802     2272177 :         GPR_ASSERT(stream_global->id == 0);
     803     2272177 :         grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
     804             :                                                      stream_global);
     805     2272157 :         maybe_start_some_streams(exec_ctx, transport_global);
     806             :       } else {
     807     2170300 :         GPR_ASSERT(stream_global->id != 0);
     808     2170300 :         grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
     809             :       }
     810             :     } else {
     811          57 :       grpc_chttp2_complete_closure_step(
     812             :           exec_ctx, &stream_global->send_initial_metadata_finished, 0);
     813             :     }
     814             :   }
     815             : 
     816    12367326 :   if (op->send_message != NULL) {
     817     3959322 :     GPR_ASSERT(stream_global->send_message_finished == NULL);
     818     3959322 :     GPR_ASSERT(stream_global->send_message == NULL);
     819     3959509 :     stream_global->send_message_finished = add_closure_barrier(on_complete);
     820     3957535 :     if (stream_global->write_closed) {
     821          32 :       grpc_chttp2_complete_closure_step(
     822             :           exec_ctx, &stream_global->send_message_finished, 0);
     823     3957503 :     } else if (stream_global->id != 0) {
     824     3960534 :       stream_global->send_message = op->send_message;
     825     3960534 :       grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
     826             :     }
     827             :   }
     828             : 
     829    12365119 :   if (op->send_trailing_metadata != NULL) {
     830     4439174 :     GPR_ASSERT(stream_global->send_trailing_metadata_finished == NULL);
     831     4438278 :     stream_global->send_trailing_metadata_finished =
     832     4439174 :         add_closure_barrier(on_complete);
     833     4438278 :     stream_global->send_trailing_metadata = op->send_trailing_metadata;
     834     4438278 :     if (contains_non_ok_status(transport_global, op->send_trailing_metadata)) {
     835      523309 :       stream_global->seen_error = 1;
     836      523309 :       grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
     837             :     }
     838     4438076 :     if (stream_global->write_closed) {
     839          85 :       grpc_chttp2_complete_closure_step(
     840             :           exec_ctx, &stream_global->send_trailing_metadata_finished,
     841             :           grpc_metadata_batch_is_empty(op->send_trailing_metadata));
     842     4437991 :     } else if (stream_global->id != 0) {
     843             :       /* TODO(ctiller): check if there's flow control for any outstanding
     844             :          bytes before going writable */
     845     4439188 :       grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
     846             :     }
     847             :   }
     848             : 
     849    12364360 :   if (op->recv_initial_metadata != NULL) {
     850     4442588 :     GPR_ASSERT(stream_global->recv_initial_metadata_finished == NULL);
     851     4442800 :     stream_global->recv_initial_metadata_finished =
     852     4442588 :         add_closure_barrier(on_complete);
     853     4442800 :     stream_global->recv_initial_metadata = op->recv_initial_metadata;
     854     4442800 :     grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
     855             :   }
     856             : 
     857    12364369 :   if (op->recv_message != NULL) {
     858     3959391 :     GPR_ASSERT(stream_global->recv_message_ready == NULL);
     859     3959391 :     stream_global->recv_message_ready = op->recv_message_ready;
     860     3959391 :     stream_global->recv_message = op->recv_message;
     861     3959391 :     grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
     862             :   }
     863             : 
     864    12365759 :   if (op->recv_trailing_metadata != NULL) {
     865     4441114 :     GPR_ASSERT(stream_global->recv_trailing_metadata_finished == NULL);
     866     4441055 :     stream_global->recv_trailing_metadata_finished =
     867     4441114 :         add_closure_barrier(on_complete);
     868     4441055 :     stream_global->recv_trailing_metadata = op->recv_trailing_metadata;
     869     4441055 :     grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
     870             :   }
     871             : 
     872    12363431 :   grpc_chttp2_complete_closure_step(exec_ctx, &on_complete, 1);
     873             : 
     874             :   GPR_TIMER_END("perform_stream_op_locked", 0);
     875    12333691 : }
     876             : 
     877    12332843 : static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     878             :                               grpc_stream *gs, grpc_transport_stream_op *op) {
     879    12332082 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     880    12332082 :   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
     881             : 
     882    12332082 :   lock(t);
     883    12377947 :   perform_stream_op_locked(exec_ctx, &t->global, &s->global, op);
     884    12335482 :   unlock(exec_ctx, t);
     885    12380487 : }
     886             : 
     887           0 : static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
     888           0 :   grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
     889           0 :   p->next = &t->global.pings;
     890           0 :   p->prev = p->next->prev;
     891           0 :   p->prev->next = p->next->prev = p;
     892           0 :   p->id[0] = (gpr_uint8)((t->global.ping_counter >> 56) & 0xff);
     893           0 :   p->id[1] = (gpr_uint8)((t->global.ping_counter >> 48) & 0xff);
     894           0 :   p->id[2] = (gpr_uint8)((t->global.ping_counter >> 40) & 0xff);
     895           0 :   p->id[3] = (gpr_uint8)((t->global.ping_counter >> 32) & 0xff);
     896           0 :   p->id[4] = (gpr_uint8)((t->global.ping_counter >> 24) & 0xff);
     897           0 :   p->id[5] = (gpr_uint8)((t->global.ping_counter >> 16) & 0xff);
     898           0 :   p->id[6] = (gpr_uint8)((t->global.ping_counter >> 8) & 0xff);
     899           0 :   p->id[7] = (gpr_uint8)(t->global.ping_counter & 0xff);
     900           0 :   p->on_recv = on_recv;
     901           0 :   gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
     902           0 : }
     903             : 
     904       16909 : static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     905             :                                  grpc_transport_op *op) {
     906       16663 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     907       16663 :   int close_transport = 0;
     908             : 
     909       16663 :   lock(t);
     910             : 
     911       16910 :   grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
     912             : 
     913       16910 :   if (op->on_connectivity_state_change) {
     914        8459 :     grpc_connectivity_state_notify_on_state_change(
     915             :         exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
     916             :         op->on_connectivity_state_change);
     917             :   }
     918             : 
     919       16911 :   if (op->send_goaway) {
     920        2716 :     t->global.sent_goaway = 1;
     921        8148 :     grpc_chttp2_goaway_append(
     922             :         t->global.last_incoming_stream_id,
     923        2716 :         (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
     924        2716 :         gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
     925        2716 :     close_transport = !grpc_chttp2_has_streams(t);
     926             :   }
     927             : 
     928       16911 :   if (op->set_accept_stream != NULL) {
     929        3056 :     t->channel_callback.accept_stream = op->set_accept_stream;
     930        3056 :     t->channel_callback.accept_stream_user_data =
     931        3056 :         op->set_accept_stream_user_data;
     932             :   }
     933             : 
     934       16911 :   if (op->bind_pollset) {
     935        3110 :     add_to_pollset_locked(exec_ctx, t, op->bind_pollset);
     936             :   }
     937             : 
     938       16911 :   if (op->bind_pollset_set) {
     939        2306 :     add_to_pollset_set_locked(exec_ctx, t, op->bind_pollset_set);
     940             :   }
     941             : 
     942       16911 :   if (op->send_ping) {
     943           0 :     send_ping_locked(t, op->send_ping);
     944             :   }
     945             : 
     946       16911 :   if (op->disconnect) {
     947        2544 :     close_transport_locked(exec_ctx, t);
     948             :   }
     949             : 
     950       16911 :   unlock(exec_ctx, t);
     951             : 
     952       16911 :   if (close_transport) {
     953        2499 :     lock(t);
     954        2539 :     close_transport_locked(exec_ctx, t);
     955        2539 :     unlock(exec_ctx, t);
     956             :   }
     957       16911 : }
     958             : 
     959             : /*
     960             :  * INPUT PROCESSING
     961             :  */
     962             : 
     963    38132321 : static void check_read_ops(grpc_exec_ctx *exec_ctx,
     964             :                            grpc_chttp2_transport_global *transport_global) {
     965             :   grpc_chttp2_stream_global *stream_global;
     966             :   grpc_byte_stream *bs;
     967    94571488 :   while (
     968    56498255 :       grpc_chttp2_list_pop_check_read_ops(transport_global, &stream_global)) {
     969    28034058 :     if (stream_global->recv_initial_metadata_finished != NULL &&
     970     9713942 :         stream_global->published_initial_metadata) {
     971     8881173 :       grpc_chttp2_incoming_metadata_buffer_publish(
     972     4440499 :           &stream_global->received_initial_metadata,
     973     4440499 :           stream_global->recv_initial_metadata);
     974     4440077 :       grpc_chttp2_complete_closure_step(
     975     4440077 :           exec_ctx, &stream_global->recv_initial_metadata_finished, 1);
     976             :     }
     977    18364930 :     if (stream_global->recv_message_ready != NULL) {
     978     6590774 :       if (stream_global->incoming_frames.head != NULL) {
     979     7920329 :         *stream_global->recv_message = grpc_chttp2_incoming_frame_queue_pop(
     980     3960072 :             &stream_global->incoming_frames);
     981     3959804 :         GPR_ASSERT(*stream_global->recv_message != NULL);
     982     3959804 :         grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
     983     3959379 :         stream_global->recv_message_ready = NULL;
     984     2630517 :       } else if (stream_global->published_trailing_metadata) {
     985         834 :         *stream_global->recv_message = NULL;
     986         834 :         grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_message_ready, 1);
     987         834 :         stream_global->recv_message_ready = NULL;
     988             :       }
     989             :     }
     990    28582346 :     if (stream_global->recv_trailing_metadata_finished != NULL &&
     991    16818280 :         stream_global->read_closed && stream_global->write_closed) {
     992     9919204 :       while (stream_global->seen_error &&
     993     1047312 :              (bs = grpc_chttp2_incoming_frame_queue_pop(
     994     1047241 :                   &stream_global->incoming_frames)) != NULL) {
     995          31 :         grpc_byte_stream_destroy(bs);
     996             :       }
     997     4436279 :       if (stream_global->incoming_frames.head == NULL) {
     998     8872372 :         grpc_chttp2_incoming_metadata_buffer_publish(
     999     4436099 :             &stream_global->received_trailing_metadata,
    1000     4436099 :             stream_global->recv_trailing_metadata);
    1001     4437215 :         grpc_chttp2_complete_closure_step(
    1002     4437215 :             exec_ctx, &stream_global->recv_trailing_metadata_finished, 1);
    1003             :       }
    1004             :     }
    1005             :   }
    1006    38119051 : }
    1007             : 
    1008     4437040 : static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
    1009             :                           gpr_uint32 id) {
                         /* Detach stream `id` from transport `t`: delete it from
                            parsing_stream_map (falling back to new_stream_map), take it off
                            the writable list, clear its in_stream_map flag, and then refresh
                            t->global.concurrent_stream_count from the two map sizes. */
    1010             :   size_t new_stream_count;
    1011     4437040 :   grpc_chttp2_stream *s =
    1012     4437040 :       grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
    1013     4441408 :   if (!s) {
    1014         158 :     s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
    1015             :   }
                         /* NOTE(review): &s->global is formed on the next line before
                            GPR_ASSERT(s) has verified that s is non-NULL; the assertion
                            should precede the first use of s. */
    1016     4441408 :   grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
    1017     4441399 :   GPR_ASSERT(s);
    1018     4441399 :   s->global.in_stream_map = 0;
                         /* if the parser is currently pointed at this stream, detach it and
                            switch the parser to the skip parser */
    1019     4441399 :   if (t->parsing.incoming_stream == &s->parsing) {
    1020          34 :     t->parsing.incoming_stream = NULL;
    1021          34 :     grpc_chttp2_parsing_become_skip_parser(exec_ctx, &t->parsing);
    1022             :   }
                         /* close the transport when unregistering reports non-zero and a
                            GOAWAY has already been sent */
    1023     4441399 :   if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
    1024         177 :     close_transport_locked(exec_ctx, t);
    1025             :   }
    1026             : 
    1027     8881699 :   new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
    1028     4440056 :                      grpc_chttp2_stream_map_size(&t->new_stream_map);
    1029     4440491 :   GPR_ASSERT(new_stream_count <= GPR_UINT32_MAX);
                         /* only when the count actually changed: store it and give
                            maybe_start_some_streams a chance to run */
    1030     4440491 :   if (new_stream_count != t->global.concurrent_stream_count) {
    1031     4439655 :     t->global.concurrent_stream_count = (gpr_uint32)new_stream_count;
    1032     4439655 :     maybe_start_some_streams(exec_ctx, &t->global);
    1033             :   }
    1034     4440176 : }
    1035             : 
    1036        1330 : static void cancel_from_api(grpc_exec_ctx *exec_ctx,
    1037             :                             grpc_chttp2_transport_global *transport_global,
    1038             :                             grpc_chttp2_stream_global *stream_global,
    1039             :                             grpc_status_code status) {
                         /* Cancel a stream with `status`: queue a RST_STREAM frame on the
                            transport's qbuf (only when the stream has a non-zero id), record
                            `status` via grpc_chttp2_fake_status with no message, and mark
                            both the read and write sides closed. */
    1040        1330 :   if (stream_global->id != 0) {
    1041        1213 :     gpr_slice_buffer_add(
    1042             :         &transport_global->qbuf,
    1043             :         grpc_chttp2_rst_stream_create(
    1044             :             stream_global->id,
    1045        1213 :             (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(status)));
    1046             :   }
    1047        1330 :   grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
    1048             :                           NULL);
    1049        1330 :   grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
    1050             :                                  1);
    1051        1330 : }
    1052             : 
    1053        1489 : void grpc_chttp2_fake_status(grpc_exec_ctx *exec_ctx,
    1054             :                              grpc_chttp2_transport_global *transport_global,
    1055             :                              grpc_chttp2_stream_global *stream_global,
    1056             :                              grpc_status_code status, gpr_slice *slice) {
                         /* Record a locally generated `status` (and optional message
                            `slice`) as received trailing metadata for the stream.
                            `slice` may be NULL. When it is non-NULL this function unrefs
                            *slice before returning, taking its own ref first if it stores
                            the message, so one caller reference is consumed either way. */
                         /* any non-OK status flags the stream as errored and schedules a
                            read-ops check */
    1057        1489 :   if (status != GRPC_STATUS_OK) {
    1058        1489 :     stream_global->seen_error = 1;
    1059        1489 :     grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
    1060             :   }
    1061             :   /* stream_global->recv_trailing_metadata_finished gives us a
    1062             :      last chance replacement: we've received trailing metadata,
    1063             :      but something more important has become available to signal
    1064             :      to the upper layers - drop what we've got, and then publish
    1065             :      what we want - which is safe because we haven't told anyone
    1066             :      about the metadata yet */
    1067        2209 :   if (!stream_global->published_trailing_metadata ||
    1068         720 :       stream_global->recv_trailing_metadata_finished != NULL) {
    1069             :     char status_string[GPR_LTOA_MIN_BUFSIZE];
                           /* grpc-status carries the numeric status rendered as decimal text */
    1070         912 :     gpr_ltoa(status, status_string);
    1071         912 :     grpc_chttp2_incoming_metadata_buffer_add(
    1072             :         &stream_global->received_trailing_metadata,
    1073             :         grpc_mdelem_from_metadata_strings(
    1074             :             GRPC_MDSTR_GRPC_STATUS, grpc_mdstr_from_string(status_string)));
    1075         912 :     if (slice) {
                             /* the metadata string gets its own ref on the message slice */
    1076         152 :       grpc_chttp2_incoming_metadata_buffer_add(
    1077             :           &stream_global->received_trailing_metadata,
    1078             :           grpc_mdelem_from_metadata_strings(
    1079             :               GRPC_MDSTR_GRPC_MESSAGE,
    1080             :               grpc_mdstr_from_slice(gpr_slice_ref(*slice))));
    1081             :     }
    1082         912 :     stream_global->published_trailing_metadata = 1;
    1083         912 :     grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
    1084             :   }
                         /* release the reference handed in by the caller, whether or not the
                            message was stored above */
    1085        1489 :   if (slice) {
    1086         159 :     gpr_slice_unref(*slice);
    1087             :   }
    1088        1489 : }
    1089             : 
    1090     8867439 : void grpc_chttp2_mark_stream_closed(
    1091             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
    1092             :     grpc_chttp2_stream_global *stream_global, int close_reads,
    1093             :     int close_writes) {
                         /* Close the read side and/or the write side of a stream
                            (`close_reads` / `close_writes` are booleans). Once both sides
                            are closed the stream is torn down, either immediately or, if the
                            transport is currently parsing, after parsing finishes. */
    1094     8867439 :   if (stream_global->read_closed && stream_global->write_closed) {
    1095             :     /* already closed */
    1096     8878261 :     return;
    1097             :   }
    1098     8866538 :   grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
    1099     8872672 :   if (close_reads && !stream_global->read_closed) {
                           /* closing reads also marks both metadata batches as published */
    1100     4440759 :     stream_global->read_closed = 1;
    1101     4440759 :     stream_global->published_initial_metadata = 1;
    1102     4440759 :     stream_global->published_trailing_metadata = 1;
    1103             :   }
    1104     8872672 :   if (close_writes && !stream_global->write_closed) {
    1105     4440462 :     stream_global->write_closed = 1;
    1106             :   }
    1107     8872672 :   if (stream_global->read_closed && stream_global->write_closed) {
                           /* a stream with an id cannot be removed while parsing is active;
                              queue it on the closed_waiting_for_parsing list instead (that
                              list is drained in recv_data once parsing_active is cleared) */
    1108     8877494 :     if (stream_global->id != 0 &&
    1109     4438730 :         TRANSPORT_FROM_GLOBAL(transport_global)->parsing_active) {
    1110     2398165 :       grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
    1111             :                                                       stream_global);
    1112             :     } else {
                             /* streams with id 0 are not passed to remove_stream */
    1113     2040599 :       if (stream_global->id != 0) {
    1114     2044075 :         remove_stream(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
    1115             :                       stream_global->id);
    1116             :       }
    1117     2040574 :       stream_global->finished_close = 1;
    1118     2040574 :       GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
    1119             :     }
    1120             :   }
    1121             : }
    1122             : 
    1123           5 : static void close_from_api(grpc_exec_ctx *exec_ctx,
    1124             :                            grpc_chttp2_transport_global *transport_global,
    1125             :                            grpc_chttp2_stream_global *stream_global,
    1126             :                            grpc_status_code status,
    1127             :                            gpr_slice *optional_message) {
                         /* Close a stream with an explicit status: hand-build a HEADERS
                            frame carrying grpc-status (and grpc-message when
                            `optional_message` is non-NULL), queue it on the transport's qbuf
                            followed by RST_STREAM(NO_ERROR), then record the status locally
                            and mark both sides of the stream closed.
                            The stream must have a non-zero id, `status` must be in [0, 100),
                            and the message must be shorter than 127 bytes (all asserted
                            below). The caller's reference on *optional_message is left
                            untouched: this function takes the extra refs it needs. */
    1128             :   gpr_slice hdr;
    1129             :   gpr_slice status_hdr;
    1130             :   gpr_slice message_pfx;
    1131             :   gpr_uint8 *p;
    1132           5 :   gpr_uint32 len = 0;
    1133             : 
                         /* the status is written below as one or two decimal digits */
    1134           5 :   GPR_ASSERT(status >= 0 && (int)status < 100);
    1135             : 
    1136           5 :   GPR_ASSERT(stream_global->id != 0);
    1137             : 
    1138             :   /* Hand roll a header block.
    1139             :      This is unnecessarily ugly - at some point we should find a more elegant
    1140             :      solution.
    1141             :      It's complicated by the fact that our send machinery would be dead by the
    1142             :      time we got around to sending this, so instead we ignore HPACK compression
    1143             :      and just write the uncompressed bytes onto the wire. */
                         /* 15 or 16 bytes: prefix byte + name-length byte + 11 name bytes +
                            value-length byte + 1 or 2 digits */
    1144           5 :   status_hdr = gpr_slice_malloc(15 + (status >= 10));
    1145           5 :   p = GPR_SLICE_START_PTR(status_hdr);
    1146           5 :   *p++ = 0x40; /* literal header */
    1147           5 :   *p++ = 11;   /* len(grpc-status) */
    1148           5 :   *p++ = 'g';
    1149           5 :   *p++ = 'r';
    1150           5 :   *p++ = 'p';
    1151           5 :   *p++ = 'c';
    1152           5 :   *p++ = '-';
    1153           5 :   *p++ = 's';
    1154           5 :   *p++ = 't';
    1155           5 :   *p++ = 'a';
    1156           5 :   *p++ = 't';
    1157           5 :   *p++ = 'u';
    1158           5 :   *p++ = 's';
    1159           5 :   if (status < 10) {
    1160           0 :     *p++ = 1;
    1161           0 :     *p++ = (gpr_uint8)('0' + status);
    1162             :   } else {
    1163           5 :     *p++ = 2;
    1164           5 :     *p++ = (gpr_uint8)('0' + (status / 10));
    1165           5 :     *p++ = (gpr_uint8)('0' + (status % 10));
    1166             :   }
                         /* sanity check: exactly the allocated number of bytes was written */
    1167           5 :   GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
    1168           5 :   len += (gpr_uint32)GPR_SLICE_LENGTH(status_hdr);
    1169             : 
    1170           5 :   if (optional_message) {
                           /* the message length is emitted as a single byte at the end of
                              message_pfx, hence the bound */
    1171           5 :     GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
                           /* 15 bytes: prefix byte + name-length byte + 12 name bytes +
                              value-length byte; the value itself is the message slice */
    1172           5 :     message_pfx = gpr_slice_malloc(15);
    1173           5 :     p = GPR_SLICE_START_PTR(message_pfx);
    1174           5 :     *p++ = 0x40;
    1175           5 :     *p++ = 12; /* len(grpc-message) */
    1176           5 :     *p++ = 'g';
    1177           5 :     *p++ = 'r';
    1178           5 :     *p++ = 'p';
    1179           5 :     *p++ = 'c';
    1180           5 :     *p++ = '-';
    1181           5 :     *p++ = 'm';
    1182           5 :     *p++ = 'e';
    1183           5 :     *p++ = 's';
    1184           5 :     *p++ = 's';
    1185           5 :     *p++ = 'a';
    1186           5 :     *p++ = 'g';
    1187           5 :     *p++ = 'e';
    1188           5 :     *p++ = (gpr_uint8)GPR_SLICE_LENGTH(*optional_message);
    1189           5 :     GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
    1190           5 :     len += (gpr_uint32)GPR_SLICE_LENGTH(message_pfx);
    1191           5 :     len += (gpr_uint32)GPR_SLICE_LENGTH(*optional_message);
    1192             :   }
    1193             : 
                         /* 9-byte frame header: 3 bytes of payload length (big-endian),
                            frame type, flags, then the stream id in 4 big-endian bytes */
    1194           5 :   hdr = gpr_slice_malloc(9);
    1195           5 :   p = GPR_SLICE_START_PTR(hdr);
    1196           5 :   *p++ = (gpr_uint8)(len >> 16);
    1197           5 :   *p++ = (gpr_uint8)(len >> 8);
    1198           5 :   *p++ = (gpr_uint8)(len);
    1199           5 :   *p++ = GRPC_CHTTP2_FRAME_HEADER;
    1200           5 :   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
    1201           5 :   *p++ = (gpr_uint8)(stream_global->id >> 24);
    1202           5 :   *p++ = (gpr_uint8)(stream_global->id >> 16);
    1203           5 :   *p++ = (gpr_uint8)(stream_global->id >> 8);
    1204           5 :   *p++ = (gpr_uint8)(stream_global->id);
    1205           5 :   GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
    1206             : 
                         /* queue the pieces in wire order: frame header, grpc-status,
                            grpc-message prefix, message bytes */
    1207           5 :   gpr_slice_buffer_add(&transport_global->qbuf, hdr);
    1208           5 :   gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
    1209           5 :   if (optional_message) {
    1210           5 :     gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
    1211           5 :     gpr_slice_buffer_add(&transport_global->qbuf,
    1212             :                          gpr_slice_ref(*optional_message));
    1213             :   }
    1214             : 
    1215           5 :   gpr_slice_buffer_add(
    1216             :       &transport_global->qbuf,
    1217             :       grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR));
    1218             : 
                         /* grpc_chttp2_fake_status unrefs the slice it is given, so take an
                            extra ref here to leave the caller's reference intact */
    1219           5 :   if (optional_message) {
    1220           5 :     gpr_slice_ref(*optional_message);
    1221             :   }
    1222           5 :   grpc_chttp2_fake_status(exec_ctx, transport_global, stream_global, status,
    1223             :                           optional_message);
    1224           5 :   grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global, 1,
    1225             :                                  1);
    1226           5 : }
    1227             : 
    1228         118 : static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
    1229             :                              void *user_data,
    1230             :                              grpc_chttp2_stream_global *stream_global) {
                         /* Per-stream callback handed to grpc_chttp2_for_all_streams by
                            end_all_the_calls: cancels the stream with
                            GRPC_STATUS_UNAVAILABLE. `user_data` is the grpc_exec_ctx that
                            end_all_the_calls passed through. */
    1231         118 :   cancel_from_api(user_data, transport_global, stream_global,
    1232             :                   GRPC_STATUS_UNAVAILABLE);
    1233         118 : }
    1234             : 
    1235       12038 : static void end_all_the_calls(grpc_exec_ctx *exec_ctx,
    1236             :                               grpc_chttp2_transport *t) {
                         /* Run cancel_stream_cb over the transport's streams via
                            grpc_chttp2_for_all_streams, passing exec_ctx as the callback's
                            user data. */
    1237       12142 :   grpc_chttp2_for_all_streams(&t->global, exec_ctx, cancel_stream_cb);
    1238       12038 : }
    1239             : 
    1240       12142 : static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
                         /* Close the transport first, then cancel its streams via
                            end_all_the_calls. */
    1241       12142 :   close_transport_locked(exec_ctx, t);
    1242       12038 :   end_all_the_calls(exec_ctx, t);
    1243       12142 : }
    1244             : 
    1245             : /** update window from a settings change */
    1246           0 : static void update_global_window(void *args, gpr_uint32 id, void *stream) {
                         /* Per-stream callback for grpc_chttp2_stream_map_for_each (invoked
                            from recv_data). `args` is the grpc_chttp2_transport and `stream`
                            is the grpc_chttp2_stream; `id` is not used. Credits the stream's
                            outgoing_window with t->parsing.initial_window_update. */
    1247           0 :   grpc_chttp2_transport *t = args;
    1248           0 :   grpc_chttp2_stream *s = stream;
    1249           0 :   grpc_chttp2_transport_global *transport_global = &t->global;
    1250           0 :   grpc_chttp2_stream_global *stream_global = &s->global;
    1251             :   int was_zero;
    1252             :   int is_zero;
    1253           0 :   gpr_int64 initial_window_update = t->parsing.initial_window_update;
    1254             : 
                         /* "zero" here means exhausted: the window is <= 0 */
    1255           0 :   was_zero = stream_global->outgoing_window <= 0;
    1256           0 :   GRPC_CHTTP2_FLOW_CREDIT_STREAM("settings", transport_global, stream_global,
    1257             :                                  outgoing_window, initial_window_update);
    1258           0 :   is_zero = stream_global->outgoing_window <= 0;
    1259             : 
                         /* the window just became positive: put the stream on the writable
                            list */
    1260           0 :   if (was_zero && !is_zero) {
    1261           0 :     grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
    1262             :   }
    1263           0 : }
    1264             : 
    1265        6038 : static void read_error_locked(grpc_exec_ctx *exec_ctx,
    1266             :                               grpc_chttp2_transport *t) {
                         /* Note that the endpoint is no longer being read and destroy it,
                            unless a write is still active or the endpoint is already gone.
                            Called from recv_data while the transport lock is held. */
    1267        6038 :   t->endpoint_reading = 0;
    1268        6038 :   if (!t->writing_active && t->ep) {
    1269        5929 :     destroy_endpoint(exec_ctx, t);
    1270             :   }
    1271        6038 : }
    1272             : 
    1273             : /* tcp read callback */
    1274     6521510 : static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
                         /* Read-completion callback. `tp` is the grpc_chttp2_transport whose
                            read_buffer has just been filled; `success` == 0 is treated as a
                            read failure. Parses the buffered slices, publishes the results,
                            finishes any stream closures that were deferred while parsing,
                            and then either issues the next endpoint read or drops the
                            connection. */
    1275             :   size_t i;
    1276     6520741 :   int keep_reading = 0;
    1277     6520741 :   grpc_chttp2_transport *t = tp;
    1278     6521510 :   grpc_chttp2_transport_global *transport_global = &t->global;
    1279     6521510 :   grpc_chttp2_transport_parsing *transport_parsing = &t->parsing;
    1280             :   grpc_chttp2_stream_global *stream_global;
    1281             : 
    1282             :   GPR_TIMER_BEGIN("recv_data", 0);
    1283             : 
    1284     6520741 :   lock(t);
    1285     6520796 :   i = 0;
    1286     6521565 :   GPR_ASSERT(!t->parsing_active);
    1287     6521565 :   if (!t->closed) {
    1288     6516056 :     t->parsing_active = 1;
    1289             :     /* merge stream lists */
    1290     6516056 :     grpc_chttp2_stream_map_move_into(&t->new_stream_map,
    1291             :                                      &t->parsing_stream_map);
    1292     6516035 :     grpc_chttp2_prepare_to_read(transport_global, transport_parsing);
                           /* parsing runs with t->mu released; parsing_active stays set for
                              the duration */
    1293     6516026 :     gpr_mu_unlock(&t->mu);
    1294             :     GPR_TIMER_BEGIN("recv_data.parse", 0);
                           /* feed slices to the parser until all are consumed or
                              grpc_chttp2_perform_read returns 0; on an early stop `i` is
                              left short of read_buffer.count */
    1295    50051314 :     for (; i < t->read_buffer.count &&
    1296    18509685 :                grpc_chttp2_perform_read(exec_ctx, transport_parsing,
    1297    18509685 :                                         t->read_buffer.slices[i]);
    1298    18509574 :          i++)
    1299             :       ;
    1300             :     GPR_TIMER_END("recv_data.parse", 0);
    1301     6516000 :     gpr_mu_lock(&t->mu);
    1302             :     /* copy parsing qbuf to global qbuf */
    1303     6516050 :     gpr_slice_buffer_move_into(&t->parsing.qbuf, &t->global.qbuf);
                           /* parse stopped early: run an unlock/lock cycle, then drop the
                              connection.
                              NOTE(review): the `i != t->read_buffer.count` test further down
                              is true again in this case, so drop_connection runs a second
                              time - presumably harmless, confirm. */
    1304     6516050 :     if (i != t->read_buffer.count) {
    1305          59 :       unlock(exec_ctx, t);
    1306          59 :       lock(t);
    1307          59 :       drop_connection(exec_ctx, t);
    1308             :     }
    1309             :     /* merge stream lists */
    1310     6516050 :     grpc_chttp2_stream_map_move_into(&t->new_stream_map,
    1311             :                                      &t->parsing_stream_map);
    1312     6516049 :     transport_global->concurrent_stream_count =
    1313     6515995 :         (gpr_uint32)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
                           /* a non-zero initial_window_update is applied to every stream in
                              the map and then reset */
    1314     6516049 :     if (transport_parsing->initial_window_update != 0) {
    1315           1 :       grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
    1316             :                                       update_global_window, t);
    1317           1 :       transport_parsing->initial_window_update = 0;
    1318             :     }
    1319             :     /* handle higher level things */
    1320     6516049 :     grpc_chttp2_publish_reads(exec_ctx, transport_global, transport_parsing);
    1321     6515990 :     t->parsing_active = 0;
    1322             :     /* if a stream is in the stream map, and gets cancelled, we need to ensure
    1323             :      * we are not parsing before continuing the cancellation to keep things in
    1324             :      * a sane state */
    1325    15431024 :     while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
    1326             :                                                            &stream_global)) {
    1327     2398879 :       GPR_ASSERT(stream_global->in_stream_map);
    1328     2398878 :       GPR_ASSERT(stream_global->write_closed);
    1329     2398878 :       GPR_ASSERT(stream_global->read_closed);
    1330     2398878 :       remove_stream(exec_ctx, t, stream_global->id);
    1331     2398644 :       stream_global->finished_close = 1;
    1332     2398644 :       GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2");
    1333             :     }
    1334             :   }
                         /* failed read, parse stopped early, or transport closed: tear the
                            connection down; otherwise arrange for another read */
    1335     6521384 :   if (!success || i != t->read_buffer.count || t->closed) {
    1336        5900 :     drop_connection(exec_ctx, t);
    1337        6038 :     read_error_locked(exec_ctx, t);
    1338     6515484 :   } else if (!t->closed) {
                           /* take a temporary ref and block endpoint shutdown across the
                              unlocked grpc_endpoint_read call below */
    1339     6514795 :     keep_reading = 1;
    1340     6514795 :     REF_TRANSPORT(t, "keep_reading");
    1341     6515519 :     prevent_endpoint_shutdown(t);
    1342             :   }
    1343     6521562 :   gpr_slice_buffer_reset_and_unref(&t->read_buffer);
    1344     6521564 :   unlock(exec_ctx, t);
    1345             : 
    1346     6521564 :   if (keep_reading) {
    1347     6515526 :     grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->recv_data);
    1348     6515525 :     allow_endpoint_shutdown_unlocked(exec_ctx, t);
    1349     6515528 :     UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
    1350             :   } else {
                           /* no further read is issued: release the "recv_data" ref */
    1351        6038 :     UNREF_TRANSPORT(exec_ctx, t, "recv_data");
    1352             :   }
    1353             : 
    1354             :   GPR_TIMER_END("recv_data", 0);
    1355     6521566 : }
    1356             : 
    1357             : /*
    1358             :  * CALLBACK LOOP
    1359             :  */
    1360             : 
    1361        6502 : static void connectivity_state_set(
    1362             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
    1363             :     grpc_connectivity_state state, const char *reason) {
                         /* Log the new state through GRPC_CHTTP2_IF_TRACING and forward
                            `state` and `reason` to the state tracker stored in the owning
                            transport's channel_callback. */
    1364        6502 :   GRPC_CHTTP2_IF_TRACING(
    1365             :       gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
    1366        6502 :   grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global)
    1367             :                                              ->channel_callback.state_tracker,
    1368             :                               state, reason);
    1369        6502 : }
    1370             : 
    1371             : /*
    1372             :  * POLLSET STUFF
    1373             :  */
    1374             : 
    1375     4445898 : static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
    1376             :                                   grpc_chttp2_transport *t,
    1377             :                                   grpc_pollset *pollset) {
                         /* Register the transport's endpoint with `pollset`; does nothing
                            when the transport has no endpoint. set_pollset calls this with
                            the transport lock held. */
    1378     4446157 :   if (t->ep) {
    1379     4446304 :     grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
    1380             :   }
    1381     4445886 : }
    1382             : 
    1383        2306 : static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
    1384             :                                       grpc_chttp2_transport *t,
    1385             :                                       grpc_pollset_set *pollset_set) {
                         /* Register the transport's endpoint with `pollset_set`; does
                            nothing when the transport has no endpoint. Presumably expects
                            the transport lock to be held, as the name suggests - confirm
                            against callers. */
    1386        2347 :   if (t->ep) {
    1387        2347 :     grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
    1388             :   }
    1389        2306 : }
    1390             : 
    1391     4437763 : static void set_pollset(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
    1392             :                         grpc_stream *gs, grpc_pollset *pollset) {
                         /* Add the transport's endpoint to `pollset` under the transport
                            lock. `gt` is cast to the chttp2 transport; `gs` is not used. */
    1393     4437586 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
    1394     4437586 :   lock(t);
    1395     4442921 :   add_to_pollset_locked(exec_ctx, t, pollset);
    1396     4443161 :   unlock(exec_ctx, t);
    1397     4443183 : }
    1398             : 
    1399             : /*
    1400             :  * BYTE STREAM
    1401             :  */
    1402             : 
    1403     5913747 : static void incoming_byte_stream_update_flow_control(
    1404             :     grpc_chttp2_transport_global *transport_global,
    1405             :     grpc_chttp2_stream_global *stream_global, size_t max_size_hint,
    1406             :     size_t have_already) {
                         /* Work out how many more bytes the stream should be allowed to
                            receive: `max_size_hint` minus `have_already`, plus the
                            transport's stream_lookahead. If that exceeds the stream's
                            current max_recv_bytes, credit the difference to max_recv_bytes
                            and to both unannounced incoming windows, then list the stream as
                            having window available and as writable. Both callers in this
                            file invoke it with the transport lock held. */
    1407             :   gpr_uint32 max_recv_bytes;
    1408             : 
    1409             :   /* clamp max recv hint to an allowable size */
    1410     5913747 :   if (max_size_hint >= GPR_UINT32_MAX - transport_global->stream_lookahead) {
    1411           0 :     max_recv_bytes = GPR_UINT32_MAX - transport_global->stream_lookahead;
    1412             :   } else {
    1413     5913761 :     max_recv_bytes = (gpr_uint32)max_size_hint;
    1414             :   }
    1415             : 
    1416             :   /* account for bytes already received but unknown to higher layers */
    1417     5913747 :   if (max_recv_bytes >= have_already) {
    1418     5913747 :     max_recv_bytes -= (gpr_uint32)have_already;
    1419             :   } else {
    1420           0 :     max_recv_bytes = 0;
    1421             :   }
    1422             : 
    1423             :   /* add some small lookahead to keep pipelines flowing */
                         /* the clamp above keeps max_recv_bytes small enough that adding
                            the lookahead cannot wrap; the assert documents that */
    1424     5913747 :   GPR_ASSERT(max_recv_bytes <=
    1425             :              GPR_UINT32_MAX - transport_global->stream_lookahead);
    1426     5913747 :   max_recv_bytes += transport_global->stream_lookahead;
                         /* the allowance is only ever raised here, never lowered */
    1427     5913747 :   if (stream_global->max_recv_bytes < max_recv_bytes) {
    1428     2409491 :     gpr_uint32 add_max_recv_bytes =
    1429     2409336 :         max_recv_bytes - stream_global->max_recv_bytes;
    1430     2409491 :     GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
    1431             :                                    max_recv_bytes, add_max_recv_bytes);
    1432     2409489 :     GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
    1433             :                                    unannounced_incoming_window_for_parse,
    1434             :                                    add_max_recv_bytes);
    1435     2409489 :     GRPC_CHTTP2_FLOW_CREDIT_STREAM("op", transport_global, stream_global,
    1436             :                                    unannounced_incoming_window_for_writing,
    1437             :                                    add_max_recv_bytes);
    1438     2409489 :     grpc_chttp2_list_add_unannounced_incoming_window_available(transport_global,
    1439             :                                                                stream_global);
    1440     2409356 :     grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
    1441             :   }
    1442     5913584 : }
    1443             : 
    1444     4166547 : static int incoming_byte_stream_next(grpc_exec_ctx *exec_ctx,
    1445             :                                      grpc_byte_stream *byte_stream,
    1446             :                                      gpr_slice *slice, size_t max_size_hint,
    1447             :                                      grpc_closure *on_complete) {
                         /* `next` hook of the incoming byte stream (installed in
                            grpc_chttp2_incoming_byte_stream_create).
                            Returns 1 and stores a slice in *slice when one is already
                            buffered. Otherwise returns 0 after remembering `slice` and
                            `on_complete`, so that grpc_chttp2_incoming_byte_stream_push can
                            fill *slice and schedule the closure later. */
    1448     4166037 :   grpc_chttp2_incoming_byte_stream *bs =
    1449             :       (grpc_chttp2_incoming_byte_stream *)byte_stream;
    1450     4166547 :   grpc_chttp2_transport_global *transport_global = &bs->transport->global;
    1451     4166547 :   grpc_chttp2_stream_global *stream_global = &bs->stream->global;
    1452             : 
    1453     4166037 :   lock(bs->transport);
                         /* flow control is only updated for the byte stream at the tail of
                            its queue */
    1454     4167000 :   if (bs->is_tail) {
    1455     4155845 :     incoming_byte_stream_update_flow_control(transport_global, stream_global,
    1456             :                                              max_size_hint, bs->slices.length);
    1457             :   }
    1458     4167494 :   if (bs->slices.count > 0) {
    1459     3192020 :     *slice = gpr_slice_buffer_take_first(&bs->slices);
    1460     3192045 :     unlock(exec_ctx, bs->transport);
    1461     3192163 :     return 1;
    1462             :   } else {
                           /* nothing buffered yet: park the request */
    1463      975474 :     bs->on_next = on_complete;
    1464      975474 :     bs->next = slice;
    1465      975474 :     unlock(exec_ctx, bs->transport);
    1466      975474 :     return 0;
    1467             :   }
    1468             : }
    1469             : 
    1470     7903008 : static void incoming_byte_stream_unref(grpc_chttp2_incoming_byte_stream *bs) {
                         /* Drop one reference; when gpr_unref reports the last one is gone,
                            destroy the buffered slices and free the byte stream. */
    1471     7903008 :   if (gpr_unref(&bs->refs)) {
    1472     3963645 :     gpr_slice_buffer_destroy(&bs->slices);
    1473     3963551 :     gpr_free(bs);
    1474             :   }
    1475     7926646 : }
    1476             : 
    1477     3957442 : static void incoming_byte_stream_destroy(grpc_byte_stream *byte_stream) {
                         /* `destroy` hook of the incoming byte stream (installed in
                            grpc_chttp2_incoming_byte_stream_create): releases one
                            reference. */
    1478     3957442 :   incoming_byte_stream_unref((grpc_chttp2_incoming_byte_stream *)byte_stream);
    1479     3962875 : }
    1480             : 
    1481     6444833 : void grpc_chttp2_incoming_byte_stream_push(grpc_exec_ctx *exec_ctx,
    1482             :                                            grpc_chttp2_incoming_byte_stream *bs,
    1483             :                                            gpr_slice slice) {
                         /* Deliver `slice` to the byte stream while holding the transport
                            mutex. If a reader is parked (on_next set by
                            incoming_byte_stream_next), the slice is written to the reader's
                            destination and its closure is enqueued with success = 1;
                            otherwise the slice is appended to the stream's buffer. */
    1484     6444833 :   gpr_mu_lock(&bs->transport->mu);
    1485     6444807 :   if (bs->on_next != NULL) {
    1486      975474 :     *bs->next = slice;
    1487      975474 :     grpc_exec_ctx_enqueue(exec_ctx, bs->on_next, 1);
                           /* the parked request is satisfied exactly once */
    1488      975474 :     bs->on_next = NULL;
    1489             :   } else {
    1490     5469333 :     gpr_slice_buffer_add(&bs->slices, slice);
    1491             :   }
    1492     6444828 :   gpr_mu_unlock(&bs->transport->mu);
    1493     6444824 : }
    1494             : 
    1495     3960618 : void grpc_chttp2_incoming_byte_stream_finished(
    1496             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_incoming_byte_stream *bs) {
                         /* Release one reference on `bs`; `exec_ctx` is not used.
                            NOTE(review): presumably this drops the producer-side one of the
                            two references taken at creation - confirm against the parser. */
    1497     3960618 :   incoming_byte_stream_unref(bs);
    1498     3962349 : }
    1499             : 
    1500     3963172 : grpc_chttp2_incoming_byte_stream *grpc_chttp2_incoming_byte_stream_create(
    1501             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing,
    1502             :     grpc_chttp2_stream_parsing *stream_parsing, gpr_uint32 frame_size,
    1503             :     gpr_uint32 flags, grpc_chttp2_incoming_frame_queue *add_to_queue) {
                         /* Allocate and initialise an incoming byte stream of `frame_size`
                            bytes with the given `flags`, append it to `add_to_queue`, and
                            return it. The new stream starts with two references and with
                            is_tail set; the previous tail of the queue, if any, has is_tail
                            cleared and is linked to the new stream via next_message. */
    1504     3963172 :   grpc_chttp2_incoming_byte_stream *incoming_byte_stream =
    1505             :       gpr_malloc(sizeof(*incoming_byte_stream));
    1506     3963411 :   incoming_byte_stream->base.length = frame_size;
    1507     3963411 :   incoming_byte_stream->base.flags = flags;
    1508     3963411 :   incoming_byte_stream->base.next = incoming_byte_stream_next;
    1509     3963411 :   incoming_byte_stream->base.destroy = incoming_byte_stream_destroy;
                         /* two refs: one is released by incoming_byte_stream_destroy, the
                            other by grpc_chttp2_incoming_byte_stream_finished */
    1510     3963411 :   gpr_ref_init(&incoming_byte_stream->refs, 2);
    1511     3963135 :   incoming_byte_stream->next_message = NULL;
    1512     3963135 :   incoming_byte_stream->transport = TRANSPORT_FROM_PARSING(transport_parsing);
    1513     3963135 :   incoming_byte_stream->stream = STREAM_FROM_PARSING(stream_parsing);
    1514     3963135 :   gpr_slice_buffer_init(&incoming_byte_stream->slices);
    1515     3961816 :   incoming_byte_stream->on_next = NULL;
    1516     3961816 :   incoming_byte_stream->is_tail = 1;
    1517     3961816 :   if (add_to_queue->head == NULL) {
    1518     3955207 :     add_to_queue->head = incoming_byte_stream;
    1519             :   } else {
    1520        6609 :     add_to_queue->tail->is_tail = 0;
    1521        6609 :     add_to_queue->tail->next_message = incoming_byte_stream;
    1522             :   }
    1523     3961816 :   add_to_queue->tail = incoming_byte_stream;
                         /* for an empty frame, update flow control right away (under the
                            transport lock) with a zero size hint and nothing received */
    1524     3961816 :   if (frame_size == 0) {
    1525     1758016 :     lock(TRANSPORT_FROM_PARSING(transport_parsing));
    1526     3517843 :     incoming_byte_stream_update_flow_control(
    1527     1758890 :         &TRANSPORT_FROM_PARSING(transport_parsing)->global,
    1528     1758890 :         &STREAM_FROM_PARSING(stream_parsing)->global, 0, 0);
    1529     1758896 :     unlock(exec_ctx, TRANSPORT_FROM_PARSING(transport_parsing));
    1530             :   }
    1531     3960931 :   return incoming_byte_stream;
    1532             : }
    1533             : 
    1534             : /*
    1535             :  * TRACING
    1536             :  */
    1537             : 
    1538       25440 : static char *format_flowctl_context_var(const char *context, const char *var,
    1539             :                                         gpr_int64 val, gpr_uint32 id,
    1540             :                                         char **scope) {
                         /* Build a trace label for one flow-control variable.
                            Returns a string allocated by gpr_asprintf: "var(val)" when
                            `context` is NULL, otherwise "<rest>.var(val)" where <rest> is
                            the part of `context` after its first '_'.
                            *scope is set to NULL when `context` is NULL; otherwise to an
                            allocated copy of the part of `context` before the first '_',
                            with "[id]" appended when `id` is non-zero. */
    1541             :   char *underscore_pos;
    1542             :   char *result;
    1543       25440 :   if (context == NULL) {
    1544        4442 :     *scope = NULL;
                           /* NOTE(review): "%lld" assumes gpr_int64 is passed as long long -
                              confirm, or cast the argument */
    1545        4442 :     gpr_asprintf(&result, "%s(%lld)", var, val);
    1546        4442 :     return result;
    1547             :   }
                         /* NOTE(review): the strchr result is used below without a NULL
                            check, so a `context` containing no '_' is not handled */
    1548       20998 :   underscore_pos = strchr(context, '_');
    1549       20998 :   *scope = gpr_strdup(context);
                         /* cut the copy at the underscore, leaving only the scope prefix */
    1550       20998 :   (*scope)[underscore_pos - context] = 0;
    1551       20998 :   if (id != 0) {
    1552        5608 :     char *tmp = *scope;
                           /* NOTE(review): "%d" is used with the unsigned gpr_uint32 `id` */
    1553        5608 :     gpr_asprintf(scope, "%s[%d]", tmp, id);
    1554        5608 :     gpr_free(tmp);
    1555             :   }
    1556       20998 :   gpr_asprintf(&result, "%s.%s(%lld)", underscore_pos + 1, var, val);
    1557       20998 :   return result;
    1558             : }
    1559             : 
    1560        8278 : static int samestr(char *a, char *b) {
                         /* NULL-tolerant string equality: returns non-zero when both
                            pointers are NULL or when both are non-NULL and strcmp reports
                            equal contents; returns 0 when exactly one of them is NULL. */
    1561        8278 :   if (a == NULL) {
    1562           0 :     return b == NULL;
    1563             :   }
    1564        8278 :   if (b == NULL) {
    1565           0 :     return 0;
    1566             :   }
    1567        8278 :   return 0 == strcmp(a, b);
    1568             : }
    1569             : 
    1570       12720 : void grpc_chttp2_flowctl_trace(const char *file, int line, const char *phase,
    1571             :                                grpc_chttp2_flowctl_op op, const char *context1,
    1572             :                                const char *var1, const char *context2,
    1573             :                                const char *var2, int is_client,
    1574             :                                gpr_uint32 stream_id, gpr_int64 val1,
    1575             :                                gpr_int64 val2) {
    1576             :   char *scope1;
    1577             :   char *scope2;
    1578       12720 :   char *label1 =
    1579             :       format_flowctl_context_var(context1, var1, val1, stream_id, &scope1);
    1580       12720 :   char *label2 =
    1581             :       format_flowctl_context_var(context2, var2, val2, stream_id, &scope2);
    1582       12720 :   char *clisvr = is_client ? "client" : "server";
    1583             :   char *prefix;
    1584             : 
    1585       12720 :   gpr_asprintf(&prefix, "FLOW % 8s: %s % 11s ", phase, clisvr, scope1);
    1586             : 
    1587       12720 :   switch (op) {
    1588             :     case GRPC_CHTTP2_FLOWCTL_MOVE:
    1589        8278 :       GPR_ASSERT(samestr(scope1, scope2));
    1590        8278 :       if (val2 != 0) {
    1591        1070 :         gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
    1592             :                 "%sMOVE   % 40s <- % 40s giving %d", prefix, label1, label2,
    1593             :                 val1 + val2);
    1594             :       }
    1595        8278 :       break;
    1596             :     case GRPC_CHTTP2_FLOWCTL_CREDIT:
    1597        1762 :       GPR_ASSERT(val2 >= 0);
    1598        1762 :       if (val2 != 0) {
    1599        1762 :         gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
    1600             :                 "%sCREDIT % 40s by % 40s giving %d", prefix, label1, label2,
    1601             :                 val1 + val2);
    1602             :       }
    1603        1762 :       break;
    1604             :     case GRPC_CHTTP2_FLOWCTL_DEBIT:
    1605        2680 :       GPR_ASSERT(val2 >= 0);
    1606        2680 :       if (val2 != 0) {
    1607        2488 :         gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
    1608             :                 "%sDEBIT  % 40s by % 40s giving %d", prefix, label1, label2,
    1609             :                 val1 - val2);
    1610             :       }
    1611        2680 :       break;
    1612             :   }
    1613             : 
    1614       12720 :   gpr_free(scope1);
    1615       12720 :   gpr_free(scope2);
    1616       12720 :   gpr_free(label1);
    1617       12720 :   gpr_free(label2);
    1618       12720 :   gpr_free(prefix);
    1619       12720 : }
    1620             : 
    1621             : /*
    1622             :  * INTEGRATION GLUE
    1623             :  */
    1624             : 
    1625        1245 : static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
    1626        1245 :   return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
    1627             : }
    1628             : 
    1629             : static const grpc_transport_vtable vtable = {
    1630             :     sizeof(grpc_chttp2_stream), init_stream, set_pollset, perform_stream_op,
    1631             :     perform_transport_op, destroy_stream, destroy_transport, chttp2_get_peer};
    1632             : 
    1633        6039 : grpc_transport *grpc_create_chttp2_transport(
    1634             :     grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
    1635             :     grpc_endpoint *ep, int is_client) {
    1636        6039 :   grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
    1637        6039 :   init_transport(exec_ctx, t, channel_args, ep, is_client != 0);
    1638        6036 :   return &t->base;
    1639             : }
    1640             : 
    1641        6039 : void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
    1642             :                                          grpc_transport *transport,
    1643             :                                          gpr_slice *slices, size_t nslices) {
    1644        5957 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
    1645        5957 :   REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
    1646        6039 :   gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
    1647        6039 :   recv_data(exec_ctx, t, 1);
    1648        6039 : }

Generated by: LCOV version 1.11