Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <assert.h>
35 : #include <string.h>
36 :
37 : #include <grpc/compression.h>
38 : #include <grpc/support/alloc.h>
39 : #include <grpc/support/log.h>
40 : #include <grpc/support/slice_buffer.h>
41 :
42 : #include "src/core/channel/channel_args.h"
43 : #include "src/core/channel/compress_filter.h"
44 : #include "src/core/compression/algorithm_metadata.h"
45 : #include "src/core/compression/message_compress.h"
46 : #include "src/core/profiling/timers.h"
47 : #include "src/core/support/string.h"
48 : #include "src/core/transport/static_metadata.h"
49 :
50 : typedef struct call_data {
51 : gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
52 : grpc_linked_mdelem compression_algorithm_storage;
53 : grpc_linked_mdelem accept_encoding_storage;
54 : gpr_uint32 remaining_slice_bytes;
55 : /** Compression algorithm we'll try to use. It may be given by incoming
56 : * metadata, or by the channel's default compression settings. */
57 : grpc_compression_algorithm compression_algorithm;
58 : /** If true, contents of \a compression_algorithm are authoritative */
59 : int has_compression_algorithm;
60 :
61 : grpc_transport_stream_op send_op;
62 : gpr_uint32 send_length;
63 : gpr_uint32 send_flags;
64 : gpr_slice incoming_slice;
65 : grpc_slice_buffer_stream replacement_stream;
66 : grpc_closure *post_send;
67 : grpc_closure send_done;
68 : grpc_closure got_slice;
69 : } call_data;
70 :
71 : typedef struct channel_data {
72 : /** The default, channel-level, compression algorithm */
73 : grpc_compression_algorithm default_compression_algorithm;
74 : /** Compression options for the channel */
75 : grpc_compression_options compression_options;
76 : /** Supported compression algorithms */
77 : gpr_uint32 supported_compression_algorithms;
78 : } channel_data;
79 :
80 : /** For each \a md element from the incoming metadata, filter out the entry for
81 : * "grpc-encoding", using its value to populate the call data's
82 : * compression_algorithm field. */
83 6631241 : static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
84 6630998 : grpc_call_element *elem = user_data;
85 6631241 : call_data *calld = elem->call_data;
86 6631241 : channel_data *channeld = elem->channel_data;
87 :
88 6631241 : if (md->key == GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST) {
89 475 : const char *md_c_str = grpc_mdstr_as_c_string(md->value);
90 475 : if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
91 : &calld->compression_algorithm)) {
92 0 : gpr_log(GPR_ERROR,
93 : "Invalid compression algorithm: '%s' (unknown). Ignoring.",
94 : md_c_str);
95 0 : calld->compression_algorithm = GRPC_COMPRESS_NONE;
96 : }
97 950 : if (grpc_compression_options_is_algorithm_enabled(
98 475 : &channeld->compression_options, calld->compression_algorithm) ==
99 : 0) {
100 0 : gpr_log(GPR_ERROR,
101 : "Invalid compression algorithm: '%s' (previously disabled). "
102 : "Ignoring.",
103 : md_c_str);
104 0 : calld->compression_algorithm = GRPC_COMPRESS_NONE;
105 : }
106 475 : calld->has_compression_algorithm = 1;
107 475 : return NULL;
108 : }
109 :
110 6630523 : return md;
111 : }
112 :
113 3959505 : static int skip_compression(grpc_call_element *elem) {
114 3959505 : call_data *calld = elem->call_data;
115 3959505 : channel_data *channeld = elem->channel_data;
116 3959696 : if (calld->has_compression_algorithm) {
117 3959696 : if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
118 3958724 : return 1;
119 : }
120 781 : return 0; /* we have an actual call-specific algorithm */
121 : }
122 : /* no per-call compression override */
123 0 : return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
124 : }
125 :
126 : /** Filter initial metadata */
127 4337281 : static void process_send_initial_metadata(
128 : grpc_call_element *elem, grpc_metadata_batch *initial_metadata) {
129 4337281 : call_data *calld = elem->call_data;
130 4337281 : channel_data *channeld = elem->channel_data;
131 : /* Parse incoming request for compression. If any, it'll be available
132 : * at calld->compression_algorithm */
133 4337281 : grpc_metadata_batch_filter(initial_metadata, compression_md_filter, elem);
134 4332444 : if (!calld->has_compression_algorithm) {
135 : /* If no algorithm was found in the metadata and we aren't
136 : * exceptionally skipping compression, fall back to the channel
137 : * default */
138 4333205 : calld->compression_algorithm = channeld->default_compression_algorithm;
139 4333205 : calld->has_compression_algorithm = 1; /* GPR_TRUE */
140 : }
141 : /* hint compression algorithm */
142 4332444 : grpc_metadata_batch_add_tail(
143 : initial_metadata, &calld->compression_algorithm_storage,
144 : grpc_compression_encoding_mdelem(calld->compression_algorithm));
145 :
146 : /* convey supported compression algorithms */
147 4330261 : grpc_metadata_batch_add_tail(initial_metadata,
148 : &calld->accept_encoding_storage,
149 4330261 : GRPC_MDELEM_ACCEPT_ENCODING_FOR_ALGORITHMS(
150 : channeld->supported_compression_algorithms));
151 4334096 : }
152 :
153 : static void continue_send_message(grpc_exec_ctx *exec_ctx,
154 : grpc_call_element *elem);
155 :
156 757 : static void send_done(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
157 757 : grpc_call_element *elem = elemp;
158 757 : call_data *calld = elem->call_data;
159 757 : gpr_slice_buffer_reset_and_unref(&calld->slices);
160 757 : calld->post_send->cb(exec_ctx, calld->post_send->cb_arg, success);
161 757 : }
162 :
163 757 : static void finish_send_message(grpc_exec_ctx *exec_ctx,
164 : grpc_call_element *elem) {
165 757 : call_data *calld = elem->call_data;
166 : int did_compress;
167 : gpr_slice_buffer tmp;
168 757 : gpr_slice_buffer_init(&tmp);
169 757 : did_compress =
170 757 : grpc_msg_compress(calld->compression_algorithm, &calld->slices, &tmp);
171 757 : if (did_compress) {
172 70 : gpr_slice_buffer_swap(&calld->slices, &tmp);
173 70 : calld->send_flags |= GRPC_WRITE_INTERNAL_COMPRESS;
174 : }
175 757 : gpr_slice_buffer_destroy(&tmp);
176 :
177 757 : grpc_slice_buffer_stream_init(&calld->replacement_stream, &calld->slices,
178 : calld->send_flags);
179 757 : calld->send_op.send_message = &calld->replacement_stream.base;
180 757 : calld->post_send = calld->send_op.on_complete;
181 757 : calld->send_op.on_complete = &calld->send_done;
182 :
183 757 : grpc_call_next_op(exec_ctx, elem, &calld->send_op);
184 757 : }
185 :
186 0 : static void got_slice(grpc_exec_ctx *exec_ctx, void *elemp, int success) {
187 0 : grpc_call_element *elem = elemp;
188 0 : call_data *calld = elem->call_data;
189 0 : gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
190 0 : if (calld->send_length == calld->slices.length) {
191 0 : finish_send_message(exec_ctx, elem);
192 : } else {
193 0 : continue_send_message(exec_ctx, elem);
194 : }
195 0 : }
196 :
197 757 : static void continue_send_message(grpc_exec_ctx *exec_ctx,
198 : grpc_call_element *elem) {
199 757 : call_data *calld = elem->call_data;
200 1514 : while (grpc_byte_stream_next(exec_ctx, calld->send_op.send_message,
201 : &calld->incoming_slice, ~(size_t)0,
202 : &calld->got_slice)) {
203 757 : gpr_slice_buffer_add(&calld->slices, calld->incoming_slice);
204 757 : if (calld->send_length == calld->slices.length) {
205 757 : finish_send_message(exec_ctx, elem);
206 757 : break;
207 : }
208 : }
209 757 : }
210 :
211 12144589 : static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
212 : grpc_call_element *elem,
213 : grpc_transport_stream_op *op) {
214 12144589 : call_data *calld = elem->call_data;
215 :
216 : GPR_TIMER_BEGIN("compress_start_transport_stream_op", 0);
217 :
218 12144589 : if (op->send_initial_metadata) {
219 4338009 : process_send_initial_metadata(elem, op->send_initial_metadata);
220 : }
221 12161031 : if (op->send_message != NULL && !skip_compression(elem) &&
222 781 : 0 == (op->send_message->flags & GRPC_WRITE_NO_COMPRESS)) {
223 757 : calld->send_op = *op;
224 757 : calld->send_length = op->send_message->length;
225 757 : calld->send_flags = op->send_message->flags;
226 757 : continue_send_message(exec_ctx, elem);
227 : } else {
228 : /* pass control down the stack */
229 12159615 : grpc_call_next_op(exec_ctx, elem, op);
230 : }
231 :
232 : GPR_TIMER_END("compress_start_transport_stream_op", 0);
233 12167962 : }
234 :
235 : /* Constructor for call_data */
236 4336690 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
237 : grpc_call_element_args *args) {
238 : /* grab pointers to our data from the call element */
239 4336690 : call_data *calld = elem->call_data;
240 :
241 : /* initialize members */
242 4336690 : gpr_slice_buffer_init(&calld->slices);
243 4337422 : calld->has_compression_algorithm = 0;
244 4337422 : grpc_closure_init(&calld->got_slice, got_slice, elem);
245 4336978 : grpc_closure_init(&calld->send_done, send_done, elem);
246 4336332 : }
247 :
248 : /* Destructor for call_data */
249 4334975 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
250 : grpc_call_element *elem) {
251 : /* grab pointers to our data from the call element */
252 4334975 : call_data *calld = elem->call_data;
253 4334975 : gpr_slice_buffer_destroy(&calld->slices);
254 4336455 : }
255 :
256 : /* Constructor for channel_data */
257 5734 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
258 : grpc_channel_element *elem,
259 : grpc_channel_element_args *args) {
260 5734 : channel_data *channeld = elem->channel_data;
261 : grpc_compression_algorithm algo_idx;
262 :
263 5734 : grpc_compression_options_init(&channeld->compression_options);
264 5734 : channeld->compression_options.enabled_algorithms_bitset =
265 5734 : (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(
266 : args->channel_args);
267 :
268 5734 : channeld->default_compression_algorithm =
269 5734 : grpc_channel_args_get_compression_algorithm(args->channel_args);
270 : /* Make sure the default isn't disabled. */
271 5734 : GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
272 : &channeld->compression_options, channeld->default_compression_algorithm));
273 5734 : channeld->compression_options.default_compression_algorithm =
274 5734 : channeld->default_compression_algorithm;
275 :
276 5734 : channeld->supported_compression_algorithms = 0;
277 22936 : for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
278 : /* skip disabled algorithms */
279 17202 : if (grpc_compression_options_is_algorithm_enabled(
280 16905 : &channeld->compression_options, algo_idx) == 0) {
281 0 : continue;
282 : }
283 17202 : channeld->supported_compression_algorithms |= 1u << algo_idx;
284 : }
285 :
286 5734 : GPR_ASSERT(!args->is_last);
287 5734 : }
288 :
289 : /* Destructor for channel data */
290 5642 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
291 : grpc_channel_element *elem) {
292 5642 : }
293 :
294 : const grpc_channel_filter grpc_compress_filter = {
295 : compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
296 : init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
297 : sizeof(channel_data), init_channel_elem, destroy_channel_elem,
298 : grpc_call_next_get_peer, "compress"};
|