Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <assert.h>
35 : #include <string.h>
36 :
37 : #include <grpc/compression.h>
38 : #include <grpc/support/alloc.h>
39 : #include <grpc/support/log.h>
40 : #include <grpc/support/slice_buffer.h>
41 :
42 : #include "src/core/channel/compress_filter.h"
43 : #include "src/core/channel/channel_args.h"
44 : #include "src/core/compression/message_compress.h"
45 : #include "src/core/support/string.h"
46 :
/** Per-call state kept by the compression filter. */
typedef struct call_data {
  gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
  /** Link storage used when appending the outgoing compression-algorithm
   * metadata element to the call's initial metadata batch. */
  grpc_linked_mdelem compression_algorithm_storage;
  /** Link storage used when appending the accepted-encodings metadata element
   * to the call's initial metadata batch. */
  grpc_linked_mdelem accept_encoding_storage;
  /** Bytes of the current message still to be seen as slices; loaded from the
   * length carried by GRPC_OP_BEGIN_MESSAGE and decremented per slice. */
  gpr_uint32 remaining_slice_bytes;
  /** Non-zero once the compression metadata has been added to the first
   * outgoing metadata batch of the call. */
  int written_initial_metadata;
  /** Compression algorithm we'll try to use. It may be given by incoming
   * metadata, or by the channel's default compression settings. */
  grpc_compression_algorithm compression_algorithm;
  /** If true, contents of \a compression_algorithm are authoritative.
   * When false, skip_compression() consults the channel default instead. */
  int has_compression_algorithm;
} call_data;
60 :
/** Per-channel state kept by the compression filter. Populated by
 * init_channel_elem() and released by destroy_channel_elem(). */
typedef struct channel_data {
  /** Metadata key for the incoming (requested) compression algorithm; built
   * from GRPC_COMPRESS_REQUEST_ALGORITHM_KEY. */
  grpc_mdstr *mdstr_request_compression_algorithm_key;
  /** Metadata key for the outgoing (used) compression algorithm
   * ("grpc-encoding"). */
  grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
  /** Metadata key for the accepted encodings ("grpc-accept-encoding"). */
  grpc_mdstr *mdstr_compression_capabilities_key;
  /** Precomputed metadata elements, indexed by grpc_compression_algorithm
   * value. Only the entries of enabled algorithms are filled in by
   * init_channel_elem(). */
  grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
  /** Precomputed metadata element for the accepted encodings: a
   * comma-separated list of the enabled algorithm names, excluding index 0. */
  grpc_mdelem *mdelem_accept_encoding;
  /** The default, channel-level, compression algorithm */
  grpc_compression_algorithm default_compression_algorithm;
  /** Compression options for the channel */
  grpc_compression_options compression_options;
} channel_data;
77 :
78 : /** Compress \a slices in place using \a algorithm. Returns 1 if compression did
79 : * actually happen, 0 otherwise (for example if the compressed output size was
80 : * larger than the raw input).
81 : *
82 : * Returns 1 if the data was actually compress and 0 otherwise. */
83 751 : static int compress_send_sb(grpc_compression_algorithm algorithm,
84 : gpr_slice_buffer *slices) {
85 : int did_compress;
86 : gpr_slice_buffer tmp;
87 751 : gpr_slice_buffer_init(&tmp);
88 751 : did_compress = grpc_msg_compress(algorithm, slices, &tmp);
89 751 : if (did_compress) {
90 64 : gpr_slice_buffer_swap(slices, &tmp);
91 : }
92 751 : gpr_slice_buffer_destroy(&tmp);
93 751 : return did_compress;
94 : }
95 :
96 : /** For each \a md element from the incoming metadata, filter out the entry for
97 : * "grpc-encoding", using its value to populate the call data's
98 : * compression_algorithm field. */
99 2808712 : static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
100 2808712 : grpc_call_element *elem = user_data;
101 2808712 : call_data *calld = elem->call_data;
102 2808712 : channel_data *channeld = elem->channel_data;
103 :
104 2808712 : if (md->key == channeld->mdstr_request_compression_algorithm_key) {
105 469 : const char *md_c_str = grpc_mdstr_as_c_string(md->value);
106 469 : if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
107 : &calld->compression_algorithm)) {
108 0 : gpr_log(GPR_ERROR,
109 : "Invalid compression algorithm: '%s' (unknown). Ignoring.",
110 : md_c_str);
111 0 : calld->compression_algorithm = GRPC_COMPRESS_NONE;
112 : }
113 938 : if (grpc_compression_options_is_algorithm_enabled(
114 469 : &channeld->compression_options, calld->compression_algorithm) ==
115 : 0) {
116 0 : gpr_log(GPR_ERROR,
117 : "Invalid compression algorithm: '%s' (previously disabled). "
118 : "Ignoring.",
119 : md_c_str);
120 0 : calld->compression_algorithm = GRPC_COMPRESS_NONE;
121 : }
122 469 : calld->has_compression_algorithm = 1;
123 469 : return NULL;
124 : }
125 :
126 2808243 : return md;
127 : }
128 :
129 3001239 : static int skip_compression(channel_data *channeld, call_data *calld) {
130 3001239 : if (calld->has_compression_algorithm) {
131 3001239 : if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
132 3000488 : return 1;
133 : }
134 751 : return 0; /* we have an actual call-specific algorithm */
135 : }
136 : /* no per-call compression override */
137 0 : return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
138 : }
139 :
140 : /** Assembles a new grpc_stream_op_buffer with the compressed slices, modifying
141 : * the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length,
142 : * flags indicating compression is in effect) and replaces \a send_ops with it.
143 : * */
144 64 : static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
145 : grpc_call_element *elem) {
146 : size_t i;
147 64 : call_data *calld = elem->call_data;
148 64 : int new_slices_added = 0; /* GPR_FALSE */
149 : grpc_metadata_batch metadata;
150 : grpc_stream_op_buffer new_send_ops;
151 64 : grpc_sopb_init(&new_send_ops);
152 :
153 256 : for (i = 0; i < send_ops->nops; i++) {
154 192 : grpc_stream_op *sop = &send_ops->ops[i];
155 192 : switch (sop->type) {
156 : case GRPC_OP_BEGIN_MESSAGE:
157 64 : GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX);
158 128 : grpc_sopb_add_begin_message(
159 64 : &new_send_ops, (gpr_uint32)calld->slices.length,
160 64 : sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
161 64 : break;
162 : case GRPC_OP_SLICE:
163 : /* Once we reach the slices section of the original buffer, simply add
164 : * all the new (compressed) slices. We obviously want to do this only
165 : * once, hence the "new_slices_added" guard. */
166 64 : if (!new_slices_added) {
167 : size_t j;
168 128 : for (j = 0; j < calld->slices.count; ++j) {
169 64 : grpc_sopb_add_slice(&new_send_ops,
170 64 : gpr_slice_ref(calld->slices.slices[j]));
171 : }
172 64 : new_slices_added = 1; /* GPR_TRUE */
173 : }
174 64 : break;
175 : case GRPC_OP_METADATA:
176 : /* move the metadata to the new buffer. */
177 64 : grpc_metadata_batch_move(&metadata, &sop->data.metadata);
178 64 : grpc_sopb_add_metadata(&new_send_ops, metadata);
179 64 : break;
180 : case GRPC_NO_OP:
181 0 : break;
182 : }
183 : }
184 64 : grpc_sopb_swap(send_ops, &new_send_ops);
185 64 : grpc_sopb_destroy(&new_send_ops);
186 64 : }
187 :
188 : /** Filter's "main" function, called for any incoming grpc_transport_stream_op
189 : * instance that holds a non-zero number of send operations, accesible to this
190 : * function in \a send_ops. */
191 3005398 : static void process_send_ops(grpc_call_element *elem,
192 : grpc_stream_op_buffer *send_ops) {
193 3005398 : call_data *calld = elem->call_data;
194 3005398 : channel_data *channeld = elem->channel_data;
195 : size_t i;
196 3005398 : int did_compress = 0;
197 :
198 : /* In streaming calls, we need to reset the previously accumulated slices */
199 3005398 : gpr_slice_buffer_reset_and_unref(&calld->slices);
200 13002732 : for (i = 0; i < send_ops->nops; ++i) {
201 9997704 : grpc_stream_op *sop = &send_ops->ops[i];
202 9997704 : switch (sop->type) {
203 : case GRPC_OP_BEGIN_MESSAGE:
204 : /* buffer up slices until we've processed all the expected ones (as
205 : * given by GRPC_OP_BEGIN_MESSAGE) */
206 3000623 : calld->remaining_slice_bytes = sop->data.begin_message.length;
207 3000623 : if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
208 40 : calld->has_compression_algorithm = 1; /* GPR_TRUE */
209 40 : calld->compression_algorithm = GRPC_COMPRESS_NONE;
210 : }
211 3000623 : break;
212 : case GRPC_OP_METADATA:
213 4003216 : if (!calld->written_initial_metadata) {
214 : /* Parse incoming request for compression. If any, it'll be available
215 : * at calld->compression_algorithm */
216 2703693 : grpc_metadata_batch_filter(&(sop->data.metadata),
217 : compression_md_filter, elem);
218 2703114 : if (!calld->has_compression_algorithm) {
219 : /* If no algorithm was found in the metadata and we aren't
220 : * exceptionally skipping compression, fall back to the channel
221 : * default */
222 2702688 : calld->compression_algorithm =
223 2702688 : channeld->default_compression_algorithm;
224 2702688 : calld->has_compression_algorithm = 1; /* GPR_TRUE */
225 : }
226 : /* hint compression algorithm */
227 2703114 : grpc_metadata_batch_add_tail(
228 : &(sop->data.metadata), &calld->compression_algorithm_storage,
229 2703114 : GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
230 : [calld->compression_algorithm]));
231 :
232 : /* convey supported compression algorithms */
233 2703520 : grpc_metadata_batch_add_tail(
234 : &(sop->data.metadata), &calld->accept_encoding_storage,
235 : GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
236 :
237 2703374 : calld->written_initial_metadata = 1; /* GPR_TRUE */
238 : }
239 4002897 : break;
240 : case GRPC_OP_SLICE:
241 2999083 : if (skip_compression(channeld, calld)) continue;
242 751 : GPR_ASSERT(calld->remaining_slice_bytes > 0);
243 : /* Increase input ref count, gpr_slice_buffer_add takes ownership. */
244 751 : gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice));
245 751 : GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) <=
246 : calld->remaining_slice_bytes);
247 1502 : calld->remaining_slice_bytes -=
248 751 : (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice);
249 751 : if (calld->remaining_slice_bytes == 0) {
250 751 : did_compress =
251 751 : compress_send_sb(calld->compression_algorithm, &calld->slices);
252 : }
253 751 : break;
254 : case GRPC_NO_OP:
255 0 : break;
256 : }
257 : }
258 :
259 : /* Modify the send_ops stream_op_buffer depending on whether compression was
260 : * carried out */
261 3005028 : if (did_compress) {
262 64 : finish_compressed_sopb(send_ops, elem);
263 : }
264 3005028 : }
265 :
266 : /* Called either:
267 : - in response to an API call (or similar) from above, to send something
268 : - a network event (or similar) from below, to receive something
269 : op contains type and call direction information, in addition to the data
270 : that is being sent or received. */
271 7744443 : static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
272 : grpc_call_element *elem,
273 : grpc_transport_stream_op *op) {
274 7744443 : if (op->send_ops && op->send_ops->nops > 0) {
275 3005572 : process_send_ops(elem, op->send_ops);
276 : }
277 :
278 : /* pass control down the stack */
279 7743060 : grpc_call_next_op(exec_ctx, elem, op);
280 7757686 : }
281 :
282 : /* Constructor for call_data */
283 2703392 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
284 : const void *server_transport_data,
285 : grpc_transport_stream_op *initial_op) {
286 : /* grab pointers to our data from the call element */
287 2703392 : call_data *calld = elem->call_data;
288 :
289 : /* initialize members */
290 2703392 : gpr_slice_buffer_init(&calld->slices);
291 2704231 : calld->has_compression_algorithm = 0;
292 2704231 : calld->written_initial_metadata = 0; /* GPR_FALSE */
293 :
294 2704231 : if (initial_op) {
295 1301298 : if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
296 0 : process_send_ops(elem, initial_op->send_ops);
297 : }
298 : }
299 2704231 : }
300 :
301 : /* Destructor for call_data */
302 2701828 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
303 : grpc_call_element *elem) {
304 : /* grab pointers to our data from the call element */
305 2701828 : call_data *calld = elem->call_data;
306 2701828 : gpr_slice_buffer_destroy(&calld->slices);
307 2703482 : }
308 :
309 : /* Constructor for channel_data */
310 3739 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
311 : grpc_channel_element *elem, grpc_channel *master,
312 : const grpc_channel_args *args, grpc_mdctx *mdctx,
313 : int is_first, int is_last) {
314 3739 : channel_data *channeld = elem->channel_data;
315 : grpc_compression_algorithm algo_idx;
316 : const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
317 3739 : size_t supported_algorithms_idx = 0;
318 : char *accept_encoding_str;
319 : size_t accept_encoding_str_len;
320 :
321 3739 : grpc_compression_options_init(&channeld->compression_options);
322 3739 : channeld->compression_options.enabled_algorithms_bitset =
323 3739 : (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args);
324 :
325 3739 : channeld->default_compression_algorithm =
326 3739 : grpc_channel_args_get_compression_algorithm(args);
327 : /* Make sure the default isn't disabled. */
328 3739 : GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
329 : &channeld->compression_options, channeld->default_compression_algorithm));
330 3739 : channeld->compression_options.default_compression_algorithm =
331 3739 : channeld->default_compression_algorithm;
332 :
333 3739 : channeld->mdstr_request_compression_algorithm_key =
334 3739 : grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
335 :
336 3739 : channeld->mdstr_outgoing_compression_algorithm_key =
337 3739 : grpc_mdstr_from_string(mdctx, "grpc-encoding");
338 :
339 3739 : channeld->mdstr_compression_capabilities_key =
340 3739 : grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
341 :
342 14956 : for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
343 : char *algorithm_name;
344 : /* skip disabled algorithms */
345 11217 : if (grpc_compression_options_is_algorithm_enabled(
346 11217 : &channeld->compression_options, algo_idx) == 0) {
347 0 : continue;
348 : }
349 11217 : GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
350 11217 : channeld->mdelem_compression_algorithms[algo_idx] =
351 11217 : grpc_mdelem_from_metadata_strings(
352 : mdctx,
353 : GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
354 : grpc_mdstr_from_string(mdctx, algorithm_name));
355 11217 : if (algo_idx > 0) {
356 7478 : supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
357 : }
358 : }
359 :
360 : /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
361 : * arrays, as to avoid the heap allocs */
362 3739 : accept_encoding_str =
363 : gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",",
364 : &accept_encoding_str_len);
365 :
366 3739 : channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
367 : mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
368 : grpc_mdstr_from_string(mdctx, accept_encoding_str));
369 3739 : gpr_free(accept_encoding_str);
370 :
371 3739 : GPR_ASSERT(!is_last);
372 3739 : }
373 :
374 : /* Destructor for channel data */
375 3739 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
376 : grpc_channel_element *elem) {
377 3739 : channel_data *channeld = elem->channel_data;
378 : grpc_compression_algorithm algo_idx;
379 :
380 3739 : GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
381 3739 : GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
382 3739 : GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
383 14956 : for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
384 11217 : GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
385 : }
386 3739 : GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);
387 3739 : }
388 :
/** Filter table for the compression filter, registered under the name
 * "compress". It plugs in this file's stream-op handler and the call/channel
 * constructors and destructors, together with the sizes of call_data and
 * channel_data; channel ops and peer lookups are forwarded to the next
 * element (grpc_channel_next_op, grpc_call_next_get_peer). */
const grpc_channel_filter grpc_compress_filter = {
    compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
    init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
    destroy_channel_elem, grpc_call_next_get_peer, "compress"};
|