LCOV - code coverage report
Current view: top level - src/core/channel - compress_filter.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 141 150 94.0 %
Date: 2015-10-10 Functions: 10 10 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <assert.h>
      35             : #include <string.h>
      36             : 
      37             : #include <grpc/compression.h>
      38             : #include <grpc/support/alloc.h>
      39             : #include <grpc/support/log.h>
      40             : #include <grpc/support/slice_buffer.h>
      41             : 
      42             : #include "src/core/channel/compress_filter.h"
      43             : #include "src/core/channel/channel_args.h"
      44             : #include "src/core/compression/message_compress.h"
      45             : #include "src/core/support/string.h"
      46             : 
      47             : typedef struct call_data {
      48             :   gpr_slice_buffer slices; /**< Buffers up input slices to be compressed */
      49             :   grpc_linked_mdelem compression_algorithm_storage;
      50             :   grpc_linked_mdelem accept_encoding_storage;
      51             :   gpr_uint32 remaining_slice_bytes;
      52             :   /**< Input data to be read, as per BEGIN_MESSAGE */
      53             :   int written_initial_metadata; /**< Already processed initial md? */
      54             :   /** Compression algorithm we'll try to use. It may be given by incoming
      55             :    * metadata, or by the channel's default compression settings. */
      56             :   grpc_compression_algorithm compression_algorithm;
      57             :   /** If true, contents of \a compression_algorithm are authoritative */
      58             :   int has_compression_algorithm;
      59             : } call_data;
      60             : 
      61             : typedef struct channel_data {
      62             :   /** Metadata key for the incoming (requested) compression algorithm */
      63             :   grpc_mdstr *mdstr_request_compression_algorithm_key;
      64             :   /** Metadata key for the outgoing (used) compression algorithm */
      65             :   grpc_mdstr *mdstr_outgoing_compression_algorithm_key;
      66             :   /** Metadata key for the accepted encodings */
      67             :   grpc_mdstr *mdstr_compression_capabilities_key;
      68             :   /** Precomputed metadata elements for all available compression algorithms */
      69             :   grpc_mdelem *mdelem_compression_algorithms[GRPC_COMPRESS_ALGORITHMS_COUNT];
      70             :   /** Precomputed metadata elements for the accepted encodings */
      71             :   grpc_mdelem *mdelem_accept_encoding;
      72             :   /** The default, channel-level, compression algorithm */
      73             :   grpc_compression_algorithm default_compression_algorithm;
      74             :   /** Compression options for the channel */
      75             :   grpc_compression_options compression_options;
      76             : } channel_data;
      77             : 
      78             : /** Compress \a slices in place using \a algorithm. Returns 1 if compression did
      79             :  * actually happen, 0 otherwise (for example if the compressed output size was
      80             :  * larger than the raw input).
      81             :  *
      82             :  * Returns 1 if the data was actually compressed and 0 otherwise. */
      83         751 : static int compress_send_sb(grpc_compression_algorithm algorithm,
      84             :                             gpr_slice_buffer *slices) {
      85             :   int did_compress;
      86             :   gpr_slice_buffer tmp;
      87         751 :   gpr_slice_buffer_init(&tmp);
      88         751 :   did_compress = grpc_msg_compress(algorithm, slices, &tmp);
      89         751 :   if (did_compress) {
      90          64 :     gpr_slice_buffer_swap(slices, &tmp);
      91             :   }
      92         751 :   gpr_slice_buffer_destroy(&tmp);
      93         751 :   return did_compress;
      94             : }
      95             : 
      96             : /** For each \a md element from the incoming metadata, filter out the entry
      97             :  * whose key is GRPC_COMPRESS_REQUEST_ALGORITHM_KEY, using its value to
      98             :  * populate the call data's compression_algorithm field. */
      99     2808712 : static grpc_mdelem *compression_md_filter(void *user_data, grpc_mdelem *md) {
     100     2808712 :   grpc_call_element *elem = user_data;
     101     2808712 :   call_data *calld = elem->call_data;
     102     2808712 :   channel_data *channeld = elem->channel_data;
     103             : 
     104     2808712 :   if (md->key == channeld->mdstr_request_compression_algorithm_key) {
     105         469 :     const char *md_c_str = grpc_mdstr_as_c_string(md->value);
     106         469 :     if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
     107             :                                           &calld->compression_algorithm)) {
     108           0 :       gpr_log(GPR_ERROR,
     109             :               "Invalid compression algorithm: '%s' (unknown). Ignoring.",
     110             :               md_c_str);
     111           0 :       calld->compression_algorithm = GRPC_COMPRESS_NONE;
     112             :     }
     113         938 :     if (grpc_compression_options_is_algorithm_enabled(
     114         469 :             &channeld->compression_options, calld->compression_algorithm) ==
     115             :         0) {
     116           0 :       gpr_log(GPR_ERROR,
     117             :               "Invalid compression algorithm: '%s' (previously disabled). "
     118             :               "Ignoring.",
     119             :               md_c_str);
     120           0 :       calld->compression_algorithm = GRPC_COMPRESS_NONE;
     121             :     }
     122         469 :     calld->has_compression_algorithm = 1;
     123         469 :     return NULL;
     124             :   }
     125             : 
     126     2808243 :   return md;
     127             : }
     128             : 
     129     3001239 : static int skip_compression(channel_data *channeld, call_data *calld) {
     130     3001239 :   if (calld->has_compression_algorithm) {
     131     3001239 :     if (calld->compression_algorithm == GRPC_COMPRESS_NONE) {
     132     3000488 :       return 1;
     133             :     }
     134         751 :     return 0; /* we have an actual call-specific algorithm */
     135             :   }
     136             :   /* no per-call compression override */
     137           0 :   return channeld->default_compression_algorithm == GRPC_COMPRESS_NONE;
     138             : }
     139             : 
     140             : /** Assembles a new grpc_stream_op_buffer with the compressed slices, modifying
     141             :  * the associated GRPC_OP_BEGIN_MESSAGE accordingly (new compressed length,
     142             :  * flags indicating compression is in effect) and replaces \a send_ops with it.
     143             :  * */
     144          64 : static void finish_compressed_sopb(grpc_stream_op_buffer *send_ops,
     145             :                                    grpc_call_element *elem) {
     146             :   size_t i;
     147          64 :   call_data *calld = elem->call_data;
     148          64 :   int new_slices_added = 0; /* GPR_FALSE */
     149             :   grpc_metadata_batch metadata;
     150             :   grpc_stream_op_buffer new_send_ops;
     151          64 :   grpc_sopb_init(&new_send_ops);
     152             : 
     153         256 :   for (i = 0; i < send_ops->nops; i++) {
     154         192 :     grpc_stream_op *sop = &send_ops->ops[i];
     155         192 :     switch (sop->type) {
     156             :       case GRPC_OP_BEGIN_MESSAGE:
     157          64 :         GPR_ASSERT(calld->slices.length <= GPR_UINT32_MAX);
     158         128 :         grpc_sopb_add_begin_message(
     159          64 :             &new_send_ops, (gpr_uint32)calld->slices.length,
     160          64 :             sop->data.begin_message.flags | GRPC_WRITE_INTERNAL_COMPRESS);
     161          64 :         break;
     162             :       case GRPC_OP_SLICE:
     163             :         /* Once we reach the slices section of the original buffer, simply add
     164             :          * all the new (compressed) slices. We obviously want to do this only
     165             :          * once, hence the "new_slices_added" guard. */
     166          64 :         if (!new_slices_added) {
     167             :           size_t j;
     168         128 :           for (j = 0; j < calld->slices.count; ++j) {
     169          64 :             grpc_sopb_add_slice(&new_send_ops,
     170          64 :                                 gpr_slice_ref(calld->slices.slices[j]));
     171             :           }
     172          64 :           new_slices_added = 1; /* GPR_TRUE */
     173             :         }
     174          64 :         break;
     175             :       case GRPC_OP_METADATA:
     176             :         /* move the metadata to the new buffer. */
     177          64 :         grpc_metadata_batch_move(&metadata, &sop->data.metadata);
     178          64 :         grpc_sopb_add_metadata(&new_send_ops, metadata);
     179          64 :         break;
     180             :       case GRPC_NO_OP:
     181           0 :         break;
     182             :     }
     183             :   }
     184          64 :   grpc_sopb_swap(send_ops, &new_send_ops);
     185          64 :   grpc_sopb_destroy(&new_send_ops);
     186          64 : }
     187             : 
     188             : /** Filter's "main" function, called for any incoming grpc_transport_stream_op
     189             :  * instance that holds a non-zero number of send operations, accessible to this
     190             :  * function in \a send_ops.  */
     191     3005398 : static void process_send_ops(grpc_call_element *elem,
     192             :                              grpc_stream_op_buffer *send_ops) {
     193     3005398 :   call_data *calld = elem->call_data;
     194     3005398 :   channel_data *channeld = elem->channel_data;
     195             :   size_t i;
     196     3005398 :   int did_compress = 0;
     197             : 
     198             :   /* In streaming calls, we need to reset the previously accumulated slices */
     199     3005398 :   gpr_slice_buffer_reset_and_unref(&calld->slices);
     200    13002732 :   for (i = 0; i < send_ops->nops; ++i) {
     201     9997704 :     grpc_stream_op *sop = &send_ops->ops[i];
     202     9997704 :     switch (sop->type) {
     203             :       case GRPC_OP_BEGIN_MESSAGE:
     204             :         /* buffer up slices until we've processed all the expected ones (as
     205             :          * given by GRPC_OP_BEGIN_MESSAGE) */
     206     3000623 :         calld->remaining_slice_bytes = sop->data.begin_message.length;
     207     3000623 :         if (sop->data.begin_message.flags & GRPC_WRITE_NO_COMPRESS) {
     208          40 :           calld->has_compression_algorithm = 1; /* GPR_TRUE */
     209          40 :           calld->compression_algorithm = GRPC_COMPRESS_NONE;
     210             :         }
     211     3000623 :         break;
     212             :       case GRPC_OP_METADATA:
     213     4003216 :         if (!calld->written_initial_metadata) {
     214             :           /* Parse incoming request for compression. If any, it'll be available
     215             :            * at calld->compression_algorithm */
     216     2703693 :           grpc_metadata_batch_filter(&(sop->data.metadata),
     217             :                                      compression_md_filter, elem);
     218     2703114 :           if (!calld->has_compression_algorithm) {
     219             :             /* If no algorithm was found in the metadata and we aren't
     220             :              * exceptionally skipping compression, fall back to the channel
     221             :              * default */
     222     2702688 :             calld->compression_algorithm =
     223     2702688 :                 channeld->default_compression_algorithm;
     224     2702688 :             calld->has_compression_algorithm = 1; /* GPR_TRUE */
     225             :           }
     226             :           /* hint compression algorithm */
     227     2703114 :           grpc_metadata_batch_add_tail(
     228             :               &(sop->data.metadata), &calld->compression_algorithm_storage,
     229     2703114 :               GRPC_MDELEM_REF(channeld->mdelem_compression_algorithms
     230             :                                   [calld->compression_algorithm]));
     231             : 
     232             :           /* convey supported compression algorithms */
     233     2703520 :           grpc_metadata_batch_add_tail(
     234             :               &(sop->data.metadata), &calld->accept_encoding_storage,
     235             :               GRPC_MDELEM_REF(channeld->mdelem_accept_encoding));
     236             : 
     237     2703374 :           calld->written_initial_metadata = 1; /* GPR_TRUE */
     238             :         }
     239     4002897 :         break;
     240             :       case GRPC_OP_SLICE:
     241     2999083 :         if (skip_compression(channeld, calld)) continue;
     242         751 :         GPR_ASSERT(calld->remaining_slice_bytes > 0);
     243             :         /* Increase input ref count, gpr_slice_buffer_add takes ownership.  */
     244         751 :         gpr_slice_buffer_add(&calld->slices, gpr_slice_ref(sop->data.slice));
     245         751 :         GPR_ASSERT(GPR_SLICE_LENGTH(sop->data.slice) <=
     246             :                    calld->remaining_slice_bytes);
     247        1502 :         calld->remaining_slice_bytes -=
     248         751 :             (gpr_uint32)GPR_SLICE_LENGTH(sop->data.slice);
     249         751 :         if (calld->remaining_slice_bytes == 0) {
     250         751 :           did_compress =
     251         751 :               compress_send_sb(calld->compression_algorithm, &calld->slices);
     252             :         }
     253         751 :         break;
     254             :       case GRPC_NO_OP:
     255           0 :         break;
     256             :     }
     257             :   }
     258             : 
     259             :   /* Modify the send_ops stream_op_buffer depending on whether compression was
     260             :    * carried out */
     261     3005028 :   if (did_compress) {
     262          64 :     finish_compressed_sopb(send_ops, elem);
     263             :   }
     264     3005028 : }
     265             : 
     266             : /* Called either:
     267             :      - in response to an API call (or similar) from above, to send something
     268             :      - a network event (or similar) from below, to receive something
     269             :    op contains type and call direction information, in addition to the data
     270             :    that is being sent or received. */
     271     7744443 : static void compress_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
     272             :                                                grpc_call_element *elem,
     273             :                                                grpc_transport_stream_op *op) {
     274     7744443 :   if (op->send_ops && op->send_ops->nops > 0) {
     275     3005572 :     process_send_ops(elem, op->send_ops);
     276             :   }
     277             : 
     278             :   /* pass control down the stack */
     279     7743060 :   grpc_call_next_op(exec_ctx, elem, op);
     280     7757686 : }
     281             : 
     282             : /* Constructor for call_data */
     283     2703392 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
     284             :                            const void *server_transport_data,
     285             :                            grpc_transport_stream_op *initial_op) {
     286             :   /* grab pointers to our data from the call element */
     287     2703392 :   call_data *calld = elem->call_data;
     288             : 
     289             :   /* initialize members */
     290     2703392 :   gpr_slice_buffer_init(&calld->slices);
     291     2704231 :   calld->has_compression_algorithm = 0;
     292     2704231 :   calld->written_initial_metadata = 0; /* GPR_FALSE */
     293             : 
     294     2704231 :   if (initial_op) {
     295     1301298 :     if (initial_op->send_ops && initial_op->send_ops->nops > 0) {
     296           0 :       process_send_ops(elem, initial_op->send_ops);
     297             :     }
     298             :   }
     299     2704231 : }
     300             : 
     301             : /* Destructor for call_data */
     302     2701828 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
     303             :                               grpc_call_element *elem) {
     304             :   /* grab pointers to our data from the call element */
     305     2701828 :   call_data *calld = elem->call_data;
     306     2701828 :   gpr_slice_buffer_destroy(&calld->slices);
     307     2703482 : }
     308             : 
     309             : /* Constructor for channel_data */
     310        3739 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
     311             :                               grpc_channel_element *elem, grpc_channel *master,
     312             :                               const grpc_channel_args *args, grpc_mdctx *mdctx,
     313             :                               int is_first, int is_last) {
     314        3739 :   channel_data *channeld = elem->channel_data;
     315             :   grpc_compression_algorithm algo_idx;
     316             :   const char *supported_algorithms_names[GRPC_COMPRESS_ALGORITHMS_COUNT - 1];
     317        3739 :   size_t supported_algorithms_idx = 0;
     318             :   char *accept_encoding_str;
     319             :   size_t accept_encoding_str_len;
     320             : 
     321        3739 :   grpc_compression_options_init(&channeld->compression_options);
     322        3739 :   channeld->compression_options.enabled_algorithms_bitset =
     323        3739 :       (gpr_uint32)grpc_channel_args_compression_algorithm_get_states(args);
     324             : 
     325        3739 :   channeld->default_compression_algorithm =
     326        3739 :       grpc_channel_args_get_compression_algorithm(args);
     327             :   /* Make sure the default isn't disabled. */
     328        3739 :   GPR_ASSERT(grpc_compression_options_is_algorithm_enabled(
     329             :       &channeld->compression_options, channeld->default_compression_algorithm));
     330        3739 :   channeld->compression_options.default_compression_algorithm =
     331        3739 :       channeld->default_compression_algorithm;
     332             : 
     333        3739 :   channeld->mdstr_request_compression_algorithm_key =
     334        3739 :       grpc_mdstr_from_string(mdctx, GRPC_COMPRESS_REQUEST_ALGORITHM_KEY);
     335             : 
     336        3739 :   channeld->mdstr_outgoing_compression_algorithm_key =
     337        3739 :       grpc_mdstr_from_string(mdctx, "grpc-encoding");
     338             : 
     339        3739 :   channeld->mdstr_compression_capabilities_key =
     340        3739 :       grpc_mdstr_from_string(mdctx, "grpc-accept-encoding");
     341             : 
     342       14956 :   for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
     343             :     char *algorithm_name;
     344             :     /* skip disabled algorithms */
     345       11217 :     if (grpc_compression_options_is_algorithm_enabled(
     346       11217 :             &channeld->compression_options, algo_idx) == 0) {
     347           0 :       continue;
     348             :     }
     349       11217 :     GPR_ASSERT(grpc_compression_algorithm_name(algo_idx, &algorithm_name) != 0);
     350       11217 :     channeld->mdelem_compression_algorithms[algo_idx] =
     351       11217 :         grpc_mdelem_from_metadata_strings(
     352             :             mdctx,
     353             :             GRPC_MDSTR_REF(channeld->mdstr_outgoing_compression_algorithm_key),
     354             :             grpc_mdstr_from_string(mdctx, algorithm_name));
     355       11217 :     if (algo_idx > 0) {
     356        7478 :       supported_algorithms_names[supported_algorithms_idx++] = algorithm_name;
     357             :     }
     358             :   }
     359             : 
     360             :   /* TODO(dgq): gpr_strjoin_sep could be made to work with statically allocated
     361             :    * arrays, so as to avoid the heap allocs */
     362        3739 :   accept_encoding_str =
     363             :       gpr_strjoin_sep(supported_algorithms_names, supported_algorithms_idx, ",",
     364             :                       &accept_encoding_str_len);
     365             : 
     366        3739 :   channeld->mdelem_accept_encoding = grpc_mdelem_from_metadata_strings(
     367             :       mdctx, GRPC_MDSTR_REF(channeld->mdstr_compression_capabilities_key),
     368             :       grpc_mdstr_from_string(mdctx, accept_encoding_str));
     369        3739 :   gpr_free(accept_encoding_str);
     370             : 
     371        3739 :   GPR_ASSERT(!is_last);
     372        3739 : }
     373             : 
     374             : /* Destructor for channel data */
     375        3739 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
     376             :                                  grpc_channel_element *elem) {
     377        3739 :   channel_data *channeld = elem->channel_data;
     378             :   grpc_compression_algorithm algo_idx;
     379             : 
     380        3739 :   GRPC_MDSTR_UNREF(channeld->mdstr_request_compression_algorithm_key);
     381        3739 :   GRPC_MDSTR_UNREF(channeld->mdstr_outgoing_compression_algorithm_key);
     382        3739 :   GRPC_MDSTR_UNREF(channeld->mdstr_compression_capabilities_key);
     383       14956 :   for (algo_idx = 0; algo_idx < GRPC_COMPRESS_ALGORITHMS_COUNT; ++algo_idx) {
     384       11217 :     GRPC_MDELEM_UNREF(channeld->mdelem_compression_algorithms[algo_idx]);
     385             :   }
     386        3739 :   GRPC_MDELEM_UNREF(channeld->mdelem_accept_encoding);
     387        3739 : }
     388             : 
     389             : const grpc_channel_filter grpc_compress_filter = {
     390             :     compress_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
     391             :     init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
     392             :     destroy_channel_elem, grpc_call_next_get_peer, "compress"};

Generated by: LCOV version 1.10