Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/transport/chttp2/internal.h"
35 :
36 : #include <limits.h>
37 :
38 : #include <grpc/support/log.h>
39 :
40 : #include "src/core/profiling/timers.h"
41 : #include "src/core/transport/chttp2/http2_errors.h"
42 :
/* Forward declaration: finalize_outbuf is called from
   grpc_chttp2_perform_writes and defined after it in this file. */
43 : static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
44 : grpc_chttp2_transport_writing *transport_writing);
45 :
/* Collects pending output from the "global" side of the transport and hands
   it to the "writing" side.

   Steps, in order:
     - swap the queued simple writes (qbuf) into transport_writing->outbuf;
     - update the hpack compressor's max table size from the peer's
       HEADER_TABLE_SIZE setting;
     - append a settings frame if local settings are dirty, have not already
       been sent, and is_parsing is zero;
     - for every writable stream, move its pending initial metadata, message
       and trailing metadata to its stream_writing counterpart and queue it on
       the writing list (or on the stalled-by-transport list);
     - append a window update for stream id 0 if there is incoming window to
       announce.

   Returns non-zero when outbuf holds at least one slice or at least one
   stream is on the writing list, i.e. when there is something to write. */
46 34858101 : int grpc_chttp2_unlocking_check_writes(
47 : grpc_chttp2_transport_global *transport_global,
48 : grpc_chttp2_transport_writing *transport_writing, int is_parsing) {
49 : grpc_chttp2_stream_global *stream_global;
50 : grpc_chttp2_stream_writing *stream_writing;
51 :
52 : GPR_TIMER_BEGIN("grpc_chttp2_unlocking_check_writes", 0);
53 :
54 : /* simple writes are queued to qbuf, and flushed here: the swap moves them into outbuf, after which qbuf must be empty */
55 34858101 : gpr_slice_buffer_swap(&transport_global->qbuf, &transport_writing->outbuf);
56 34925043 : GPR_ASSERT(transport_global->qbuf.count == 0);
57 :
58 34925043 : grpc_chttp2_hpack_compressor_set_max_table_size(
59 : &transport_writing->hpack_compressor,
60 : transport_global->settings[GRPC_PEER_SETTINGS]
61 : [GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);
62 :
/* send a settings frame only when local settings changed, none has been sent
   already, and we are not called with is_parsing set */
63 34937227 : if (transport_global->dirtied_local_settings &&
64 12074 : !transport_global->sent_local_settings && !is_parsing) {
65 12076 : gpr_slice_buffer_add(
66 : &transport_writing->outbuf,
67 : grpc_chttp2_settings_create(
68 6038 : transport_global->settings[GRPC_SENT_SETTINGS],
69 6038 : transport_global->settings[GRPC_LOCAL_SETTINGS],
70 : transport_global->force_send_settings, GRPC_CHTTP2_NUM_SETTINGS));
71 6039 : transport_global->force_send_settings = 0;
72 6039 : transport_global->dirtied_local_settings = 0;
73 6039 : transport_global->sent_local_settings = 1;
74 : }
75 :
/* NOTE(review): presumably transfers the transport-level outgoing window from
   transport_global to transport_writing; the macro is defined elsewhere */
76 34931191 : GRPC_CHTTP2_FLOW_MOVE_TRANSPORT("write", transport_writing, outgoing_window,
77 : transport_global, outgoing_window);
78 :
79 : /* for each grpc_chttp2_stream that's become writable, move its pending sends to the writing side
80 : (subject to available window sizes) and queue it on the writing list; framing into outbuf happens in finalize_outbuf */
81 77153057 : while (grpc_chttp2_list_pop_writable_stream(
82 : transport_global, transport_writing, &stream_global, &stream_writing)) {
83 : gpr_uint8 sent_initial_metadata;
84 :
85 7293948 : stream_writing->id = stream_global->id;
86 7293948 : stream_writing->read_closed = stream_global->read_closed;
87 :
88 7293948 : GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_writing, stream_writing,
89 : outgoing_window, stream_global,
90 : outgoing_window);
91 :
/* the local flag is true if initial metadata was already sent or is being
   handed over right now; messages and trailing metadata are only moved once
   that holds */
92 7297105 : sent_initial_metadata = stream_writing->sent_initial_metadata;
93 7297105 : if (!sent_initial_metadata && stream_global->send_initial_metadata) {
94 8880509 : stream_writing->send_initial_metadata =
95 4440173 : stream_global->send_initial_metadata;
96 4440336 : stream_global->send_initial_metadata = NULL;
/* a "chttp2_writing" ref is taken only when the add call returns non-zero
   (presumably meaning the stream was not already on the writing list) */
97 4440336 : if (grpc_chttp2_list_add_writing_stream(transport_writing,
98 : stream_writing)) {
99 4437794 : GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
100 : }
101 4442514 : sent_initial_metadata = 1;
102 : }
103 7299446 : if (sent_initial_metadata) {
104 5801507 : if (stream_global->send_message != NULL) {
/* 5-byte message prefix: one flag byte (1 if GRPC_WRITE_INTERNAL_COMPRESS is
   set, else 0) followed by the message length as a 32-bit big-endian value */
105 3963626 : gpr_slice hdr = gpr_slice_malloc(5);
106 3964002 : gpr_uint8 *p = GPR_SLICE_START_PTR(hdr);
107 3964002 : gpr_uint32 len = stream_global->send_message->length;
108 3964002 : GPR_ASSERT(stream_writing->send_message == NULL);
109 7928004 : p[0] = (stream_global->send_message->flags &
110 3964002 : GRPC_WRITE_INTERNAL_COMPRESS) != 0;
111 3964002 : p[1] = (gpr_uint8)(len >> 24);
112 3964002 : p[2] = (gpr_uint8)(len >> 16);
113 3964002 : p[3] = (gpr_uint8)(len >> 8);
114 3964002 : p[4] = (gpr_uint8)(len);
115 3964002 : gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer, hdr);
/* a zero-length message is fully described by the prefix, so no byte stream
   is handed to the writing side in that case */
116 3963870 : if (stream_global->send_message->length > 0) {
117 2204824 : stream_writing->send_message = stream_global->send_message;
118 : } else {
119 1759046 : stream_writing->send_message = NULL;
120 : }
121 3963870 : stream_writing->stream_fetched = 0;
122 3963870 : stream_global->send_message = NULL;
123 : }
/* data is pending and the stream window is open: queue the stream for writing
   if the transport window is open too, otherwise park it as stalled by the
   transport */
124 9398682 : if ((stream_writing->send_message != NULL ||
125 7588748 : stream_writing->flow_controlled_buffer.length > 0) &&
126 3991817 : stream_writing->outgoing_window > 0) {
127 3964021 : if (transport_writing->outgoing_window > 0) {
128 3964021 : if (grpc_chttp2_list_add_writing_stream(transport_writing,
129 : stream_writing)) {
130 568721 : GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
131 : }
132 : } else {
133 0 : grpc_chttp2_list_add_stalled_by_transport(transport_writing,
134 : stream_writing);
135 : }
136 : }
137 5801685 : if (stream_global->send_trailing_metadata) {
138 8884703 : stream_writing->send_trailing_metadata =
139 4442277 : stream_global->send_trailing_metadata;
140 4442426 : stream_global->send_trailing_metadata = NULL;
141 4442426 : if (grpc_chttp2_list_add_writing_stream(transport_writing,
142 : stream_writing)) {
143 1289 : GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
144 : }
145 : }
146 : }
147 :
/* announce stream-level incoming window once more than 1024 bytes are
   unannounced and the read side is still open.
   NOTE(review): the rationale for the 1024 threshold is not visible here. */
148 10728342 : if (!stream_global->read_closed &&
149 3429718 : stream_global->unannounced_incoming_window_for_writing > 1024) {
150 28880 : GRPC_CHTTP2_FLOW_MOVE_STREAM("write", transport_global, stream_writing,
151 : announce_window, stream_global,
152 : unannounced_incoming_window_for_writing);
153 28880 : if (grpc_chttp2_list_add_writing_stream(transport_writing,
154 : stream_writing)) {
155 28880 : GRPC_CHTTP2_STREAM_REF(stream_global, "chttp2_writing");
156 : }
157 : }
158 : }
159 :
160 : /* if the grpc_chttp2_transport has incoming window to announce, queue a
161 : window update for stream id 0 here; the increment is capped at GPR_UINT32_MAX */
162 34895764 : if (transport_global->announce_incoming_window > 0) {
163 6516 : gpr_uint32 announced = (gpr_uint32)GPR_MIN(
164 : transport_global->announce_incoming_window, GPR_UINT32_MAX);
165 6516 : GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_global,
166 : announce_incoming_window, announced);
167 6516 : gpr_slice_buffer_add(&transport_writing->outbuf,
168 : grpc_chttp2_window_update_create(0, announced));
169 : }
170 :
171 : GPR_TIMER_END("grpc_chttp2_unlocking_check_writes", 0);
172 :
/* non-zero when there is anything for grpc_chttp2_perform_writes to do */
173 69823052 : return transport_writing->outbuf.count > 0 ||
174 34891279 : grpc_chttp2_list_have_writing_streams(transport_writing);
175 : }
176 :
/* Frames all queued stream output into transport_writing->outbuf (via
   finalize_outbuf) and then either writes outbuf to the endpoint or, if
   nothing was produced, schedules the done callback directly.

   Preconditions (asserted): outbuf is non-empty or at least one stream is on
   the writing list; endpoint is non-NULL. */
177 4621509 : void grpc_chttp2_perform_writes(
178 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
179 : grpc_endpoint *endpoint) {
180 4621509 : GPR_ASSERT(transport_writing->outbuf.count > 0 ||
181 : grpc_chttp2_list_have_writing_streams(transport_writing));
182 :
183 4620510 : finalize_outbuf(exec_ctx, transport_writing);
184 :
185 4623388 : GPR_ASSERT(endpoint);
186 :
187 4623388 : if (transport_writing->outbuf.count > 0) {
188 4623385 : grpc_endpoint_write(exec_ctx, endpoint, &transport_writing->outbuf,
189 : &transport_writing->done_cb);
190 : } else {
/* finalize_outbuf produced no bytes: skip the endpoint and run done_cb through
   the exec_ctx (the trailing 1 is presumably a success flag - confirm) */
191 3 : grpc_exec_ctx_enqueue(exec_ctx, &transport_writing->done_cb, 1);
192 : }
193 4623746 : }
194 :
/* Drains the writing-stream list and serializes each stream's pending output
   into transport_writing->outbuf, in this order per stream: initial metadata,
   stream window update, message bytes (limited by flow control), trailing
   metadata.

   After processing, a stream is either put back on the writing list (more
   data and both windows open), put on the stalled-by-transport list and the
   written list (transport window exhausted), or put on the written list. */
195 4621068 : static void finalize_outbuf(grpc_exec_ctx *exec_ctx,
196 : grpc_chttp2_transport_writing *transport_writing) {
197 : grpc_chttp2_stream_writing *stream_writing;
198 :
199 : GPR_TIMER_BEGIN("finalize_outbuf", 0);
200 :
201 14281560 : while (
202 9664087 : grpc_chttp2_list_pop_writing_stream(transport_writing, &stream_writing)) {
/* bytes of message data allowed in this pass: the smallest of the maximum
   payload length, the stream window and the transport window */
203 5036619 : gpr_uint32 max_outgoing =
204 5037058 : (gpr_uint32)GPR_MIN(GRPC_CHTTP2_MAX_PAYLOAD_LENGTH,
205 : GPR_MIN(stream_writing->outgoing_window,
206 : transport_writing->outgoing_window));
207 : /* send initial metadata if it's available */
208 5037058 : if (stream_writing->send_initial_metadata != NULL) {
209 13305121 : grpc_chttp2_encode_header(
210 4434986 : &transport_writing->hpack_compressor, stream_writing->id,
211 4434986 : stream_writing->send_initial_metadata, 0, &transport_writing->outbuf);
212 4442962 : stream_writing->send_initial_metadata = NULL;
213 4442962 : stream_writing->sent_initial_metadata = 1;
214 : }
215 : /* send any window updates */
216 5073752 : if (stream_writing->announce_window > 0 &&
217 28881 : stream_writing->send_initial_metadata == NULL) {
218 28859 : gpr_uint32 announce = stream_writing->announce_window;
219 57740 : gpr_slice_buffer_add(
220 : &transport_writing->outbuf,
221 28859 : grpc_chttp2_window_update_create(stream_writing->id,
222 28859 : stream_writing->announce_window));
223 28881 : GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing, stream_writing,
224 : announce_window, announce);
225 28881 : stream_writing->announce_window = 0;
226 : }
227 : /* fetch any body bytes: keep pulling slices while no fetch is outstanding, the buffer holds less than max_outgoing, and the message is not fully fetched */
228 14503372 : while (!stream_writing->fetching && stream_writing->send_message &&
229 4415111 : stream_writing->flow_controlled_buffer.length < max_outgoing &&
230 4414332 : stream_writing->stream_fetched <
231 2207166 : stream_writing->send_message->length) {
232 4414208 : if (grpc_byte_stream_next(exec_ctx, stream_writing->send_message,
233 2207042 : &stream_writing->fetching_slice, max_outgoing,
234 2207042 : &stream_writing->finished_fetch)) {
235 4414244 : stream_writing->stream_fetched +=
236 2207122 : GPR_SLICE_LENGTH(stream_writing->fetching_slice);
/* the whole message has been fetched: drop the byte stream pointer so later
   checks treat the message as having no more bytes to pull */
237 4414120 : if (stream_writing->stream_fetched ==
238 2207122 : stream_writing->send_message->length) {
239 2204904 : stream_writing->send_message = NULL;
240 : }
241 2207122 : gpr_slice_buffer_add(&stream_writing->flow_controlled_buffer,
242 2206998 : stream_writing->fetching_slice);
243 : } else {
/* the slice was not returned immediately; mark a fetch as in flight
   (presumably completed later through finished_fetch - confirm) */
244 0 : stream_writing->fetching = 1;
245 : }
246 : }
247 : /* send any body bytes */
248 5043829 : if (stream_writing->flow_controlled_buffer.length > 0) {
249 3992871 : if (max_outgoing > 0) {
250 3967594 : gpr_uint32 send_bytes = (gpr_uint32)GPR_MIN(
251 : max_outgoing, stream_writing->flow_controlled_buffer.length);
/* last data frame: nothing left to fetch and this frame empties the buffer */
252 3967299 : int is_last_data_frame =
253 7931317 : stream_writing->send_message == NULL &&
254 3963723 : send_bytes == stream_writing->flow_controlled_buffer.length;
/* last frame of the stream: additionally, trailing metadata is pending and
   empty, so the flag passed to grpc_chttp2_encode_data stands in for it */
255 7928907 : int is_last_frame = is_last_data_frame &&
256 7360546 : stream_writing->send_trailing_metadata != NULL &&
257 3392302 : grpc_metadata_batch_is_empty(
258 3392219 : stream_writing->send_trailing_metadata);
259 11904437 : grpc_chttp2_encode_data(
260 7936193 : stream_writing->id, &stream_writing->flow_controlled_buffer,
261 : send_bytes, is_last_frame, &transport_writing->outbuf);
262 3966290 : GRPC_CHTTP2_FLOW_DEBIT_STREAM("write", transport_writing,
263 : stream_writing, outgoing_window,
264 : send_bytes);
265 3966764 : GRPC_CHTTP2_FLOW_DEBIT_TRANSPORT("write", transport_writing,
266 : outgoing_window, send_bytes);
267 3966764 : if (is_last_frame) {
268 1748465 : stream_writing->send_trailing_metadata = NULL;
269 1748465 : stream_writing->sent_trailing_metadata = 1;
270 : }
271 3966764 : if (is_last_data_frame) {
272 3962212 : GPR_ASSERT(stream_writing->send_message == NULL);
273 3962212 : stream_writing->sent_message = 1;
274 : }
275 25277 : } else if (transport_writing->outgoing_window == 0) {
/* data is buffered but the transport window is exhausted: record the stream
   as stalled by the transport and as written for this pass */
276 0 : grpc_chttp2_list_add_stalled_by_transport(transport_writing,
277 : stream_writing);
278 0 : grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
279 : }
280 : }
281 : /* send trailing metadata if it's available and we're ready for it */
282 10085946 : if (stream_writing->send_message == NULL &&
283 10057953 : stream_writing->flow_controlled_buffer.length == 0 &&
284 5015006 : stream_writing->send_trailing_metadata != NULL) {
/* empty trailing metadata is sent as a zero-length data frame with the last
   argument flag set; otherwise it is sent as a header block */
285 2693835 : if (grpc_metadata_batch_is_empty(
286 2693735 : stream_writing->send_trailing_metadata)) {
287 1046726 : grpc_chttp2_encode_data(stream_writing->id,
288 523363 : &stream_writing->flow_controlled_buffer, 0, 1,
289 : &transport_writing->outbuf);
290 : } else {
291 6511377 : grpc_chttp2_encode_header(&transport_writing->hpack_compressor,
292 2170411 : stream_writing->id,
293 2170483 : stream_writing->send_trailing_metadata, 1,
294 : &transport_writing->outbuf);
295 : }
/* when not a client and the read side is still open, follow up with an
   RST_STREAM carrying GRPC_CHTTP2_NO_ERROR */
296 2694071 : if (!transport_writing->is_client && !stream_writing->read_closed) {
297 258 : gpr_slice_buffer_add(&transport_writing->outbuf,
298 : grpc_chttp2_rst_stream_create(
299 258 : stream_writing->id, GRPC_CHTTP2_NO_ERROR));
300 : }
301 2694071 : stream_writing->send_trailing_metadata = NULL;
302 2694071 : stream_writing->sent_trailing_metadata = 1;
303 : }
304 : /* if there's more to write, then loop, otherwise prepare to finish the
305 : * write */
306 10057371 : if ((stream_writing->flow_controlled_buffer.length > 0 ||
307 5043985 : (stream_writing->send_message && !stream_writing->fetching)) &&
308 29474 : stream_writing->outgoing_window > 0) {
309 7688 : if (transport_writing->outgoing_window > 0) {
310 3889 : if (grpc_chttp2_list_add_writing_stream(transport_writing,
311 : stream_writing)) {
312 : /* do nothing - already reffed */
313 : }
314 : } else {
315 6 : grpc_chttp2_list_add_stalled_by_transport(transport_writing,
316 : stream_writing);
317 6 : grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
318 : }
319 : } else {
320 5039340 : grpc_chttp2_list_add_written_stream(transport_writing, stream_writing);
321 : }
322 : }
323 :
324 : GPR_TIMER_END("finalize_outbuf", 0);
325 4623434 : }
326 :
/* Post-write bookkeeping: for every stream on the written list, reports what
   was sent by completing the matching closure steps, marks the stream closed
   if trailing metadata went out, and drops the "chttp2_writing" ref. Finally
   resets transport_writing->outbuf and unrefs its slices. */
327 4625504 : void grpc_chttp2_cleanup_writing(
328 : grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
329 : grpc_chttp2_transport_writing *transport_writing) {
330 : grpc_chttp2_stream_writing *stream_writing;
331 : grpc_chttp2_stream_global *stream_global;
332 :
333 14292808 : while (grpc_chttp2_list_pop_written_stream(
334 : transport_global, transport_writing, &stream_global, &stream_writing)) {
/* NOTE(review): the last two arguments are !is_client and 1; their meaning is
   defined by grpc_chttp2_mark_stream_closed, which is not visible here */
335 5038962 : if (stream_writing->sent_trailing_metadata) {
336 4441955 : grpc_chttp2_mark_stream_closed(exec_ctx, transport_global, stream_global,
337 4441955 : !transport_global->is_client, 1);
338 : }
339 5038594 : if (stream_writing->sent_initial_metadata) {
340 5038336 : grpc_chttp2_complete_closure_step(
341 5038336 : exec_ctx, &stream_global->send_initial_metadata_finished, 1);
342 : }
343 5038676 : if (stream_writing->sent_message) {
344 3960587 : GPR_ASSERT(stream_writing->send_message == NULL);
345 3960587 : GPR_ASSERT(stream_global->send_message_finished);
346 3960587 : grpc_chttp2_complete_closure_step(
347 3960400 : exec_ctx, &stream_global->send_message_finished, 1);
/* sent_message is cleared here; the sent_initial_metadata and
   sent_trailing_metadata flags are left as they are by this function */
348 3958883 : stream_writing->sent_message = 0;
349 : }
350 5036972 : if (stream_writing->sent_trailing_metadata) {
351 4437850 : grpc_chttp2_complete_closure_step(
352 4437850 : exec_ctx, &stream_global->send_trailing_metadata_finished, 1);
353 : }
/* balances a "chttp2_writing" ref (taken in grpc_chttp2_unlocking_check_writes
   when the stream was added to the writing list) */
354 5037213 : GRPC_CHTTP2_STREAM_UNREF(exec_ctx, stream_global, "chttp2_writing");
355 : }
356 4625612 : gpr_slice_buffer_reset_and_unref(&transport_writing->outbuf);
357 4622280 : }
|