LCOV - code coverage report
Current view: top level - src/core/transport - chttp2_transport.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 603 649 92.9 %
Date: 2015-10-10 Functions: 41 43 95.3 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/transport/chttp2_transport.h"
      35             : 
      36             : #include <math.h>
      37             : #include <stdio.h>
      38             : #include <string.h>
      39             : 
      40             : #include <grpc/support/alloc.h>
      41             : #include <grpc/support/log.h>
      42             : #include <grpc/support/slice_buffer.h>
      43             : #include <grpc/support/string_util.h>
      44             : #include <grpc/support/useful.h>
      45             : 
      46             : #include "src/core/profiling/timers.h"
      47             : #include "src/core/support/string.h"
      48             : #include "src/core/transport/chttp2/http2_errors.h"
      49             : #include "src/core/transport/chttp2/internal.h"
      50             : #include "src/core/transport/chttp2/status_conversion.h"
      51             : #include "src/core/transport/chttp2/timeout_encoding.h"
      52             : #include "src/core/transport/transport_impl.h"
      53             : 
      54             : #define DEFAULT_WINDOW 65535
      55             : #define DEFAULT_CONNECTION_WINDOW_TARGET (1024 * 1024)
      56             : #define MAX_WINDOW 0x7fffffffu
      57             : 
      58             : #define MAX_CLIENT_STREAM_ID 0x7fffffffu
      59             : 
      60             : int grpc_http_trace = 0;
      61             : int grpc_flowctl_trace = 0;
      62             : 
      63             : #define TRANSPORT_FROM_WRITING(tw)                                        \
      64             :   ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
      65             :                                                    writing)))
      66             : 
      67             : #define TRANSPORT_FROM_PARSING(tw)                                        \
      68             :   ((grpc_chttp2_transport *)((char *)(tw)-offsetof(grpc_chttp2_transport, \
      69             :                                                    parsing)))
      70             : 
      71             : #define TRANSPORT_FROM_GLOBAL(tg)                                         \
      72             :   ((grpc_chttp2_transport *)((char *)(tg)-offsetof(grpc_chttp2_transport, \
      73             :                                                    global)))
      74             : 
      75             : #define STREAM_FROM_GLOBAL(sg) \
      76             :   ((grpc_chttp2_stream *)((char *)(sg)-offsetof(grpc_chttp2_stream, global)))
      77             : 
      78             : static const grpc_transport_vtable vtable;
      79             : 
      80             : static void lock(grpc_chttp2_transport *t);
      81             : static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
      82             : 
      83             : static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx,
      84             :                                           grpc_chttp2_transport *t);
      85             : 
      86             : /* forward declarations of various callbacks that we'll build closures around */
      87             : static void writing_action(grpc_exec_ctx *exec_ctx, void *t,
      88             :                            int iomgr_success_ignored);
      89             : 
      90             : /** Set a transport level setting, and push it to our peer */
      91             : static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
      92             :                          gpr_uint32 value);
      93             : 
      94             : /** Endpoint callback to process incoming data */
      95             : static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success);
      96             : 
      97             : /** Start disconnection chain */
      98             : static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t);
      99             : 
     100             : /** Perform a stream op; the transport lock must already be held */
     101             : static void perform_stream_op_locked(
     102             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     103             :     grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op);
     104             : 
     105             : /** Cancel a stream: coming from the transport API */
     106             : static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
     107             :                             grpc_chttp2_stream_global *stream_global,
     108             :                             grpc_status_code status);
     109             : 
     110             : static void close_from_api(grpc_chttp2_transport_global *transport_global,
     111             :                            grpc_chttp2_stream_global *stream_global,
     112             :                            grpc_status_code status,
     113             :                            gpr_slice *optional_message);
     114             : 
     115             : /** Add endpoint from this transport to pollset */
     116             : static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
     117             :                                   grpc_chttp2_transport *t,
     118             :                                   grpc_pollset *pollset);
     119             : static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
     120             :                                       grpc_chttp2_transport *t,
     121             :                                       grpc_pollset_set *pollset_set);
     122             : 
     123             : /** Start new streams that have been created if we can */
     124             : static void maybe_start_some_streams(
     125             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global);
     126             : 
     127             : static void connectivity_state_set(
     128             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     129             :     grpc_connectivity_state state, const char *reason);
     130             : 
     131             : /*
     132             :  * CONSTRUCTION/DESTRUCTION/REFCOUNTING
     133             :  */
     134             : 
     135        4014 : static void destruct_transport(grpc_exec_ctx *exec_ctx,
     136             :                                grpc_chttp2_transport *t) {
     137             :   size_t i;
     138             : 
     139        4014 :   gpr_mu_lock(&t->mu);
     140             : 
     141        4014 :   GPR_ASSERT(t->ep == NULL);
     142             : 
     143        4014 :   gpr_slice_buffer_destroy(&t->global.qbuf);
     144             : 
     145        4014 :   gpr_slice_buffer_destroy(&t->writing.outbuf);
     146        4014 :   grpc_chttp2_hpack_compressor_destroy(&t->writing.hpack_compressor);
     147             : 
     148        4014 :   gpr_slice_buffer_destroy(&t->parsing.qbuf);
     149        4014 :   gpr_slice_buffer_destroy(&t->read_buffer);
     150        4014 :   grpc_chttp2_hpack_parser_destroy(&t->parsing.hpack_parser);
     151        4014 :   grpc_chttp2_goaway_parser_destroy(&t->parsing.goaway_parser);
     152             : 
     153        4014 :   GRPC_MDSTR_UNREF(t->parsing.str_grpc_timeout);
     154             : 
     155       44154 :   for (i = 0; i < STREAM_LIST_COUNT; i++) {
     156       40140 :     GPR_ASSERT(t->lists[i].head == NULL);
     157       40140 :     GPR_ASSERT(t->lists[i].tail == NULL);
     158             :   }
     159             : 
     160        4014 :   GPR_ASSERT(grpc_chttp2_stream_map_size(&t->parsing_stream_map) == 0);
     161        4014 :   GPR_ASSERT(grpc_chttp2_stream_map_size(&t->new_stream_map) == 0);
     162             : 
     163        4014 :   grpc_chttp2_stream_map_destroy(&t->parsing_stream_map);
     164        4014 :   grpc_chttp2_stream_map_destroy(&t->new_stream_map);
     165        4014 :   grpc_connectivity_state_destroy(exec_ctx, &t->channel_callback.state_tracker);
     166             : 
     167        4014 :   gpr_mu_unlock(&t->mu);
     168        4014 :   gpr_mu_destroy(&t->mu);
     169             : 
     170             :   /* callback remaining pings: they're not allowed to call into the transport,
     171             :      and maybe they hold resources that need to be freed */
     172        8028 :   while (t->global.pings.next != &t->global.pings) {
     173           0 :     grpc_chttp2_outstanding_ping *ping = t->global.pings.next;
     174           0 :     grpc_exec_ctx_enqueue(exec_ctx, ping->on_recv, 0);
     175           0 :     ping->next->prev = ping->prev;
     176           0 :     ping->prev->next = ping->next;
     177           0 :     gpr_free(ping);
     178             :   }
     179             : 
     180        4014 :   grpc_mdctx_unref(t->metadata_context);
     181             : 
     182        4014 :   gpr_free(t->peer_string);
     183        4014 :   gpr_free(t);
     184        4014 : }
     185             : 
     186             : #ifdef REFCOUNTING_DEBUG
     187             : #define REF_TRANSPORT(t, r) ref_transport(t, r, __FILE__, __LINE__)
     188             : #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t, r, __FILE__, __LINE__)
     189             : static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
     190             :                             const char *reason, const char *file, int line) {
     191             :   gpr_log(GPR_DEBUG, "chttp2:unref:%p %d->%d %s [%s:%d]", t, t->refs.count,
     192             :           t->refs.count - 1, reason, file, line);
     193             :   if (!gpr_unref(&t->refs)) return;
     194             :   destruct_transport(exec_ctx, t);
     195             : }
     196             : 
     197             : static void ref_transport(grpc_chttp2_transport *t, const char *reason,
     198             :                           const char *file, int line) {
     199             :   gpr_log(GPR_DEBUG, "chttp2:  ref:%p %d->%d %s [%s:%d]", t, t->refs.count,
     200             :           t->refs.count + 1, reason, file, line);
     201             :   gpr_ref(&t->refs);
     202             : }
     203             : #else
     204             : #define REF_TRANSPORT(t, r) ref_transport(t)
     205             : #define UNREF_TRANSPORT(cl, t, r) unref_transport(cl, t)
     206     8615629 : static void unref_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
     207    17239906 :   if (!gpr_unref(&t->refs)) return;
     208        4014 :   destruct_transport(exec_ctx, t);
     209             : }
     210             : 
     211     8609310 : static void ref_transport(grpc_chttp2_transport *t) { gpr_ref(&t->refs); }
     212             : #endif
     213             : 
     214        4014 : static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
     215             :                            const grpc_channel_args *channel_args,
     216             :                            grpc_endpoint *ep, grpc_mdctx *mdctx,
     217             :                            gpr_uint8 is_client) {
     218             :   size_t i;
     219             :   int j;
     220             : 
     221             :   GPR_ASSERT(strlen(GRPC_CHTTP2_CLIENT_CONNECT_STRING) ==
     222             :              GRPC_CHTTP2_CLIENT_CONNECT_STRLEN);
     223             : 
     224        4014 :   memset(t, 0, sizeof(*t));
     225             : 
     226        4014 :   t->base.vtable = &vtable;
     227        4014 :   t->ep = ep;
     228             :   /* one ref is for destroy, the other for when ep becomes NULL */
     229        4014 :   gpr_ref_init(&t->refs, 2);
     230             :   /* ref is dropped at transport close() */
     231        4014 :   gpr_ref_init(&t->shutdown_ep_refs, 1);
     232        4014 :   gpr_mu_init(&t->mu);
     233        4014 :   grpc_mdctx_ref(mdctx);
     234        4014 :   t->peer_string = grpc_endpoint_get_peer(ep);
     235        4014 :   t->metadata_context = mdctx;
     236        4014 :   t->endpoint_reading = 1;
     237        4014 :   t->global.next_stream_id = is_client ? 1 : 2;
     238        4014 :   t->global.is_client = is_client;
     239        4014 :   t->global.outgoing_window = DEFAULT_WINDOW;
     240        4014 :   t->global.incoming_window = DEFAULT_WINDOW;
     241        4014 :   t->global.connection_window_target = DEFAULT_CONNECTION_WINDOW_TARGET;
     242        4014 :   t->global.ping_counter = 1;
     243        4014 :   t->global.pings.next = t->global.pings.prev = &t->global.pings;
     244        4014 :   t->parsing.is_client = is_client;
     245        4014 :   t->parsing.str_grpc_timeout =
     246        4014 :       grpc_mdstr_from_string(t->metadata_context, "grpc-timeout");
     247        4014 :   t->parsing.deframe_state =
     248             :       is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
     249        4014 :   t->writing.is_client = is_client;
     250        4014 :   grpc_connectivity_state_init(
     251             :       &t->channel_callback.state_tracker, GRPC_CHANNEL_READY,
     252             :       is_client ? "client_transport" : "server_transport");
     253             : 
     254        4014 :   gpr_slice_buffer_init(&t->global.qbuf);
     255             : 
     256        4014 :   gpr_slice_buffer_init(&t->writing.outbuf);
     257        4014 :   grpc_chttp2_hpack_compressor_init(&t->writing.hpack_compressor, mdctx);
     258        4014 :   grpc_closure_init(&t->writing_action, writing_action, t);
     259             : 
     260        4014 :   gpr_slice_buffer_init(&t->parsing.qbuf);
     261        4014 :   grpc_chttp2_goaway_parser_init(&t->parsing.goaway_parser);
     262        4014 :   grpc_chttp2_hpack_parser_init(&t->parsing.hpack_parser, t->metadata_context);
     263             : 
     264        4014 :   grpc_closure_init(&t->writing.done_cb, grpc_chttp2_terminate_writing,
     265        4014 :                     &t->writing);
     266        4014 :   grpc_closure_init(&t->recv_data, recv_data, t);
     267        4014 :   gpr_slice_buffer_init(&t->read_buffer);
     268             : 
     269        4014 :   if (is_client) {
     270        1992 :     gpr_slice_buffer_add(
     271             :         &t->global.qbuf,
     272             :         gpr_slice_from_copied_string(GRPC_CHTTP2_CLIENT_CONNECT_STRING));
     273             :   }
     274             :   /* 8 is a random stab in the dark as to a good initial size: it's small enough
     275             :      that it shouldn't waste memory for infrequently used connections, yet
     276             :      large enough that the exponential growth should happen nicely when it's
     277             :      needed.
     278             :      TODO(ctiller): tune this */
     279        4014 :   grpc_chttp2_stream_map_init(&t->parsing_stream_map, 8);
     280        4014 :   grpc_chttp2_stream_map_init(&t->new_stream_map, 8);
     281             : 
     282             :   /* copy in initial settings to all setting sets */
     283       32103 :   for (i = 0; i < GRPC_CHTTP2_NUM_SETTINGS; i++) {
     284       28089 :     t->parsing.settings[i] = grpc_chttp2_settings_parameters[i].default_value;
     285      140447 :     for (j = 0; j < GRPC_NUM_SETTING_SETS; j++) {
     286      112358 :       t->global.settings[j][i] =
     287      112358 :           grpc_chttp2_settings_parameters[i].default_value;
     288             :     }
     289             :   }
     290        4014 :   t->global.dirtied_local_settings = 1;
     291             :   /* Hack: it's common for implementations to assume 65536 bytes initial send
     292             :      window -- this should by rights be 0 */
     293        4014 :   t->global.force_send_settings = 1 << GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE;
     294        4014 :   t->global.sent_local_settings = 0;
     295             : 
     296             :   /* configure http2 the way we like it */
     297        4014 :   if (is_client) {
     298        1992 :     push_setting(t, GRPC_CHTTP2_SETTINGS_ENABLE_PUSH, 0);
     299        1992 :     push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS, 0);
     300             :   }
     301        4014 :   push_setting(t, GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, DEFAULT_WINDOW);
     302             : 
     303        4014 :   if (channel_args) {
     304        5642 :     for (i = 0; i < channel_args->num_args; i++) {
     305        2458 :       if (0 ==
     306        2458 :           strcmp(channel_args->args[i].key, GRPC_ARG_MAX_CONCURRENT_STREAMS)) {
     307          20 :         if (is_client) {
     308           0 :           gpr_log(GPR_ERROR, "%s: is ignored on the client",
     309             :                   GRPC_ARG_MAX_CONCURRENT_STREAMS);
     310          20 :         } else if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
     311           0 :           gpr_log(GPR_ERROR, "%s: must be an integer",
     312             :                   GRPC_ARG_MAX_CONCURRENT_STREAMS);
     313             :         } else {
     314          20 :           push_setting(t, GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS,
     315          20 :                        (gpr_uint32)channel_args->args[i].value.integer);
     316             :         }
     317        2438 :       } else if (0 == strcmp(channel_args->args[i].key,
     318             :                              GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER)) {
     319         197 :         if (channel_args->args[i].type != GRPC_ARG_INTEGER) {
     320           0 :           gpr_log(GPR_ERROR, "%s: must be an integer",
     321             :                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER);
     322         394 :         } else if ((t->global.next_stream_id & 1) !=
     323         197 :                    (channel_args->args[i].value.integer & 1)) {
     324           0 :           gpr_log(GPR_ERROR, "%s: low bit must be %d on %s",
     325             :                   GRPC_ARG_HTTP2_INITIAL_SEQUENCE_NUMBER,
     326           0 :                   t->global.next_stream_id & 1,
     327             :                   is_client ? "client" : "server");
     328             :         } else {
     329         197 :           t->global.next_stream_id =
     330         197 :               (gpr_uint32)channel_args->args[i].value.integer;
     331             :         }
     332             :       }
     333             :     }
     334             :   }
     335        4014 : }
     336             : 
     337        4014 : static void destroy_transport(grpc_exec_ctx *exec_ctx, grpc_transport *gt) {
     338        4014 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     339             : 
     340        4014 :   lock(t);
     341        4014 :   t->destroying = 1;
     342        4014 :   drop_connection(exec_ctx, t);
     343        4014 :   unlock(exec_ctx, t);
     344             : 
     345        4014 :   UNREF_TRANSPORT(exec_ctx, t, "destroy");
     346        4014 : }
     347             : 
     348             : /** Block grpc_endpoint_shutdown from being called until a paired
     349             :     allow_endpoint_shutdown_locked/_unlocked call is made */
     350     5909561 : static void prevent_endpoint_shutdown(grpc_chttp2_transport *t) {
     351     5909561 :   GPR_ASSERT(t->ep);
     352     5909561 :   gpr_ref(&t->shutdown_ep_refs);
     353     5910538 : }
     354             : 
     355     3449387 : static void allow_endpoint_shutdown_locked(grpc_exec_ctx *exec_ctx,
     356             :                                            grpc_chttp2_transport *t) {
     357     3449387 :   if (gpr_unref(&t->shutdown_ep_refs)) {
     358        4013 :     if (t->ep) {
     359        4013 :       grpc_endpoint_shutdown(exec_ctx, t->ep);
     360             :     }
     361             :   }
     362     3449642 : }
     363             : 
     364     2465619 : static void allow_endpoint_shutdown_unlocked(grpc_exec_ctx *exec_ctx,
     365             :                                              grpc_chttp2_transport *t) {
     366     2465619 :   if (gpr_unref(&t->shutdown_ep_refs)) {
     367           1 :     gpr_mu_lock(&t->mu);
     368           1 :     if (t->ep) {
     369           1 :       grpc_endpoint_shutdown(exec_ctx, t->ep);
     370             :     }
     371           1 :     gpr_mu_unlock(&t->mu);
     372             :   }
     373     2465605 : }
     374             : 
     375        4014 : static void destroy_endpoint(grpc_exec_ctx *exec_ctx,
     376             :                              grpc_chttp2_transport *t) {
     377        4014 :   grpc_endpoint_destroy(exec_ctx, t->ep);
     378        4014 :   t->ep = NULL;
     379             :   /* safe because we'll still have the ref for write */
     380        4014 :   UNREF_TRANSPORT(exec_ctx, t, "disconnect");
     381        4014 : }
     382             : 
     383       11711 : static void close_transport_locked(grpc_exec_ctx *exec_ctx,
     384             :                                    grpc_chttp2_transport *t) {
     385       11711 :   if (!t->closed) {
     386        4014 :     t->closed = 1;
     387        4014 :     connectivity_state_set(exec_ctx, &t->global, GRPC_CHANNEL_FATAL_FAILURE,
     388             :                            "close_transport");
     389        4014 :     if (t->ep) {
     390        4014 :       allow_endpoint_shutdown_locked(exec_ctx, t);
     391             :     }
     392             :   }
     393       11711 : }
     394             : 
     395     2700138 : static int init_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     396             :                        grpc_stream *gs, const void *server_data,
     397             :                        grpc_transport_stream_op *initial_op) {
     398     2700138 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     399     2700138 :   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
     400             : 
     401     2700138 :   memset(s, 0, sizeof(*s));
     402             : 
     403     2700138 :   grpc_chttp2_incoming_metadata_buffer_init(&s->parsing.incoming_metadata);
     404     2704773 :   grpc_chttp2_incoming_metadata_buffer_init(&s->global.incoming_metadata);
     405     2704631 :   grpc_sopb_init(&s->writing.sopb);
     406     2704601 :   grpc_sopb_init(&s->global.incoming_sopb);
     407     2704454 :   grpc_chttp2_data_parser_init(&s->parsing.data_parser);
     408             : 
     409     2703832 :   REF_TRANSPORT(t, "stream");
     410             : 
     411     2704801 :   lock(t);
     412     2704807 :   grpc_chttp2_register_stream(t, s);
     413     2704702 :   if (server_data) {
     414     1301880 :     GPR_ASSERT(t->parsing_active);
     415     1301880 :     s->global.id = (gpr_uint32)(gpr_uintptr)server_data;
     416     1301880 :     s->global.outgoing_window =
     417             :         t->global.settings[GRPC_PEER_SETTINGS]
     418     1301880 :                           [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     419     1301880 :     s->global.max_recv_bytes = s->parsing.incoming_window =
     420     1301880 :         s->global.incoming_window =
     421             :             t->global.settings[GRPC_SENT_SETTINGS]
     422     1301880 :                               [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     423     1301880 :     *t->accepting_stream = s;
     424     1301880 :     grpc_chttp2_stream_map_add(&t->parsing_stream_map, s->global.id, s);
     425     1301859 :     s->global.in_stream_map = 1;
     426             :   }
     427             : 
     428     2704681 :   if (initial_op)
     429     1301866 :     perform_stream_op_locked(exec_ctx, &t->global, &s->global, initial_op);
     430     2704630 :   unlock(exec_ctx, t);
     431             : 
     432     2704286 :   return 0;
     433             : }
     434             : 
     435     2702166 : static void destroy_stream(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     436             :                            grpc_stream *gs) {
     437     2702166 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     438     2702166 :   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
     439             :   int i;
     440             : 
     441     2702166 :   gpr_mu_lock(&t->mu);
     442             : 
     443     2704584 :   GPR_ASSERT(s->global.published_state == GRPC_STREAM_CLOSED ||
     444             :              s->global.id == 0);
     445     2704584 :   GPR_ASSERT(!s->global.in_stream_map);
     446     2704584 :   if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
     447         239 :     close_transport_locked(exec_ctx, t);
     448             :   }
     449     2704185 :   if (!t->parsing_active && s->global.id) {
     450     2656124 :     GPR_ASSERT(grpc_chttp2_stream_map_find(&t->parsing_stream_map,
     451             :                                            s->global.id) == NULL);
     452             :   }
     453             : 
     454     2704196 :   grpc_chttp2_list_remove_incoming_window_updated(&t->global, &s->global);
     455     2703470 :   grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
     456             : 
     457     2703962 :   gpr_mu_unlock(&t->mu);
     458             : 
     459    29746774 :   for (i = 0; i < STREAM_LIST_COUNT; i++) {
     460    27043124 :     if (s->included[i]) {
     461        4684 :       gpr_log(GPR_ERROR, "%s stream %d still included in list %d",
     462        2342 :               t->global.is_client ? "client" : "server", s->global.id, i);
     463           0 :       abort();
     464             :     }
     465             :   }
     466             : 
     467     2703650 :   GPR_ASSERT(s->global.outgoing_sopb == NULL);
     468     2703650 :   GPR_ASSERT(s->global.publish_sopb == NULL);
     469     2703650 :   grpc_sopb_destroy(&s->writing.sopb);
     470     2704395 :   grpc_sopb_destroy(&s->global.incoming_sopb);
     471     2703998 :   grpc_chttp2_data_parser_destroy(&s->parsing.data_parser);
     472     2703478 :   grpc_chttp2_incoming_metadata_buffer_destroy(&s->parsing.incoming_metadata);
     473     2703066 :   grpc_chttp2_incoming_metadata_buffer_destroy(&s->global.incoming_metadata);
     474     2702646 :   grpc_chttp2_incoming_metadata_live_op_buffer_end(
     475             :       &s->global.outstanding_metadata);
     476             : 
     477     2701433 :   UNREF_TRANSPORT(exec_ctx, t, "stream");
     478     2704760 : }
     479             : 
     480     8989852 : grpc_chttp2_stream_parsing *grpc_chttp2_parsing_lookup_stream(
     481             :     grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
     482     8989852 :   grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
     483     8989852 :   grpc_chttp2_stream *s =
     484     8989852 :       grpc_chttp2_stream_map_find(&t->parsing_stream_map, id);
     485     8998420 :   return s ? &s->parsing : NULL;
     486             : }
     487             : 
     488     1301774 : grpc_chttp2_stream_parsing *grpc_chttp2_parsing_accept_stream(
     489             :     grpc_chttp2_transport_parsing *transport_parsing, gpr_uint32 id) {
     490             :   grpc_chttp2_stream *accepting;
     491     1301774 :   grpc_chttp2_transport *t = TRANSPORT_FROM_PARSING(transport_parsing);
     492     1301774 :   GPR_ASSERT(t->accepting_stream == NULL);
     493     1301774 :   t->accepting_stream = &accepting;
     494     2603548 :   t->channel_callback.accept_stream(t->channel_callback.accept_stream_user_data,
     495     1301774 :                                     &t->base, (void *)(gpr_uintptr)id);
     496     1301890 :   t->accepting_stream = NULL;
     497     1301890 :   return &accepting->parsing;
     498             : }
     499             : 
     500             : /*
     501             :  * LOCK MANAGEMENT
     502             :  */
     503             : 
     504             : /* We take a grpc_chttp2_transport-global lock in response to calls coming in
     505             :    from above,
     506             :    and in response to data being received from below. New data to be written
     507             :    is always queued, as are callbacks to process data. During unlock() we
     508             :    check our todo lists and initiate callbacks and flush writes. */
     509             : 
     510    16803057 : static void lock(grpc_chttp2_transport *t) { gpr_mu_lock(&t->mu); }
     511             : 
     512    16811400 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
     513    16811400 :   unlock_check_read_write_state(exec_ctx, t);
     514    29363533 :   if (!t->writing_active && !t->closed &&
     515    12555229 :       grpc_chttp2_unlocking_check_writes(&t->global, &t->writing)) {
     516     3444202 :     t->writing_active = 1;
     517     3444202 :     REF_TRANSPORT(t, "writing");
     518     3445372 :     grpc_exec_ctx_enqueue(exec_ctx, &t->writing_action, 1);
     519     3445324 :     prevent_endpoint_shutdown(t);
     520             :   }
     521             : 
     522    16809583 :   gpr_mu_unlock(&t->mu);
     523    16876602 : }
     524             : 
     525             : /*
     526             :  * OUTPUT PROCESSING
     527             :  */
     528             : 
     529        8016 : static void push_setting(grpc_chttp2_transport *t, grpc_chttp2_setting_id id,
     530             :                          gpr_uint32 value) {
     531        8016 :   const grpc_chttp2_setting_parameters *sp =
     532             :       &grpc_chttp2_settings_parameters[id];
     533        8016 :   gpr_uint32 use_value = GPR_CLAMP(value, sp->min_value, sp->max_value);
     534        8016 :   if (use_value != value) {
     535           0 :     gpr_log(GPR_INFO, "Requested parameter %s clamped from %d to %d", sp->name,
     536             :             value, use_value);
     537             :   }
     538        8018 :   if (use_value != t->global.settings[GRPC_LOCAL_SETTINGS][id]) {
     539        4004 :     t->global.settings[GRPC_LOCAL_SETTINGS][id] = use_value;
     540        4004 :     t->global.dirtied_local_settings = 1;
     541             :   }
     542        8018 : }
     543             : 
     544     3444143 : void grpc_chttp2_terminate_writing(grpc_exec_ctx *exec_ctx,
     545             :                                    void *transport_writing_ptr, int success) {
     546     3444143 :   grpc_chttp2_transport_writing *transport_writing = transport_writing_ptr;
     547     3444143 :   grpc_chttp2_transport *t = TRANSPORT_FROM_WRITING(transport_writing);
     548             : 
     549     3444143 :   lock(t);
     550             : 
     551     3445374 :   allow_endpoint_shutdown_locked(exec_ctx, t);
     552             : 
     553     3445635 :   if (!success) {
     554          25 :     drop_connection(exec_ctx, t);
     555             :   }
     556             : 
     557             :   /* cleanup writing related jazz */
     558     3445635 :   grpc_chttp2_cleanup_writing(exec_ctx, &t->global, &t->writing);
     559             : 
     560             :   /* leave the writing flag up on shutdown to prevent further writes in unlock()
     561             :      from starting */
     562     3441330 :   t->writing_active = 0;
     563     3441330 :   if (t->ep && !t->endpoint_reading) {
     564          16 :     destroy_endpoint(exec_ctx, t);
     565             :   }
     566             : 
     567     3441330 :   unlock(exec_ctx, t);
     568             : 
     569     3444603 :   UNREF_TRANSPORT(exec_ctx, t, "writing");
     570     3445634 : }
     571             : 
     572     3441817 : static void writing_action(grpc_exec_ctx *exec_ctx, void *gt,
     573             :                            int iomgr_success_ignored) {
     574     3441817 :   grpc_chttp2_transport *t = gt;
     575     3441817 :   grpc_chttp2_perform_writes(exec_ctx, &t->writing, t->ep);
     576     3443410 : }
     577             : 
     578         229 : void grpc_chttp2_add_incoming_goaway(
     579             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     580             :     gpr_uint32 goaway_error, gpr_slice goaway_text) {
     581         229 :   char *msg = gpr_dump_slice(goaway_text, GPR_DUMP_HEX | GPR_DUMP_ASCII);
     582         229 :   gpr_log(GPR_DEBUG, "got goaway [%d]: %s", goaway_error, msg);
     583         229 :   gpr_free(msg);
     584         229 :   gpr_slice_unref(goaway_text);
     585         229 :   transport_global->seen_goaway = 1;
     586         229 :   connectivity_state_set(exec_ctx, transport_global, GRPC_CHANNEL_FATAL_FAILURE,
     587             :                          "got_goaway");
     588         229 : }
     589             : 
     590     4103470 : static void maybe_start_some_streams(
     591             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global) {
     592             :   grpc_chttp2_stream_global *stream_global;
     593             :   /* start streams where we have free grpc_chttp2_stream ids and free
     594             :    * concurrency */
     595    15113565 :   while (transport_global->next_stream_id <= MAX_CLIENT_STREAM_ID &&
     596     5503598 :          transport_global->concurrent_stream_count <
     597             :              transport_global
     598             :                  ->settings[GRPC_PEER_SETTINGS]
     599     9709058 :                            [GRPC_CHTTP2_SETTINGS_MAX_CONCURRENT_STREAMS] &&
     600     4205983 :          grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
     601             :                                                       &stream_global)) {
     602     1402904 :     GRPC_CHTTP2_IF_TRACING(gpr_log(
     603             :         GPR_DEBUG, "HTTP:%s: Allocating new grpc_chttp2_stream %p to id %d",
     604             :         transport_global->is_client ? "CLI" : "SVR", stream_global,
     605             :         transport_global->next_stream_id));
     606             : 
     607     1402896 :     GPR_ASSERT(stream_global->id == 0);
     608     1402896 :     stream_global->id = transport_global->next_stream_id;
     609     1402896 :     transport_global->next_stream_id += 2;
     610             : 
     611     1402896 :     if (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID) {
     612         170 :       connectivity_state_set(exec_ctx, transport_global,
     613             :                              GRPC_CHANNEL_TRANSIENT_FAILURE,
     614             :                              "no_more_stream_ids");
     615             :     }
     616             : 
     617     2805792 :     stream_global->outgoing_window =
     618             :         transport_global->settings[GRPC_PEER_SETTINGS]
     619     1402896 :                                   [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     620     2805792 :     stream_global->incoming_window =
     621             :         transport_global->settings[GRPC_SENT_SETTINGS]
     622     1402896 :                                   [GRPC_CHTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
     623     2805792 :     stream_global->max_recv_bytes =
     624     1402896 :         GPR_MAX(stream_global->incoming_window, stream_global->max_recv_bytes);
     625     4208688 :     grpc_chttp2_stream_map_add(
     626     1402896 :         &TRANSPORT_FROM_GLOBAL(transport_global)->new_stream_map,
     627     1402896 :         stream_global->id, STREAM_FROM_GLOBAL(stream_global));
     628     1402881 :     stream_global->in_stream_map = 1;
     629     1402881 :     transport_global->concurrent_stream_count++;
     630     1402881 :     grpc_chttp2_list_add_incoming_window_updated(transport_global,
     631             :                                                  stream_global);
     632     1402846 :     grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
     633             :   }
     634             :   /* cancel out streams that will never be started */
     635     8206082 :   while (transport_global->next_stream_id >= MAX_CLIENT_STREAM_ID &&
     636         340 :          grpc_chttp2_list_pop_waiting_for_concurrency(transport_global,
     637             :                                                       &stream_global)) {
     638           0 :     cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
     639             :   }
     640     4102672 : }
     641             : 
     642     9554515 : static void perform_stream_op_locked(
     643             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
     644             :     grpc_chttp2_stream_global *stream_global, grpc_transport_stream_op *op) {
     645     9554515 :   if (op->cancel_with_status != GRPC_STATUS_OK) {
     646      278775 :     cancel_from_api(transport_global, stream_global, op->cancel_with_status);
     647             :   }
     648             : 
     649     9555650 :   if (op->close_with_status != GRPC_STATUS_OK) {
     650           7 :     close_from_api(transport_global, stream_global, op->close_with_status,
     651             :                    op->optional_close_message);
     652             :   }
     653             : 
     654     9555650 :   if (op->send_ops) {
     655     3007349 :     GPR_ASSERT(stream_global->outgoing_sopb == NULL);
     656     3007349 :     stream_global->send_done_closure = op->on_done_send;
     657     3007349 :     if (!stream_global->cancelled) {
     658     3007257 :       stream_global->written_anything = 1;
     659     3007257 :       stream_global->outgoing_sopb = op->send_ops;
     660     5710707 :       if (op->is_last_send &&
     661     2703450 :           stream_global->write_state == GRPC_WRITE_STATE_OPEN) {
     662     2703625 :         stream_global->write_state = GRPC_WRITE_STATE_QUEUED_CLOSE;
     663             :       }
     664     3007257 :       if (stream_global->id == 0) {
     665     1402932 :         GRPC_CHTTP2_IF_TRACING(gpr_log(
     666             :             GPR_DEBUG,
     667             :             "HTTP:%s: New grpc_chttp2_stream %p waiting for concurrency",
     668             :             transport_global->is_client ? "CLI" : "SVR", stream_global));
     669     1402932 :         grpc_chttp2_list_add_waiting_for_concurrency(transport_global,
     670             :                                                      stream_global);
     671     1402891 :         maybe_start_some_streams(exec_ctx, transport_global);
     672     1604325 :       } else if (stream_global->outgoing_window > 0) {
     673     1604297 :         grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
     674             :       }
     675             :     } else {
     676          92 :       grpc_sopb_reset(op->send_ops);
     677          92 :       grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 0);
     678             :     }
     679             :   }
     680             : 
     681     9554160 :   if (op->recv_ops) {
     682     5344555 :     GPR_ASSERT(stream_global->publish_sopb == NULL);
     683     5344555 :     GPR_ASSERT(stream_global->published_state != GRPC_STREAM_CLOSED);
     684     5344555 :     stream_global->recv_done_closure = op->on_done_recv;
     685     5344555 :     stream_global->publish_sopb = op->recv_ops;
     686     5344555 :     stream_global->publish_sopb->nops = 0;
     687     5344555 :     stream_global->publish_state = op->recv_state;
     688             :     /* clamp max recv bytes */
     689     5344555 :     op->max_recv_bytes = GPR_MIN(op->max_recv_bytes, GPR_UINT32_MAX);
     690     5344555 :     if (stream_global->max_recv_bytes < op->max_recv_bytes) {
     691     3002211 :       GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
     692             :           "op", transport_global, stream_global, max_recv_bytes,
     693             :           op->max_recv_bytes - stream_global->max_recv_bytes);
     694     3001952 :       GRPC_CHTTP2_FLOWCTL_TRACE_STREAM(
     695             :           "op", transport_global, stream_global, unannounced_incoming_window,
     696             :           op->max_recv_bytes - stream_global->max_recv_bytes);
     697     6003904 :       stream_global->unannounced_incoming_window +=
     698     3001952 :           (gpr_uint32)op->max_recv_bytes - stream_global->max_recv_bytes;
     699     3001952 :       stream_global->max_recv_bytes = (gpr_uint32)op->max_recv_bytes;
     700             :     }
     701     5344296 :     grpc_chttp2_incoming_metadata_live_op_buffer_end(
     702             :         &stream_global->outstanding_metadata);
     703     5345145 :     grpc_chttp2_list_add_read_write_state_changed(transport_global,
     704             :                                                   stream_global);
     705     5345201 :     if (stream_global->id != 0) {
     706     5345112 :       grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
     707             :     }
     708             :   }
     709             : 
     710     9554403 :   if (op->bind_pollset) {
     711     2703701 :     add_to_pollset_locked(exec_ctx, TRANSPORT_FROM_GLOBAL(transport_global),
     712             :                           op->bind_pollset);
     713             :   }
     714             : 
     715     9553853 :   grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
     716     9546146 : }
     717             : 
     718     8224266 : static void perform_stream_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     719             :                               grpc_stream *gs, grpc_transport_stream_op *op) {
     720     8224266 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     721     8224266 :   grpc_chttp2_stream *s = (grpc_chttp2_stream *)gs;
     722             : 
     723     8224266 :   lock(t);
     724     8258219 :   perform_stream_op_locked(exec_ctx, &t->global, &s->global, op);
     725     8246653 :   unlock(exec_ctx, t);
     726     8256338 : }
     727             : 
     728           0 : static void send_ping_locked(grpc_chttp2_transport *t, grpc_closure *on_recv) {
     729           0 :   grpc_chttp2_outstanding_ping *p = gpr_malloc(sizeof(*p));
     730           0 :   p->next = &t->global.pings;
     731           0 :   p->prev = p->next->prev;
     732           0 :   p->prev->next = p->next->prev = p;
     733           0 :   p->id[0] = (gpr_uint8)((t->global.ping_counter >> 56) & 0xff);
     734           0 :   p->id[1] = (gpr_uint8)((t->global.ping_counter >> 48) & 0xff);
     735           0 :   p->id[2] = (gpr_uint8)((t->global.ping_counter >> 40) & 0xff);
     736           0 :   p->id[3] = (gpr_uint8)((t->global.ping_counter >> 32) & 0xff);
     737           0 :   p->id[4] = (gpr_uint8)((t->global.ping_counter >> 24) & 0xff);
     738           0 :   p->id[5] = (gpr_uint8)((t->global.ping_counter >> 16) & 0xff);
     739           0 :   p->id[6] = (gpr_uint8)((t->global.ping_counter >> 8) & 0xff);
     740           0 :   p->id[7] = (gpr_uint8)(t->global.ping_counter & 0xff);
     741           0 :   p->on_recv = on_recv;
     742           0 :   gpr_slice_buffer_add(&t->global.qbuf, grpc_chttp2_ping_create(0, p->id));
     743           0 : }
     744             : 
     745       11123 : static void perform_transport_op(grpc_exec_ctx *exec_ctx, grpc_transport *gt,
     746             :                                  grpc_transport_op *op) {
     747       11123 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)gt;
     748       11123 :   int close_transport = 0;
     749             : 
     750       11123 :   lock(t);
     751             : 
     752       11123 :   grpc_exec_ctx_enqueue(exec_ctx, op->on_consumed, 1);
     753             : 
     754       11124 :   if (op->on_connectivity_state_change) {
     755        5588 :     grpc_connectivity_state_notify_on_state_change(
     756             :         exec_ctx, &t->channel_callback.state_tracker, op->connectivity_state,
     757             :         op->on_connectivity_state_change);
     758             :   }
     759             : 
     760       11126 :   if (op->send_goaway) {
     761        1772 :     t->global.sent_goaway = 1;
     762        5316 :     grpc_chttp2_goaway_append(
     763             :         t->global.last_incoming_stream_id,
     764        1772 :         (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(op->goaway_status),
     765        1772 :         gpr_slice_ref(*op->goaway_message), &t->global.qbuf);
     766        1772 :     close_transport = !grpc_chttp2_has_streams(t);
     767             :   }
     768             : 
     769       11126 :   if (op->set_accept_stream != NULL) {
     770        2022 :     t->channel_callback.accept_stream = op->set_accept_stream;
     771        2022 :     t->channel_callback.accept_stream_user_data =
     772        2022 :         op->set_accept_stream_user_data;
     773             :   }
     774             : 
     775       11126 :   if (op->bind_pollset) {
     776        2149 :     add_to_pollset_locked(exec_ctx, t, op->bind_pollset);
     777             :   }
     778             : 
     779       11126 :   if (op->bind_pollset_set) {
     780        1547 :     add_to_pollset_set_locked(exec_ctx, t, op->bind_pollset_set);
     781             :   }
     782             : 
     783       11127 :   if (op->send_ping) {
     784           0 :     send_ping_locked(t, op->send_ping);
     785             :   }
     786             : 
     787       11127 :   if (op->disconnect) {
     788        1616 :     close_transport_locked(exec_ctx, t);
     789             :   }
     790             : 
     791       11127 :   unlock(exec_ctx, t);
     792             : 
     793       11128 :   if (close_transport) {
     794        1648 :     lock(t);
     795        1648 :     close_transport_locked(exec_ctx, t);
     796        1648 :     unlock(exec_ctx, t);
     797             :   }
     798       11128 : }
     799             : 
     800             : /*
     801             :  * INPUT PROCESSING
     802             :  */
     803             : 
     804    12629478 : static grpc_stream_state compute_state(gpr_uint8 write_closed,
     805             :                                        gpr_uint8 read_closed) {
     806    12629478 :   if (write_closed && read_closed) return GRPC_STREAM_CLOSED;
     807     9926843 :   if (write_closed) return GRPC_STREAM_SEND_CLOSED;
     808     9926843 :   if (read_closed) return GRPC_STREAM_RECV_CLOSED;
     809     7086222 :   return GRPC_STREAM_OPEN;
     810             : }
     811             : 
     812     2700826 : static void remove_stream(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
     813             :                           gpr_uint32 id) {
     814             :   size_t new_stream_count;
     815     2700826 :   grpc_chttp2_stream *s =
     816     2700826 :       grpc_chttp2_stream_map_delete(&t->parsing_stream_map, id);
     817     2704059 :   if (!s) {
     818          64 :     s = grpc_chttp2_stream_map_delete(&t->new_stream_map, id);
     819             :   }
     820     2704059 :   grpc_chttp2_list_remove_writable_stream(&t->global, &s->global);
     821     2703331 :   GPR_ASSERT(s);
     822     2703331 :   s->global.in_stream_map = 0;
     823     2703331 :   if (t->parsing.incoming_stream == &s->parsing) {
     824           9 :     t->parsing.incoming_stream = NULL;
     825           9 :     grpc_chttp2_parsing_become_skip_parser(&t->parsing);
     826             :   }
     827     2703331 :   if (grpc_chttp2_unregister_stream(t, s) && t->global.sent_goaway) {
     828         124 :     close_transport_locked(exec_ctx, t);
     829             :   }
     830             : 
     831     5406262 :   new_stream_count = grpc_chttp2_stream_map_size(&t->parsing_stream_map) +
     832     2703259 :                      grpc_chttp2_stream_map_size(&t->new_stream_map);
     833     2702958 :   GPR_ASSERT(new_stream_count <= GPR_UINT32_MAX);
     834     2702958 :   if (new_stream_count != t->global.concurrent_stream_count) {
     835     2703076 :     t->global.concurrent_stream_count = (gpr_uint32)new_stream_count;
     836     2703076 :     maybe_start_some_streams(exec_ctx, &t->global);
     837             :   }
     838     2702297 : }
     839             : 
     840    16813749 : static void unlock_check_read_write_state(grpc_exec_ctx *exec_ctx,
     841             :                                           grpc_chttp2_transport *t) {
     842    16813749 :   grpc_chttp2_transport_global *transport_global = &t->global;
     843             :   grpc_chttp2_stream_global *stream_global;
     844             :   grpc_stream_state state;
     845             : 
     846    16813749 :   if (!t->parsing_active) {
     847             :     /* if a stream is in the stream map, and gets cancelled, we need to ensure
     848             :        we are not parsing before continuing the cancellation to keep things in
     849             :        a sane state */
     850    30803735 :     while (grpc_chttp2_list_pop_closed_waiting_for_parsing(transport_global,
     851             :                                                            &stream_global)) {
     852       47035 :       GPR_ASSERT(stream_global->in_stream_map);
     853       47035 :       GPR_ASSERT(stream_global->write_state != GRPC_WRITE_STATE_OPEN);
     854       47035 :       GPR_ASSERT(stream_global->read_closed);
     855       47035 :       remove_stream(exec_ctx, t, stream_global->id);
     856       47035 :       grpc_chttp2_list_add_read_write_state_changed(transport_global,
     857             :                                                     stream_global);
     858             :     }
     859             :   }
     860             : 
     861    16817248 :   if (!t->writing_active) {
     862    25160935 :     while (grpc_chttp2_list_pop_cancelled_waiting_for_writing(transport_global,
     863             :                                                               &stream_global)) {
     864          81 :       grpc_chttp2_list_add_read_write_state_changed(transport_global,
     865             :                                                     stream_global);
     866             :     }
     867             :   }
     868             : 
     869    47815921 :   while (grpc_chttp2_list_pop_read_write_state_changed(transport_global,
     870             :                                                        &stream_global)) {
     871    14145070 :     if (stream_global->cancelled) {
     872      575912 :       if (t->writing_active &&
     873      273594 :           stream_global->write_state != GRPC_WRITE_STATE_SENT_CLOSE) {
     874          84 :         grpc_chttp2_list_add_cancelled_waiting_for_writing(transport_global,
     875             :                                                            stream_global);
     876             :       } else {
     877      302234 :         stream_global->write_state = GRPC_WRITE_STATE_SENT_CLOSE;
     878      302234 :         if (stream_global->outgoing_sopb != NULL) {
     879           1 :           grpc_sopb_reset(stream_global->outgoing_sopb);
     880           1 :           stream_global->outgoing_sopb = NULL;
     881           1 :           grpc_exec_ctx_enqueue(exec_ctx, stream_global->send_done_closure, 1);
     882             :         }
     883      302238 :         stream_global->read_closed = 1;
     884      302238 :         if (!stream_global->published_cancelled) {
     885             :           char buffer[GPR_LTOA_MIN_BUFSIZE];
     886      284248 :           gpr_ltoa(stream_global->cancelled_status, buffer);
     887      568508 :           grpc_chttp2_incoming_metadata_buffer_add(
     888      284257 :               &stream_global->incoming_metadata,
     889             :               grpc_mdelem_from_strings(t->metadata_context, "grpc-status",
     890             :                                        buffer));
     891      568506 :           grpc_chttp2_incoming_metadata_buffer_place_metadata_batch_into(
     892      568506 :               &stream_global->incoming_metadata, &stream_global->incoming_sopb);
     893      284255 :           stream_global->published_cancelled = 1;
     894             :         }
     895             :       }
     896             :     }
     897    21812232 :     if (stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
     898    10667184 :         stream_global->read_closed && stream_global->in_stream_map) {
     899     2718468 :       if (t->parsing_active) {
     900       64631 :         grpc_chttp2_list_add_closed_waiting_for_parsing(transport_global,
     901             :                                                         stream_global);
     902             :       } else {
     903     2653837 :         remove_stream(exec_ctx, t, stream_global->id);
     904             :       }
     905             :     }
     906    14183824 :     if (!stream_global->publish_sopb) {
     907     1540855 :       continue;
     908             :     }
     909    12642969 :     if (stream_global->writing_now != 0) {
     910       25803 :       continue;
     911             :     }
     912             :     /* FIXME(ctiller): we include in_stream_map in our computation of
     913             :        whether the stream is write-closed. This is completely bogus,
     914             :        but has the effect of delaying stream-closed until the stream
     915             :        is indeed evicted from the stream map, making it safe to delete.
     916             :        To fix this will require having an edge after stream-closed
     917             :        indicating that the stream is closed AND safe to delete. */
     918    25234332 :     state = compute_state(
     919    18877984 :         stream_global->write_state == GRPC_WRITE_STATE_SENT_CLOSE &&
     920     6260818 :             !stream_global->in_stream_map,
     921    12617166 :         stream_global->read_closed);
     922    21182600 :     if (stream_global->incoming_sopb.nops == 0 &&
     923     8579979 :         state == stream_global->published_state) {
     924     7283477 :       continue;
     925             :     }
     926    15957432 :     grpc_chttp2_incoming_metadata_buffer_postprocess_sopb_and_begin_live_op(
     927    10638288 :         &stream_global->incoming_metadata, &stream_global->incoming_sopb,
     928     5319144 :         &stream_global->outstanding_metadata);
     929     5343511 :     grpc_sopb_swap(stream_global->publish_sopb, &stream_global->incoming_sopb);
     930     5343784 :     stream_global->published_state = *stream_global->publish_state = state;
     931     5343784 :     grpc_exec_ctx_enqueue(exec_ctx, stream_global->recv_done_closure, 1);
     932     5342312 :     stream_global->recv_done_closure = NULL;
     933     5342312 :     stream_global->publish_sopb = NULL;
     934     5342312 :     stream_global->publish_state = NULL;
     935             :   }
     936    16785611 : }
     937             : 
     938      278835 : static void cancel_from_api(grpc_chttp2_transport_global *transport_global,
     939             :                             grpc_chttp2_stream_global *stream_global,
     940             :                             grpc_status_code status) {
     941      278835 :   stream_global->cancelled = 1;
     942      278835 :   stream_global->cancelled_status = status;
     943      278835 :   if (stream_global->id != 0) {
     944      278742 :     gpr_slice_buffer_add(
     945             :         &transport_global->qbuf,
     946             :         grpc_chttp2_rst_stream_create(
     947             :             stream_global->id,
     948      278742 :             (gpr_uint32)grpc_chttp2_grpc_status_to_http2_error(status)));
     949             :   }
     950      278836 :   grpc_chttp2_list_add_read_write_state_changed(transport_global,
     951             :                                                 stream_global);
     952      278834 : }
     953             : 
     954           7 : static void close_from_api(grpc_chttp2_transport_global *transport_global,
     955             :                            grpc_chttp2_stream_global *stream_global,
     956             :                            grpc_status_code status,
     957             :                            gpr_slice *optional_message) {
     958             :   gpr_slice hdr;
     959             :   gpr_slice status_hdr;
     960             :   gpr_slice message_pfx;
     961             :   gpr_uint8 *p;
     962           7 :   gpr_uint32 len = 0;
     963             : 
     964           7 :   GPR_ASSERT(status >= 0 && (int)status < 100);
     965             : 
     966           7 :   stream_global->cancelled = 1;
     967           7 :   stream_global->cancelled_status = status;
     968           7 :   GPR_ASSERT(stream_global->id != 0);
     969           7 :   GPR_ASSERT(!stream_global->written_anything);
     970             : 
     971             :   /* Hand roll a header block.
     972             :      This is unnecessarily ugly - at some point we should find a more elegant
     973             :      solution.
     974             :      It's complicated by the fact that our send machinery would be dead by the
     975             :      time we got around to sending this, so instead we ignore HPACK compression
     976             :      and just write the uncompressed bytes onto the wire. */
     977           7 :   status_hdr = gpr_slice_malloc(15 + (status >= 10));
     978           7 :   p = GPR_SLICE_START_PTR(status_hdr);
     979           7 :   *p++ = 0x40; /* literal header */
     980           7 :   *p++ = 11;   /* len(grpc-status) */
     981           7 :   *p++ = 'g';
     982           7 :   *p++ = 'r';
     983           7 :   *p++ = 'p';
     984           7 :   *p++ = 'c';
     985           7 :   *p++ = '-';
     986           7 :   *p++ = 's';
     987           7 :   *p++ = 't';
     988           7 :   *p++ = 'a';
     989           7 :   *p++ = 't';
     990           7 :   *p++ = 'u';
     991           7 :   *p++ = 's';
     992           7 :   if (status < 10) {
     993           0 :     *p++ = 1;
     994           0 :     *p++ = (gpr_uint8)('0' + status);
     995             :   } else {
     996           7 :     *p++ = 2;
     997           7 :     *p++ = (gpr_uint8)('0' + (status / 10));
     998           7 :     *p++ = (gpr_uint8)('0' + (status % 10));
     999             :   }
    1000           7 :   GPR_ASSERT(p == GPR_SLICE_END_PTR(status_hdr));
    1001           7 :   len += (gpr_uint32)GPR_SLICE_LENGTH(status_hdr);
    1002             : 
    1003           7 :   if (optional_message) {
    1004           7 :     GPR_ASSERT(GPR_SLICE_LENGTH(*optional_message) < 127);
    1005           7 :     message_pfx = gpr_slice_malloc(15);
    1006           7 :     p = GPR_SLICE_START_PTR(message_pfx);
    1007           7 :     *p++ = 0x40;
    1008           7 :     *p++ = 12; /* len(grpc-message) */
    1009           7 :     *p++ = 'g';
    1010           7 :     *p++ = 'r';
    1011           7 :     *p++ = 'p';
    1012           7 :     *p++ = 'c';
    1013           7 :     *p++ = '-';
    1014           7 :     *p++ = 'm';
    1015           7 :     *p++ = 'e';
    1016           7 :     *p++ = 's';
    1017           7 :     *p++ = 's';
    1018           7 :     *p++ = 'a';
    1019           7 :     *p++ = 'g';
    1020           7 :     *p++ = 'e';
    1021           7 :     *p++ = (gpr_uint8)GPR_SLICE_LENGTH(*optional_message);
    1022           7 :     GPR_ASSERT(p == GPR_SLICE_END_PTR(message_pfx));
    1023           7 :     len += (gpr_uint32)GPR_SLICE_LENGTH(message_pfx);
    1024           7 :     len += (gpr_uint32)GPR_SLICE_LENGTH(*optional_message);
    1025             :   }
    1026             : 
    1027           7 :   hdr = gpr_slice_malloc(9);
    1028           7 :   p = GPR_SLICE_START_PTR(hdr);
    1029           7 :   *p++ = (gpr_uint8)(len >> 16);
    1030           7 :   *p++ = (gpr_uint8)(len >> 8);
    1031           7 :   *p++ = (gpr_uint8)(len);
    1032           7 :   *p++ = GRPC_CHTTP2_FRAME_HEADER;
    1033           7 :   *p++ = GRPC_CHTTP2_DATA_FLAG_END_STREAM | GRPC_CHTTP2_DATA_FLAG_END_HEADERS;
    1034           7 :   *p++ = (gpr_uint8)(stream_global->id >> 24);
    1035           7 :   *p++ = (gpr_uint8)(stream_global->id >> 16);
    1036           7 :   *p++ = (gpr_uint8)(stream_global->id >> 8);
    1037           7 :   *p++ = (gpr_uint8)(stream_global->id);
    1038           7 :   GPR_ASSERT(p == GPR_SLICE_END_PTR(hdr));
    1039             : 
    1040           7 :   gpr_slice_buffer_add(&transport_global->qbuf, hdr);
    1041           7 :   gpr_slice_buffer_add(&transport_global->qbuf, status_hdr);
    1042           7 :   if (optional_message) {
    1043           7 :     gpr_slice_buffer_add(&transport_global->qbuf, message_pfx);
    1044           7 :     gpr_slice_buffer_add(&transport_global->qbuf,
    1045             :                          gpr_slice_ref(*optional_message));
    1046             :   }
    1047             : 
    1048           7 :   gpr_slice_buffer_add(
    1049             :       &transport_global->qbuf,
    1050             :       grpc_chttp2_rst_stream_create(stream_global->id, GRPC_CHTTP2_NO_ERROR));
    1051             : 
    1052           7 :   grpc_chttp2_list_add_read_write_state_changed(transport_global,
    1053             :                                                 stream_global);
    1054           7 : }
    1055             : 
                       /* Per-stream callback handed to grpc_chttp2_for_all_streams by
                        * end_all_the_calls: cancels the given stream through cancel_from_api
                        * with status GRPC_STATUS_UNAVAILABLE. user_data is not used. */
    1056          60 : static void cancel_stream_cb(grpc_chttp2_transport_global *transport_global,
    1057             :                              void *user_data,
    1058             :                              grpc_chttp2_stream_global *stream_global) {
    1059          60 :   cancel_from_api(transport_global, stream_global, GRPC_STATUS_UNAVAILABLE);
    1060          60 : }
    1061             : 
                       /* Runs cancel_stream_cb over the streams of transport t (via
                        * grpc_chttp2_for_all_streams on t->global), with NULL user data.
                        * NOTE(review): presumably this visits every live stream; confirm against
                        * grpc_chttp2_for_all_streams. */
    1062        8084 : static void end_all_the_calls(grpc_chttp2_transport *t) {
    1063        8084 :   grpc_chttp2_for_all_streams(&t->global, NULL, cancel_stream_cb);
    1064        8084 : }
    1065             : 
                       /* Tears the connection down: first closes the transport
                        * (close_transport_locked), then cancels its streams (end_all_the_calls).
                        * recv_data calls this while holding t->mu.
                        * NOTE(review): the "_locked" callee suggests holding t->mu is required of
                        * all callers; confirm. */
    1066        8084 : static void drop_connection(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t) {
    1067        8084 :   close_transport_locked(exec_ctx, t);
    1068        8084 :   end_all_the_calls(t);
    1069        8084 : }
    1070             : 
    1071             : /** update window from a settings change
                        *
                        * Per-stream callback used by recv_data with
                        * grpc_chttp2_stream_map_for_each.
                        *
                        * args   - the grpc_chttp2_transport owning the stream map
                        * id     - stream id supplied by the map iteration (unused here)
                        * stream - the grpc_chttp2_stream being visited
                        *
                        * Adds t->parsing.initial_window_update to the stream's outgoing_window
                        * and, if that moves the window from <= 0 to > 0, puts the stream on the
                        * writable list. */
    1072           0 : static void update_global_window(void *args, gpr_uint32 id, void *stream) {
    1073           0 :   grpc_chttp2_transport *t = args;
    1074           0 :   grpc_chttp2_stream *s = stream;
    1075           0 :   grpc_chttp2_transport_global *transport_global = &t->global;
    1076           0 :   grpc_chttp2_stream_global *stream_global = &s->global;
                         /* Despite the names, these flags record "window <= 0", not "== 0". */
    1077             :   int was_zero;
    1078             :   int is_zero;
    1079             : 
    1080           0 :   GRPC_CHTTP2_FLOWCTL_TRACE_STREAM("settings", transport_global, stream_global,
    1081             :                                    outgoing_window,
    1082             :                                    t->parsing.initial_window_update);
    1083           0 :   was_zero = stream_global->outgoing_window <= 0;
    1084           0 :   stream_global->outgoing_window += t->parsing.initial_window_update;
    1085           0 :   is_zero = stream_global->outgoing_window <= 0;
    1086             : 
                         /* The stream only needs (re-)queuing when the window just became
                          * positive. */
    1087           0 :   if (was_zero && !is_zero) {
    1088           0 :     grpc_chttp2_list_add_writable_stream(transport_global, stream_global);
    1089             :   }
    1090           0 : }
    1091             : 
                       /* Records that the endpoint is no longer being read and destroys the
                        * endpoint when no write is in flight (t->writing_active == 0) and the
                        * endpoint still exists. Called by recv_data with t->mu held. */
    1092        4014 : static void read_error_locked(grpc_exec_ctx *exec_ctx,
    1093             :                               grpc_chttp2_transport *t) {
    1094        4014 :   t->endpoint_reading = 0;
                         /* If a write is still active the endpoint is left alone here.
                          * NOTE(review): presumably the write path destroys it later; confirm. */
    1095        4014 :   if (!t->writing_active && t->ep) {
    1096        3998 :     destroy_endpoint(exec_ctx, t);
    1097             :   }
    1098        4014 : }
    1099             : 
    1100             : /* tcp read callback
                        *
                        * tp      - the grpc_chttp2_transport the read was issued for
                        * success - non-zero when the endpoint read succeeded
                        *
                        * Parses every slice in t->read_buffer, publishes the results, and then
                        * either issues the next grpc_endpoint_read (with t->recv_data as its
                        * callback) or tears the connection down and drops the "recv_data" ref
                        * taken in grpc_chttp2_transport_start_reading. */
    1101     2469546 : static void recv_data(grpc_exec_ctx *exec_ctx, void *tp, int success) {
    1102             :   size_t i;
    1103     2469546 :   int keep_reading = 0;
    1104     2469546 :   grpc_chttp2_transport *t = tp;
    1105             : 
    1106     2469546 :   lock(t);
                         /* i counts slices parsed successfully; it stays 0 if parsing is
                          * skipped because the transport is already closed. */
    1107     2469634 :   i = 0;
    1108     2469634 :   GPR_ASSERT(!t->parsing_active);
    1109     2469634 :   if (!t->closed) {
    1110     2466034 :     t->parsing_active = 1;
    1111             :     /* merge stream lists */
    1112     2466034 :     grpc_chttp2_stream_map_move_into(&t->new_stream_map,
    1113             :                                      &t->parsing_stream_map);
    1114     2466012 :     grpc_chttp2_prepare_to_read(&t->global, &t->parsing);
                           /* The mutex is released while the slices are parsed and re-acquired
                            * afterwards; the loop stops at the first slice for which
                            * grpc_chttp2_perform_read returns 0. */
    1115     2465830 :     gpr_mu_unlock(&t->mu);
    1116    16177566 :     for (; i < t->read_buffer.count &&
    1117     5622810 :                grpc_chttp2_perform_read(exec_ctx, &t->parsing,
    1118     5622810 :                                         t->read_buffer.slices[i]);
    1119     5622752 :          i++)
    1120             :       ;
    1121     2465988 :     gpr_mu_lock(&t->mu);
                           /* Not every slice was consumed: a parse step failed. */
    1122     2466032 :     if (i != t->read_buffer.count) {
    1123          31 :       drop_connection(exec_ctx, t);
    1124             :     }
    1125             :     /* merge stream lists */
    1126     2466032 :     grpc_chttp2_stream_map_move_into(&t->new_stream_map,
    1127             :                                      &t->parsing_stream_map);
    1128     2466031 :     t->global.concurrent_stream_count =
    1129     2466031 :         (gpr_uint32)grpc_chttp2_stream_map_size(&t->parsing_stream_map);
                           /* A non-zero initial_window_update is applied to each stream's
                            * outgoing window and then cleared. */
    1130     2466031 :     if (t->parsing.initial_window_update != 0) {
    1131           0 :       grpc_chttp2_stream_map_for_each(&t->parsing_stream_map,
    1132             :                                       update_global_window, t);
    1133           0 :       t->parsing.initial_window_update = 0;
    1134             :     }
    1135             :     /* handle higher level things */
    1136     2466031 :     grpc_chttp2_publish_reads(exec_ctx, &t->global, &t->parsing);
    1137     2466005 :     t->parsing_active = 0;
    1138             :   }
                         /* NOTE(review): after a parse failure drop_connection has already run
                          * above and runs a second time here; this relies on it being safe to
                          * repeat - confirm. */
    1139     2469605 :   if (!success || i != t->read_buffer.count || t->closed) {
    1140        4014 :     drop_connection(exec_ctx, t);
    1141        4014 :     read_error_locked(exec_ctx, t);
                         /* NOTE(review): t->closed was just tested false in the condition above
                          * with the lock held, so this else-if test is always true. */
    1142     2465591 :   } else if (!t->closed) {
    1143     2465592 :     keep_reading = 1;
                           /* Hold a ref and block endpoint shutdown until the next read has
                            * been issued after unlock (released in the keep_reading branch
                            * below). */
    1144     2465592 :     REF_TRANSPORT(t, "keep_reading");
    1145     2465626 :     prevent_endpoint_shutdown(t);
    1146             :   }
    1147     2469638 :   gpr_slice_buffer_reset_and_unref(&t->read_buffer);
    1148     2469632 :   unlock(exec_ctx, t);
    1149             : 
    1150     2469635 :   if (keep_reading) {
    1151     2465621 :     grpc_endpoint_read(exec_ctx, t->ep, &t->read_buffer, &t->recv_data);
    1152     2465619 :     allow_endpoint_shutdown_unlocked(exec_ctx, t);
    1153     2465627 :     UNREF_TRANSPORT(exec_ctx, t, "keep_reading");
    1154             :   } else {
                           /* Reading has stopped: drop the "recv_data" ref taken in
                            * grpc_chttp2_transport_start_reading. */
    1155        4014 :     UNREF_TRANSPORT(exec_ctx, t, "recv_data");
    1156             :   }
    1157     2469641 : }
    1158             : 
    1159             : /*
    1160             :  * CALLBACK LOOP
    1161             :  */
    1162             : 
                       /* Sets the connectivity state of the transport that owns
                        * transport_global: logs the new state at debug level through
                        * GRPC_CHTTP2_IF_TRACING, then forwards state and reason to
                        * grpc_connectivity_state_set on the transport's
                        * channel_callback.state_tracker. */
    1163        4413 : static void connectivity_state_set(
    1164             :     grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
    1165             :     grpc_connectivity_state state, const char *reason) {
    1166        4413 :   GRPC_CHTTP2_IF_TRACING(
    1167             :       gpr_log(GPR_DEBUG, "set connectivity_state=%d", state));
    1168        4413 :   grpc_connectivity_state_set(exec_ctx, &TRANSPORT_FROM_GLOBAL(transport_global)
    1169             :                                              ->channel_callback.state_tracker,
    1170             :                               state, reason);
    1171        4413 : }
    1172             : 
    1173             : /*
    1174             :  * POLLSET STUFF
    1175             :  */
    1176             : 
                       /* Adds the transport's endpoint to pollset. Does nothing when t->ep is
                        * NULL.
                        * NOTE(review): the "_locked" suffix suggests the caller holds t->mu;
                        * confirm at the call sites. */
    1177     2705377 : static void add_to_pollset_locked(grpc_exec_ctx *exec_ctx,
    1178             :                                   grpc_chttp2_transport *t,
    1179             :                                   grpc_pollset *pollset) {
    1180     2705377 :   if (t->ep) {
    1181     2705943 :     grpc_endpoint_add_to_pollset(exec_ctx, t->ep, pollset);
    1182             :   }
    1183     2705615 : }
    1184             : 
                       /* Adds the transport's endpoint to pollset_set. Does nothing when t->ep
                        * is NULL.
                        * NOTE(review): the "_locked" suffix suggests the caller holds t->mu;
                        * confirm at the call sites. */
    1185        1547 : static void add_to_pollset_set_locked(grpc_exec_ctx *exec_ctx,
    1186             :                                       grpc_chttp2_transport *t,
    1187             :                                       grpc_pollset_set *pollset_set) {
    1188        1547 :   if (t->ep) {
    1189        1547 :     grpc_endpoint_add_to_pollset_set(exec_ctx, t->ep, pollset_set);
    1190             :   }
    1191        1548 : }
    1192             : 
    1193             : /*
    1194             :  * TRACING
    1195             :  */
    1196             : 
                       /* Logs one flow-control change at INFO level.
                        *
                        * file, line    - source location reported at the end of the message
                        * reason        - free-form reason for the change
                        * context       - "<scope>_<thread>"; must contain an underscore (asserted).
                        *                 The text before the first '_' labels the scope, the text
                        *                 after it labels the thread column.
                        * var           - name of the window variable being changed
                        * is_client     - non-zero prints "client", zero prints "server"
                        * stream_id     - when non-zero the identifier is "<scope>[<stream_id>]",
                        *                 otherwise just "<scope>"
                        * current_value - value before the change
                        * delta         - signed change; printed as sign and magnitude, followed
                        *                 by current_value + delta
                        *
                        * Integer arguments are cast to the exact types their conversion
                        * specifiers require (%u -> unsigned, %lld -> long long) so the call is
                        * well-defined whatever the underlying gpr_uint32 / gpr_int64 typedefs
                        * are. All temporary strings are freed before returning. */
    1197       11018 : void grpc_chttp2_flowctl_trace(const char *file, int line, const char *reason,
    1198             :                                const char *context, const char *var,
    1199             :                                int is_client, gpr_uint32 stream_id,
    1200             :                                gpr_int64 current_value, gpr_int64 delta) {
    1201             :   char *identifier;
    1202             :   char *context_scope;
    1203             :   char *context_thread;
    1204       11018 :   const char *underscore_pos = strchr(context, '_');
    1205       11018 :   GPR_ASSERT(underscore_pos);
    1206       11018 :   context_thread = gpr_strdup(underscore_pos + 1);
    1207       11018 :   context_scope = gpr_strdup(context);
                         /* Truncate the copy at the underscore to keep only the scope part. */
    1208       11018 :   context_scope[underscore_pos - context] = 0;
    1209       11018 :   if (stream_id) {
    1210        6564 :     gpr_asprintf(&identifier, "%s[%u]", context_scope, (unsigned)stream_id);
    1211             :   } else {
    1212        4454 :     identifier = gpr_strdup(context_scope);
    1213             :   }
    1214       11018 :   gpr_log(GPR_INFO,
    1215             :           "FLOWCTL: %s %-10s %8s %-27s %8lld %c %8lld = %8lld %-10s [%s:%d]",
    1216             :           is_client ? "client" : "server", identifier, context_thread, var,
    1217             :           (long long)current_value, delta < 0 ? '-' : '+', (long long)(delta < 0 ? -delta : delta),
    1218             :           (long long)(current_value + delta), reason, file, line);
    1219       11018 :   gpr_free(identifier);
    1220       11018 :   gpr_free(context_thread);
    1221       11018 :   gpr_free(context_scope);
    1222       11018 : }
    1223             : 
    1224             : /*
    1225             :  * INTEGRATION GLUE
    1226             :  */
    1227             : 
                       /* get_peer entry of the transport vtable: returns a newly allocated copy
                        * (gpr_strdup) of the chttp2 transport's peer_string. exec_ctx is unused.
                        * NOTE(review): the caller presumably owns the copy and releases it with
                        * gpr_free; confirm. */
    1228        1094 : static char *chttp2_get_peer(grpc_exec_ctx *exec_ctx, grpc_transport *t) {
    1229        1094 :   return gpr_strdup(((grpc_chttp2_transport *)t)->peer_string);
    1230             : }
    1231             : 
                       /* Operation table for chttp2 transports: the per-stream allocation size
                        * followed by the stream/transport entry points defined in this file.
                        * The initializer is positional, so the order must match the field order
                        * of grpc_transport_vtable (declared elsewhere).
                        * NOTE(review): presumably installed on t->base by init_transport;
                        * confirm. */
    1232             : static const grpc_transport_vtable vtable = {
    1233             :     sizeof(grpc_chttp2_stream), init_stream, perform_stream_op,
    1234             :     perform_transport_op, destroy_stream, destroy_transport, chttp2_get_peer};
    1235             : 
                       /* Allocates a grpc_chttp2_transport with gpr_malloc, initializes it with
                        * init_transport over endpoint ep, and returns its embedded generic
                        * grpc_transport (&t->base). is_client is normalized to 0/1 before being
                        * passed on. */
    1236        4014 : grpc_transport *grpc_create_chttp2_transport(
    1237             :     grpc_exec_ctx *exec_ctx, const grpc_channel_args *channel_args,
    1238             :     grpc_endpoint *ep, grpc_mdctx *mdctx, int is_client) {
    1239        4014 :   grpc_chttp2_transport *t = gpr_malloc(sizeof(grpc_chttp2_transport));
    1240        4014 :   init_transport(exec_ctx, t, channel_args, ep, mdctx, is_client != 0);
    1241        4012 :   return &t->base;
    1242             : }
    1243             : 
                       /* Starts the read loop of a chttp2 transport.
                        *
                        * transport - must be a grpc_chttp2_transport (it is cast without a check)
                        * slices    - nslices slices appended to the transport's read buffer
                        *             before the first parse
                        *
                        * Takes the "recv_data" ref and then calls recv_data directly with
                        * success = 1, which parses the supplied slices and, if the transport
                        * stays healthy, issues the first endpoint read. */
    1244        4014 : void grpc_chttp2_transport_start_reading(grpc_exec_ctx *exec_ctx,
    1245             :                                          grpc_transport *transport,
    1246             :                                          gpr_slice *slices, size_t nslices) {
    1247        4014 :   grpc_chttp2_transport *t = (grpc_chttp2_transport *)transport;
    1248        4014 :   REF_TRANSPORT(t, "recv_data"); /* matches unref inside recv_data */
    1249        4014 :   gpr_slice_buffer_addn(&t->read_buffer, slices, nslices);
    1250        4014 :   recv_data(exec_ctx, t, 1);
    1251        4014 : }

Generated by: LCOV version 1.10