LCOV - code coverage report
Current view: top level - src/core/surface - call.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 863 965 89.4 %
Date: 2015-10-10 Functions: 61 63 96.8 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : #include <assert.h>
      34             : #include <stdio.h>
      35             : #include <stdlib.h>
      36             : #include <string.h>
      37             : 
      38             : #include <grpc/compression.h>
      39             : #include <grpc/support/alloc.h>
      40             : #include <grpc/support/log.h>
      41             : #include <grpc/support/string_util.h>
      42             : #include <grpc/support/useful.h>
      43             : 
      44             : #include "src/core/channel/channel_stack.h"
      45             : #include "src/core/iomgr/alarm.h"
      46             : #include "src/core/profiling/timers.h"
      47             : #include "src/core/support/string.h"
      48             : #include "src/core/surface/api_trace.h"
      49             : #include "src/core/surface/byte_buffer_queue.h"
      50             : #include "src/core/surface/call.h"
      51             : #include "src/core/surface/channel.h"
      52             : #include "src/core/surface/completion_queue.h"
      53             : 
      54             : /** The maximum number of completions possible.
      55             :     Based upon the maximum number of individually queueable ops in the batch
      56             :    api:
      57             :       - initial metadata send
      58             :       - message send
      59             :       - status/close send (depending on client/server)
      60             :       - initial metadata recv
      61             :       - message recv
      62             :       - status/close recv (depending on client/server)
                           This value sizes grpc_call.completions; slot occupancy is tracked in
                           the 8-bit grpc_call.allocated_completions mask, so it must stay <= 8. */
      63             : #define MAX_CONCURRENT_COMPLETIONS 6
      64             : 
      65             : typedef struct {
                         /* One finished ioreq group whose completion callback is still to be
                            run; entries live in grpc_call.completed_requests. */
                         /* callback to run for this completed group */
      66             :   grpc_ioreq_completion_func on_complete;
                         /* opaque pointer stored alongside on_complete (presumably handed back
                            to it when it runs - the call site is outside this excerpt) */
      67             :   void *user_data;
                         /* result recorded for the group (non-zero presumably means success) */
      68             :   int success;
      69             : } completed_request;
      70             : 
      71             : /* See request_set in grpc_call below for a description.
                          These are sentinel values stored in grpc_call.request_set[op]; any
                          value below GRPC_IOREQ_OP_COUNT is instead an index into
                          grpc_call.masters, so both sentinels must be >= GRPC_IOREQ_OP_COUNT. */
      72             : #define REQSET_EMPTY 'X'
      73             : #define REQSET_DONE 'Y'
      74             : 
                       /* Capacity of grpc_call.send_initial_metadata. grpc_call_create asserts
                          that the caller-supplied element count is strictly below this. */
      75             : #define MAX_SEND_INITIAL_METADATA_COUNT 3
      76             : 
      77             : typedef struct {
                         /* Bookkeeping shared by one group ("set") of ioreq ops; one element of
                            grpc_call.masters, found via masters[request_set[op]]. */
      78             :   /* Overall status of the operation: starts OK, may degrade to
      79             :      non-OK */
      80             :   gpr_uint8 success;
      81             :   /* a bit mask of which request ops are needed (1u << opid) */
      82             :   gpr_uint16 need_mask;
      83             :   /* a bit mask of which request ops are now completed */
      84             :   gpr_uint16 complete_mask;
      85             :   /* Completion function to call at the end of the operation */
      86             :   grpc_ioreq_completion_func on_complete;
                         /* opaque pointer stored alongside on_complete */
      87             :   void *user_data;
      88             : } reqinfo_master;
      89             : 
      90             : /* Status data for a request can come from several sources; this
      91             :    enumerates them all, and acts as a priority sorting for which
      92             :    status to return to the application - earlier entries override
      93             :    later ones.
                          The enumerators are also used as indices into grpc_call.status[]. */
      94             : typedef enum {
      95             :   /* Status came from the application layer overriding whatever
      96             :      the wire says */
      97             :   STATUS_FROM_API_OVERRIDE = 0,
      98             :   /* Status was created by some internal channel stack operation */
      99             :   STATUS_FROM_CORE,
     100             :   /* Status came from 'the wire' - or somewhere below the surface
     101             :      layer */
     102             :   STATUS_FROM_WIRE,
     103             :   /* Status came from the server sending status */
     104             :   STATUS_FROM_SERVER_STATUS,
                         /* Number of sources, not a source itself: sizes grpc_call.status[] and
                            bounds the cleanup loop in destroy_call. Must remain last. */
     105             :   STATUS_SOURCE_COUNT
     106             : } status_source;
     107             : 
     108             : typedef struct {
                         /* Status recorded for one status_source; grpc_call.status[] holds one
                            of these per source. */
                         /* set to 1 by set_status_code once a code has been recorded; later
                            codes for the same source are ignored */
     109             :   gpr_uint8 is_set;
                         /* the recorded status code (meaningful only when is_set) */
     110             :   grpc_status_code code;
                         /* optional status details string, or NULL; destroy_call unrefs it when
                            non-NULL */
     111             :   grpc_mdstr *details;
     112             : } received_status;
     113             : 
     114             : /* How far through the GRPC stream have we read?
                          grpc_call_create zeroes the call, so every call starts in
                          READ_STATE_INITIAL. */
     115             : typedef enum {
     116             :   /* We are still waiting for initial metadata to complete */
     117             :   READ_STATE_INITIAL = 0,
     118             :   /* We have gotten initial metadata, and are reading either
     119             :      messages or trailing metadata */
     120             :   READ_STATE_GOT_INITIAL_METADATA,
     121             :   /* The stream is closed for reading */
     122             :   READ_STATE_READ_CLOSED,
     123             :   /* The stream is closed for reading & writing.
                            Per the comment in grpc_call_create, receiving this state drops one of
                            the two internal refs the call is created with. */
     124             :   READ_STATE_STREAM_CLOSED
     125             : } read_state;
     126             : 
     127             : typedef enum {
                         /* How far through the GRPC stream have we written? Counterpart of
                            read_state for the send direction (see grpc_call.write_state). */
                         /* Starting state: grpc_call_create zeroes the call. */
     128             :   WRITE_STATE_INITIAL = 0,
                         /* NOTE(review): presumably entered once the first send has been issued;
                            the transition is outside this excerpt - confirm. */
     129             :   WRITE_STATE_STARTED,
                         /* NOTE(review): presumably no further writes are allowed; confirm
                            against the send path. */
     130             :   WRITE_STATE_WRITE_CLOSED
     131             : } write_state;
     132             : 
     133             : struct grpc_call {
                         /* Surface-layer state for one call. grpc_call_create allocates this
                            struct with the channel's call stack placed directly after it in the
                            same block (see CALL_STACK_FROM_CALL); destroy_call frees it once
                            internal_refcount reaches zero. */
                         /* completion queue bound to the call, or NULL if none has been bound
                            yet; an internal "bind" ref is held on it while set */
     134             :   grpc_completion_queue *cq;
                         /* channel the call was created on; a "call" ref is held until
                            destroy_call */
     135             :   grpc_channel *channel;
                         /* parent call given to grpc_call_create, or NULL */
     136             :   grpc_call *parent;
                         /* head of the circular list of child calls (linked through
                            sibling_next / sibling_prev), or NULL when there are no children */
     137             :   grpc_call *first_child;
                         /* metadata context obtained from the channel at creation */
     138             :   grpc_mdctx *metadata_context;
     139             :   /* TODO(ctiller): share with cq if possible? */
                         /* main call mutex; also protects the sibling list of this call's
                            children */
     140             :   gpr_mu mu;
                         /* protects allocated_completions */
     141             :   gpr_mu completion_mu;
     142             : 
     143             :   /* how far through the stream have we read? */
     144             :   read_state read_state;
     145             :   /* how far through the stream have we written? */
     146             :   write_state write_state;
     147             :   /* client or server call */
     148             :   gpr_uint8 is_client;
     149             :   /* is the alarm set */
     150             :   gpr_uint8 have_alarm;
     151             :   /* are we currently performing a send operation */
     152             :   gpr_uint8 sending;
     153             :   /* are we currently performing a recv operation */
     154             :   gpr_uint8 receiving;
     155             :   /* are we currently completing requests */
     156             :   gpr_uint8 completing;
     157             :   /** has grpc_call_destroy been called */
     158             :   gpr_uint8 destroy_called;
     159             :   /* pairs with completed_requests */
     160             :   gpr_uint8 num_completed_requests;
     161             :   /* are we currently reading a message? */
     162             :   gpr_uint8 reading_message;
     163             :   /* have we bound a pollset yet? */
     164             :   gpr_uint8 bound_pollset;
     165             :   /* is an error status set */
     166             :   gpr_uint8 error_status_set;
     167             :   /** bitmask of allocated completion events in completions */
     168             :   gpr_uint8 allocated_completions;
     169             :   /** flag indicating that cancellation is inherited */
     170             :   gpr_uint8 cancellation_is_inherited;
     171             : 
     172             :   /* flags with bits corresponding to write states allowing us to determine
     173             :      what was sent */
     174             :   gpr_uint16 last_send_contains;
     175             :   /* cancel with this status on the next outgoing transport op */
     176             :   grpc_status_code cancel_with_status;
     177             : 
     178             :   /* Active ioreqs.
     179             :      request_set and request_data contain one element per active ioreq
     180             :      operation.
     181             : 
     182             :      request_set[op] is an integer specifying a set of operations to which
     183             :      the request belongs:
     184             :      - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
     185             :      completion, and the integer represents to which group of operations
     186             :      the ioreq belongs. Each group is represented by one master, and the
     187             :      integer in request_set is an index into masters to find the master
     188             :      data.
     189             :      - if it is REQSET_EMPTY, the ioreq op is inactive and available to be
     190             :      started
     191             :      - finally, if request_set[op] is REQSET_DONE, then the operation is
     192             :      complete and unavailable to be started again
     193             : 
     194             :      request_data[op] is the request data as supplied by the initiator of
     195             :      a request, and is valid iff request_set[op] < GRPC_IOREQ_OP_COUNT.
     196             :      The set fields are as per the request type specified by op.
     197             : 
     198             :      Finally, one element of masters is set per active _set_ of ioreq
     199             :      operations. It describes work left outstanding, result status, and
     200             :      what work to perform upon operation completion. As one ioreq of each
     201             :      op type can be active at once, by convention we choose the first element
     202             :      of the group to be the master -- ie the master of in-progress operation
     203             :      op is masters[request_set[op]]. This allows constant time allocation
     204             :      and a strong upper bound of a count of masters to be calculated. */
     205             :   gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
     206             :   grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
                         /* per-op flags, indexed like request_data (NOTE(review): their use is
                            outside this excerpt) */
     207             :   gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
     208             :   reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
     209             : 
     210             :   /* Dynamic array of ioreq's that have completed: the count of
     211             :      elements is queued in num_completed_requests.
     212             :      This list is built up under lock(), and flushed entirely during
     213             :      unlock().
     214             :      We know the upper bound of the number of elements as we can only
     215             :      have one ioreq of each type active at once. */
     216             :   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
     217             :   /* Incoming buffer of messages */
     218             :   grpc_byte_buffer_queue incoming_queue;
     219             :   /* Buffered read metadata waiting to be returned to the application.
     220             :      Element 0 is initial metadata, element 1 is trailing metadata. */
     221             :   grpc_metadata_array buffered_metadata[2];
     222             :   /* All metadata received - unreffed at once at the end of the call */
     223             :   grpc_mdelem **owned_metadata;
     224             :   size_t owned_metadata_count;
     225             :   size_t owned_metadata_capacity;
     226             : 
     227             :   /* Received call statuses from various sources */
     228             :   received_status status[STATUS_SOURCE_COUNT];
     229             : 
     230             :   /* Compression algorithm for the call */
     231             :   grpc_compression_algorithm compression_algorithm;
     232             : 
     233             :   /* Supported encodings (compression algorithms), a bitset */
     234             :   gpr_uint32 encodings_accepted_by_peer;
     235             : 
     236             :   /* Contexts for various subsystems (security, tracing, ...). */
     237             :   grpc_call_context_element context[GRPC_CONTEXT_COUNT];
     238             : 
     239             :   /* Deadline alarm - if have_alarm is non-zero */
     240             :   grpc_alarm alarm;
     241             : 
     242             :   /* Call refcount - to keep the call alive during asynchronous operations */
     243             :   gpr_refcount internal_refcount;
     244             : 
                         /* initial metadata elements supplied to grpc_call_create that are still
                            held by the call; only the first send_initial_metadata_count entries
                            are valid */
     245             :   grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
     246             :   grpc_linked_mdelem status_link;
     247             :   grpc_linked_mdelem details_link;
     248             :   size_t send_initial_metadata_count;
                         /* deadline value passed to grpc_call_create */
     249             :   gpr_timespec send_deadline;
     250             : 
                         /* stream op buffers for the two directions; recv_ops and recv_state are
                            what the server-side initial receive op points at */
     251             :   grpc_stream_op_buffer send_ops;
     252             :   grpc_stream_op_buffer recv_ops;
     253             :   grpc_stream_state recv_state;
     254             : 
     255             :   gpr_slice_buffer incoming_message;
     256             :   gpr_uint32 incoming_message_length;
     257             :   gpr_uint32 incoming_message_flags;
     258             :   grpc_closure destroy_closure;
                         /* closures initialised in grpc_call_create: on_done_recv runs
                            call_on_done_recv, on_done_send runs call_on_done_send, and
                            on_done_bind runs finished_loose_op, each with this call as argument */
     259             :   grpc_closure on_done_recv;
     260             :   grpc_closure on_done_send;
     261             :   grpc_closure on_done_bind;
     262             : 
     263             :   /** completion events - for completion queue use */
     264             :   grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
     265             : 
     266             :   /** siblings: children of the same parent form a list, and this list is
     267             :      protected under
     268             :       parent->mu */
     269             :   grpc_call *sibling_next;
     270             :   grpc_call *sibling_prev;
     271             : };
     272             : 
     273             : #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
     274             : #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
                       /* grpc_call_create allocates sizeof(grpc_call) plus the channel's
                          call_stack_size in one block, so the call stack starts directly after
                          the grpc_call header. The two macros above convert between them by
                          stepping one grpc_call forwards or backwards. The two below build on
                          them: the first fetches element idx of a call's stack, the second
                          recovers the call from a top stack element. */
     275             : #define CALL_ELEM_FROM_CALL(call, idx) \
     276             :   grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
     277             : #define CALL_FROM_TOP_ELEM(top_elem) \
     278             :   CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
     279             : 
     280             : static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
     281             :                                gpr_timespec deadline);
                       /* This and the following prototypes forward-declare file-local (static)
                          helpers so they can be referenced before their definitions.
                          call_on_done_recv, call_on_done_send and finished_loose_op are the
                          callbacks installed into the call's closures by grpc_call_create. */
     282             : static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *call, int success);
     283             : static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *call, int success);
     284             : static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op);
     285             : static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
     286             :                        grpc_transport_stream_op *op);
     287             : static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
     288             :                           grpc_metadata_batch *metadata);
     289             : static void finish_read_ops(grpc_call *call);
     290             : static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
     291             :                                           const char *description);
     292             : static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call, int success);
     293             : 
                       /* lock()/unlock() bracket work on a call: per the comment on
                          grpc_call.completed_requests, completions are collected under lock()
                          and flushed during unlock(). */
     294             : static void lock(grpc_call *call);
     295             : static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call);
     296             : 
     297     2703891 : grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
     298             :                             gpr_uint32 propagation_mask,
     299             :                             grpc_completion_queue *cq,
     300             :                             const void *server_transport_data,
     301             :                             grpc_mdelem **add_initial_metadata,
     302             :                             size_t add_initial_metadata_count,
     303             :                             gpr_timespec send_deadline) {
                         /* Allocates and initialises a call on `channel` and returns it.
                            - parent_call: optional server-side parent; when non-NULL the new
                              call must be a client call and is linked in as its child.
                            - propagation_mask: GRPC_PROPAGATE_* bits selecting what is inherited
                              from parent_call (only consulted when parent_call != NULL).
                            - cq: completion queue to bind now, or NULL to bind later with
                              grpc_call_set_completion_queue.
                            - server_transport_data: NULL for a client call, non-NULL for a
                              server call.
                            - add_initial_metadata / add_initial_metadata_count: elements stored
                              in call->send_initial_metadata; the count must be strictly below
                              MAX_SEND_INITIAL_METADATA_COUNT.
                            - send_deadline: call deadline; a deadline alarm is armed unless it
                              is the infinite future.
                            The call starts with two internal refs (see gpr_ref_init below). */
     304             :   size_t i;
     305             :   grpc_transport_stream_op initial_op;
     306     2703891 :   grpc_transport_stream_op *initial_op_ptr = NULL;
     307     2703891 :   grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
     308     2704453 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
                         /* One allocation holds the grpc_call header followed by the call stack;
                            only the header is zeroed here, the stack is set up by
                            grpc_call_stack_init below. */
     309     2704453 :   grpc_call *call =
     310     2704453 :       gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
     311     2704088 :   memset(call, 0, sizeof(grpc_call));
     312     2704088 :   gpr_mu_init(&call->mu);
     313     2704145 :   gpr_mu_init(&call->completion_mu);
     314     2703747 :   call->channel = channel;
     315     2703747 :   call->cq = cq;
                         /* the "bind" ref is released in destroy_call */
     316     2703747 :   if (cq != NULL) {
     317     1403260 :     GRPC_CQ_INTERNAL_REF(cq, "bind");
     318             :   }
     319     2704535 :   call->parent = parent_call;
                         /* absence of server transport data is what marks a client call */
     320     2704535 :   call->is_client = server_transport_data == NULL;
                         /* every ioreq op starts out available ... */
     321    32428029 :   for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
     322    29723494 :     call->request_set[i] = REQSET_EMPTY;
     323             :   }
                         /* ... except the two that a client can never start: marking them
                            REQSET_DONE makes them permanently unavailable */
     324     2704535 :   if (call->is_client) {
     325     1403299 :     call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
     326     1403299 :     call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
     327             :   }
                         /* NOTE(review): the comparison is strict, so at least one slot of
                            send_initial_metadata always stays free - presumably reserved for an
                            element added later; confirm before relaxing to <=. */
     328     2704535 :   GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
                         /* The element pointers are stored as-is (no ref is taken here);
                            destroy_call unrefs whichever of them the call still holds. */
     329     5510183 :   for (i = 0; i < add_initial_metadata_count; i++) {
     330     2805648 :     call->send_initial_metadata[i].md = add_initial_metadata[i];
     331             :   }
     332     2704535 :   call->send_initial_metadata_count = add_initial_metadata_count;
     333     2704535 :   call->send_deadline = send_deadline;
                         /* the "call" ref on the channel is released in destroy_call */
     334     2704535 :   GRPC_CHANNEL_INTERNAL_REF(channel, "call");
     335     2705047 :   call->metadata_context = grpc_channel_get_metadata_context(channel);
     336     2705042 :   grpc_sopb_init(&call->send_ops);
     337     2704983 :   grpc_sopb_init(&call->recv_ops);
     338     2704852 :   gpr_slice_buffer_init(&call->incoming_message);
     339     2704792 :   grpc_closure_init(&call->on_done_recv, call_on_done_recv, call);
     340     2704755 :   grpc_closure_init(&call->on_done_send, call_on_done_send, call);
     341     2704477 :   grpc_closure_init(&call->on_done_bind, finished_loose_op, call);
     342             :   /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
     343     2703932 :   gpr_ref_init(&call->internal_refcount, 2);
     344             :   /* server hack: start reads immediately so we can get initial metadata.
     345             :      TODO(ctiller): figure out a cleaner solution */
     346     2703730 :   if (!call->is_client) {
     347     1301630 :     memset(&initial_op, 0, sizeof(initial_op));
     348     1301630 :     initial_op.recv_ops = &call->recv_ops;
     349     1301630 :     initial_op.recv_state = &call->recv_state;
     350     1301630 :     initial_op.on_done_recv = &call->on_done_recv;
     351     1301630 :     initial_op.context = call->context;
                           /* mark the receive as in flight and hold a ref for its duration */
     352     1301630 :     call->receiving = 1;
     353     1301630 :     GRPC_CALL_INTERNAL_REF(call, "receiving");
     354     1301853 :     initial_op_ptr = &initial_op;
     355             :   }
                         /* initial_op_ptr is NULL for client calls */
     356     2703953 :   grpc_call_stack_init(&exec_ctx, channel_stack, server_transport_data,
     357             :                        initial_op_ptr, CALL_STACK_FROM_CALL(call));
     358     2703925 :   if (parent_call != NULL) {
                           /* the child keeps its parent alive via the "child" ref; only a
                              client call may be parented, and only by a server call */
     359         502 :     GRPC_CALL_INTERNAL_REF(parent_call, "child");
     360         502 :     GPR_ASSERT(call->is_client);
     361         502 :     GPR_ASSERT(!parent_call->is_client);
     362             : 
                           /* parent->mu protects the parent's fields read below and its list of
                              children */
     363         502 :     gpr_mu_lock(&parent_call->mu);
     364             : 
                           /* Narrow the deadline to the earlier of ours and the parent's, after
                              converting ours to the parent's clock type.
                              NOTE(review): this updates only the local variable (later passed to
                              set_deadline_alarm); call->send_deadline was assigned above from the
                              caller's value. Whether set_deadline_alarm refreshes it is outside
                              this excerpt - confirm. */
     365         502 :     if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
     366         502 :       send_deadline = gpr_time_min(
     367             :           gpr_convert_clock_type(send_deadline,
     368             :                                  parent_call->send_deadline.clock_type),
     369             :           parent_call->send_deadline);
     370             :     }
     371             :     /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
     372             :      * GRPC_PROPAGATE_STATS_CONTEXT */
     373             :     /* TODO(ctiller): This should change to use the appropriate census start_op
     374             :      * call. */
                           /* Both branches assert GRPC_PROPAGATE_CENSUS_STATS_CONTEXT, so that bit
                              is required whenever a parent is supplied. The tracing context value
                              is shared with the parent and installed with a NULL destroy
                              function. */
     375         502 :     if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
     376         502 :       GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
     377         502 :       grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
     378             :                             parent_call->context[GRPC_CONTEXT_TRACING].value,
     379             :                             NULL);
     380             :     } else {
     381           0 :       GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
     382             :     }
     383         502 :     if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
     384         502 :       call->cancellation_is_inherited = 1;
     385             :     }
     386             : 
                           /* Link into the parent's circular doubly-linked list of children:
                              a first child points at itself; otherwise the new call is inserted
                              just before first_child, i.e. at the tail of the ring. */
     387         502 :     if (parent_call->first_child == NULL) {
     388         502 :       parent_call->first_child = call;
     389         502 :       call->sibling_next = call->sibling_prev = call;
     390             :     } else {
     391           0 :       call->sibling_next = parent_call->first_child;
     392           0 :       call->sibling_prev = parent_call->first_child->sibling_prev;
     393           0 :       call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
     394             :           call;
     395             :     }
     396             : 
     397         502 :     gpr_mu_unlock(&parent_call->mu);
     398             :   }
                         /* arm the deadline alarm only for a finite deadline */
     399     2703925 :   if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
     400             :       0) {
     401        3160 :     set_deadline_alarm(&exec_ctx, call, send_deadline);
     402             :   }
     403     2703513 :   grpc_exec_ctx_finish(&exec_ctx);
     404     2704697 :   return call;
     405             : }
     406             : 
     407     1301717 : void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
     408             :                                     grpc_completion_queue *cq) {
                         /* Binds `cq` to `call` under the call lock and, when cq is non-NULL,
                            takes the "bind" ref that destroy_call later releases.
                            NOTE(review): no ref is dropped on a previously bound queue, so this
                            presumably expects call->cq to be NULL on entry - confirm with
                            callers. */
     409     1301717 :   lock(call);
     410     1301594 :   call->cq = cq;
     411     1301594 :   if (cq) {
     412     1301733 :     GRPC_CQ_INTERNAL_REF(cq, "bind");
     413             :   }
     414     1301603 :   unlock(exec_ctx, call);
     415     1301525 : }
     416             : 
     417           0 : grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
                         /* Returns the completion queue currently bound to the call (may be
                            NULL). The field is read without taking the call lock and no ref is
                            added for the caller. */
     418           0 :   return call->cq;
     419             : }
     420             : 
     421     5644836 : static grpc_cq_completion *allocate_completion(grpc_call *call) {
                         /* Reserves a free slot of call->completions and returns a pointer to
                            it. Slot i is in use when bit i of call->allocated_completions is
                            set; the lowest clear bit wins. The mask is guarded by
                            completion_mu. The slot is handed back by done_completion. */
     422             :   gpr_uint8 i;
     423     5644836 :   gpr_mu_lock(&call->completion_mu);
     424    13283240 :   for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
                           /* slot already taken - try the next one */
     425     6641620 :     if (call->allocated_completions & (1u << i)) {
     426      994996 :       continue;
     427             :     }
     428             :     /* NB: the following integer arithmetic operation needs to be in its
     429             :      * expanded form due to the "integral promotion" performed (see section
     430             :      * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
     431             :      * is then required to avoid the compiler warning */
     432     5646624 :     call->allocated_completions =
     433     5646624 :         (gpr_uint8)(call->allocated_completions | (1u << i));
     434     5646624 :     gpr_mu_unlock(&call->completion_mu);
     435     5646159 :     return &call->completions[i];
     436             :   }
                         /* Every slot busy: treated as impossible, since
                            MAX_CONCURRENT_COMPLETIONS is meant to bound the number of concurrent
                            completions.
                            NOTE(review): if this path were ever taken, the function would
                            return with completion_mu still locked. */
     437           0 :   GPR_UNREACHABLE_CODE(return NULL);
     438             :   return NULL;
     439             : }
     440             : 
     441     5641180 : static void done_completion(grpc_exec_ctx *exec_ctx, void *call,
     442             :                             grpc_cq_completion *completion) {
                         /* Returns a completion slot obtained from allocate_completion.
                            `call` is the owning grpc_call passed as an untyped pointer;
                            `completion` must point into c->completions. The slot index is the
                            pointer difference from the start of that array, and its bit is
                            cleared in allocated_completions under completion_mu. Finally the
                            internal "completion" ref is dropped (the matching ref is taken
                            outside this excerpt), which may destroy the call. */
     443     5641180 :   grpc_call *c = call;
     444     5641180 :   gpr_mu_lock(&c->completion_mu);
     445    11290624 :   c->allocated_completions &=
     446     5645312 :       (gpr_uint8) ~(1u << (completion - c->completions));
     447     5645312 :   gpr_mu_unlock(&c->completion_mu);
     448     5643445 :   GRPC_CALL_INTERNAL_UNREF(exec_ctx, c, "completion");
     449     5644369 : }
     450             : 
     451             : #ifdef GRPC_CALL_REF_COUNT_DEBUG
                       /* Adds one internal reference to the call. When
                          GRPC_CALL_REF_COUNT_DEBUG is defined the function takes an extra
                          `reason` string and logs the count transition before incrementing;
                          otherwise it takes only the call. */
     452             : void grpc_call_internal_ref(grpc_call *c, const char *reason) {
     453             :   gpr_log(GPR_DEBUG, "CALL:   ref %p %d -> %d [%s]", c,
     454             :           c->internal_refcount.count, c->internal_refcount.count + 1, reason);
     455             : #else
     456    25005364 : void grpc_call_internal_ref(grpc_call *c) {
     457             : #endif
     458    25005364 :   gpr_ref(&c->internal_refcount);
     459    25120528 : }
     460             : 
     461     2705085 : static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) {
                         /* Releases everything the call owns and frees it. Called from
                            grpc_call_internal_unref when the last internal ref is dropped, so
                            no other user of the call should remain. */
     462             :   size_t i;
     463     2705085 :   grpc_call *c = call;
                         /* tear down the call stack stored directly after the grpc_call header */
     464     2705085 :   grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
                         /* drop the "call" ref taken on the channel in grpc_call_create */
     465     2703236 :   GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
     466     2705039 :   gpr_mu_destroy(&c->mu);
     467     2704850 :   gpr_mu_destroy(&c->completion_mu);
                         /* status detail strings are optional, one per status source */
     468    13519042 :   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     469    10815128 :     if (c->status[i].details) {
     470      385254 :       GRPC_MDSTR_UNREF(c->status[i].details);
     471             :     }
     472             :   }
                         /* unref every received metadata element, then free the array itself */
     473     4006230 :   for (i = 0; i < c->owned_metadata_count; i++) {
     474     1302197 :     GRPC_MDELEM_UNREF(c->owned_metadata[i]);
     475             :   }
     476     2704033 :   gpr_free(c->owned_metadata);
                         /* only the metadata arrays are freed here; the elements they refer to
                            were released through owned_metadata above */
     477     8109120 :   for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
     478     5406473 :     gpr_free(c->buffered_metadata[i].metadata);
     479             :   }
                         /* initial metadata still counted in send_initial_metadata_count is
                            still held by the call and must be released */
     480     2703041 :   for (i = 0; i < c->send_initial_metadata_count; i++) {
     481         394 :     GRPC_MDELEM_UNREF(c->send_initial_metadata[i].md);
     482             :   }
                         /* context values are destroyed only when a destroy function was
                            registered for them */
     483     8108433 :   for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
     484     5405566 :     if (c->context[i].destroy) {
     485        1740 :       c->context[i].destroy(c->context[i].value);
     486             :     }
     487             :   }
     488     2702867 :   grpc_sopb_destroy(&c->send_ops);
     489     2702723 :   grpc_sopb_destroy(&c->recv_ops);
     490     2702591 :   grpc_bbq_destroy(&c->incoming_queue);
     491     2703987 :   gpr_slice_buffer_destroy(&c->incoming_message);
                         /* drop the "bind" ref taken when the completion queue was attached */
     492     2703939 :   if (c->cq) {
     493     2703823 :     GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
     494             :   }
                         /* frees the call header and the trailing call stack in one go */
     495     2705124 :   gpr_free(c);
     496     2704632 : }
     497             : 
     498             : #ifdef GRPC_CALL_REF_COUNT_DEBUG
                       /* Drops one internal reference; the call is destroyed via destroy_call
                          when gpr_unref reports that this was the last one, so the caller must
                          not touch `c` afterwards. When GRPC_CALL_REF_COUNT_DEBUG is defined the
                          function takes an extra `reason` string and logs the count transition
                          first. */
     499             : void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c,
     500             :                               const char *reason) {
     501             :   gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
     502             :           c->internal_refcount.count, c->internal_refcount.count - 1, reason);
     503             : #else
     504    30421284 : void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) {
     505             : #endif
     506    30421284 :   if (gpr_unref(&c->internal_refcount)) {
     507     2705089 :     destroy_call(exec_ctx, c);
     508             :   }
     509    30583198 : }
     510             : 
     511     2997059 : static void set_status_code(grpc_call *call, status_source source,
     512             :                             gpr_uint32 status) {
                         /* Records `status` (cast to grpc_status_code) as the code reported by
                            `source`. The first code recorded for a source wins: later calls for
                            the same source return without changing anything. */
     513     5994117 :   if (call->status[source].is_set) return;
     514             : 
     515     2991661 :   call->status[source].is_set = 1;
     516     2991661 :   call->status[source].code = (grpc_status_code)status;
                         /* NOTE(review): this overwrites the flag rather than OR-ing into it, so
                            an OK code recorded for one source after an error from another source
                            clears error_status_set - confirm that is intended. */
     517     2991661 :   call->error_status_set = status != GRPC_STATUS_OK;
     518             : 
                         /* on a non-OK status, flush any messages still queued in
                            incoming_queue */
     519     2991661 :   if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
     520         131 :     grpc_bbq_flush(&call->incoming_queue);
     521             :   }
     522             : }
     523             : 
     524     2703435 : static void set_compression_algorithm(grpc_call *call,
     525             :                                       grpc_compression_algorithm algo) {
                         /* Stores `algo` as the call's compression algorithm. No lock is taken
                            here, so any required locking is up to the caller. */
     526     2703435 :   call->compression_algorithm = algo;
     527     2703435 : }
     528             : 
     529           4 : grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
     530             :     grpc_call *call) {
                         /* Test-only accessor: returns a copy of call->compression_algorithm,
                          * read while holding call->mu. */
     531             :   grpc_compression_algorithm algorithm;
     532           4 :   gpr_mu_lock(&call->mu);
     533           4 :   algorithm = call->compression_algorithm;
     534           4 :   gpr_mu_unlock(&call->mu);
     535           4 :   return algorithm;
     536             : }
     537             : 
     538     2703127 : static void set_encodings_accepted_by_peer(
     539             :     grpc_call *call, const gpr_slice accept_encoding_slice) {
                         /* Parse a comma-separated accept-encoding value and set one bit in
                          * call->encodings_accepted_by_peer per recognised compression algorithm.
                          * GRPC_COMPRESS_NONE is always set. Entries that
                          * grpc_compression_algorithm_parse() rejects are logged and ignored.
                          *
                          * The temporary slice buffer holding the split entries is destroyed
                          * before returning so that the split slices (and any heap storage of the
                          * buffer) are released. */
     540             :   size_t i;
     541             :   grpc_compression_algorithm algorithm;
     542             :   gpr_slice_buffer accept_encoding_parts;
     543             : 
     544     2703127 :   gpr_slice_buffer_init(&accept_encoding_parts);
     545     2703544 :   gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
     546             : 
     547             :   /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
     548             :    * zeroes the whole grpc_call */
     549             :   /* Always support no compression */
     550     2700330 :   GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
     551     8103114 :   for (i = 0; i < accept_encoding_parts.count; i++) {
     552     5399894 :     const gpr_slice *accept_encoding_entry_slice =
     553     5399894 :         &accept_encoding_parts.slices[i];
     554    16200387 :     if (grpc_compression_algorithm_parse(
     555     5399894 :             (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice),
     556    10800493 :             GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) {
     557     5402464 :       GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
     558             :     } else {
     559           0 :       char *accept_encoding_entry_str =
     560             :           gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII);
     561           0 :       gpr_log(GPR_ERROR,
     562             :               "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
     563             :               accept_encoding_entry_str);
     564           0 :       gpr_free(accept_encoding_entry_str);
     565             :     }
     566             :   }
                         /* Fix: the buffer was initialised and filled above but never destroyed,
                          * leaking the split slices on every call. (Added statement; it has no
                          * line number / hit count because it is not part of the measured run.) */
                         gpr_slice_buffer_destroy(&accept_encoding_parts);
     567     2703220 : }
     568             : 
     569        1320 : gpr_uint32 grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
                         /* Test-only accessor: returns a copy of the
                          * call->encodings_accepted_by_peer bitset, read while holding call->mu. */
     570             :   gpr_uint32 encodings_accepted_by_peer;
     571        1320 :   gpr_mu_lock(&call->mu);
     572        1320 :   encodings_accepted_by_peer = call->encodings_accepted_by_peer;
     573        1320 :   gpr_mu_unlock(&call->mu);
     574        1320 :   return encodings_accepted_by_peer;
     575             : }
     576             : 
     577           4 : gpr_uint32 grpc_call_test_only_get_message_flags(grpc_call *call) {
                         /* Test-only accessor: returns a copy of call->incoming_message_flags,
                          * read while holding call->mu. */
     578             :   gpr_uint32 flags;
     579           4 :   gpr_mu_lock(&call->mu);
     580           4 :   flags = call->incoming_message_flags;
     581           4 :   gpr_mu_unlock(&call->mu);
     582           4 :   return flags;
     583             : }
     584             : 
     585      385293 : static void set_status_details(grpc_call *call, status_source source,
     586             :                                grpc_mdstr *status) {
                         /* Replace the status details string recorded for `source`. A previously
                          * stored string is unref'd first. The new pointer is stored as-is: no
                          * additional reference is taken in this function. */
     587      385293 :   if (call->status[source].details != NULL) {
     588          35 :     GRPC_MDSTR_UNREF(call->status[source].details);
     589             :   }
     590      385293 :   call->status[source].details = status;
     591      385293 : }
     592             : 
     593   124556254 : static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
                         /* Returns non-zero when `op` belongs to a request set and has not yet
                          * been marked complete in that set's complete_mask. */
     594   124556254 :   gpr_uint8 set = call->request_set[op];
     595             :   reqinfo_master *master;
                         /* Values >= GRPC_IOREQ_OP_COUNT are not indices into call->masters
                          * (presumably the REQSET_EMPTY / REQSET_DONE markers used elsewhere in
                          * this file - confirm): the op is not live. */
     596   124556254 :   if (set >= GRPC_IOREQ_OP_COUNT) return 0;
     597    44509805 :   master = &call->masters[set];
     598    44509805 :   return (master->complete_mask & (1u << op)) == 0;
     599             : }
     600             : 
     601    26340839 : static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); } /* acquires call->mu; unlock() below releases it */
     602             : 
     603    15974155 : static int need_more_data(grpc_call *call) {
                         /* Decides whether unlock() should ask for more incoming data. Always 0
                          * once the read state is STREAM_CLOSED. Otherwise non-zero when any of
                          * the following holds:
                          *  - a receive op for initial metadata, trailing metadata, status or
                          *    status details is live;
                          *  - a RECV_MESSAGE or RECV_CLOSE op is live and the incoming queue is
                          *    empty;
                          *  - this is a server-side call whose write state is still INITIAL;
                          *  - a cancellation status is pending, or destroy has been called. */
     604    15974155 :   if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
     605             :   /* TODO(ctiller): this needs some serious cleanup */
     606    22920025 :   return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
     607     7900690 :          (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
     608     6712465 :           grpc_bbq_empty(&call->incoming_queue)) ||
     609    11044874 :          is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
     610    10845861 :          is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
     611    10652444 :          is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
     612     5336191 :          (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
     613     5329456 :           grpc_bbq_empty(&call->incoming_queue)) ||
     614    11780257 :          (call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
     615    16082117 :          (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called;
     616             : }
     617             : 
     618    26265701 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) {
                         /* Counterpart of lock(). While still holding call->mu it decides what
                          * work to start - a transport stream op (cancel / receive / send /
                          * pollset bind) and any queued completion callbacks - then releases the
                          * mutex and performs that work outside the lock. */
     619             :   grpc_transport_stream_op op;
     620             :   completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
     621    26265701 :   int completing_requests = 0;
     622    26265701 :   int start_op = 0;
     623             :   int i;
                         /* Window used below to size op.max_recv_bytes. */
     624    26265701 :   const size_t MAX_RECV_PEEK_AHEAD = 65536;
     625             :   size_t buffered_bytes;
     626             : 
     627    26265701 :   memset(&op, 0, sizeof(op));
     628             : 
                         /* Move a pending cancellation (if any) into the op and clear it on the
                          * call; a non-OK value forces the op to be started. */
     629    26265701 :   op.cancel_with_status = call->cancel_with_status;
     630    26265701 :   start_op = op.cancel_with_status != GRPC_STATUS_OK;
     631    26265701 :   call->cancel_with_status = GRPC_STATUS_OK; /* reset */
     632             : 
                         /* At most one receive is outstanding at a time (call->receiving). */
     633    26265701 :   if (!call->receiving && need_more_data(call)) {
     634     4133603 :     if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
                             /* Mid-message with nothing queued: request the remainder of the
                              * message plus the peek-ahead window. */
     635     3030303 :       op.max_recv_bytes = call->incoming_message_length -
     636     2020202 :                           call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
     637             :     } else {
                             /* Otherwise request the window minus what is already queued, or
                              * nothing if the queue already exceeds the window. */
     638     3123498 :       buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
     639     3123572 :       if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
     640       88502 :         op.max_recv_bytes = 0;
     641             :       } else {
     642     3035070 :         op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
     643             :       }
     644             :     }
     645             :     /* TODO(ctiller): 1024 is basically to cover a bug
     646             :        I don't understand yet */
     647     4133673 :     if (op.max_recv_bytes > 1024) {
     648     4045062 :       op.recv_ops = &call->recv_ops;
     649     4045062 :       op.recv_state = &call->recv_state;
     650     4045062 :       op.on_done_recv = &call->on_done_recv;
     651     4045062 :       call->receiving = 1;
                             /* A matching "receiving" unref appears at the end of
                              * call_on_done_recv(). */
     652     4045062 :       GRPC_CALL_INTERNAL_REF(call, "receiving");
     653     4046429 :       start_op = 1;
     654             :     }
     655             :   }
     656             : 
                         /* Likewise at most one send in flight; fill_send_ops() reports whether
                          * there is anything to send. A matching "sending" unref appears at the
                          * end of call_on_done_send(). */
     657    26258619 :   if (!call->sending) {
     658    26125853 :     if (fill_send_ops(call, &op)) {
     659     3006765 :       call->sending = 1;
     660     3006765 :       GRPC_CALL_INTERNAL_REF(call, "sending");
     661     3007494 :       start_op = 1;
     662             :     }
     663             :   }
     664             : 
                         /* Bind the completion queue's pollset exactly once: always for
                          * server-side calls, for client calls only when an op is being started
                          * anyway. */
     665    26290565 :   if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
     666     2595821 :     call->bound_pollset = 1;
     667     2595821 :     op.bind_pollset = grpc_cq_pollset(call->cq);
     668     2703657 :     start_op = 1;
     669             :   }
     670             : 
                         /* Snapshot queued completions into a local array so the callbacks can
                          * run after the mutex is released. call->completing keeps a second
                          * caller from draining the queue concurrently. */
     671    26398401 :   if (!call->completing && call->num_completed_requests != 0) {
     672     6921240 :     completing_requests = call->num_completed_requests;
     673     6921240 :     memcpy(completed_requests, call->completed_requests,
     674             :            sizeof(completed_requests));
     675     6921240 :     call->num_completed_requests = 0;
     676     6921240 :     call->completing = 1;
     677     6921240 :     GRPC_CALL_INTERNAL_REF(call, "completing");
     678             :   }
     679             : 
     680    26406718 :   gpr_mu_unlock(&call->mu);
     681             : 
                         /* Everything below runs without call->mu held. */
     682    26448252 :   if (start_op) {
     683     8268252 :     execute_op(exec_ctx, call, &op);
     684             :   }
     685             : 
     686    26491981 :   if (completing_requests > 0) {
     687    13873935 :     for (i = 0; i < completing_requests; i++) {
     688     6945532 :       completed_requests[i].on_complete(exec_ctx, call,
     689             :                                         completed_requests[i].success,
     690             :                                         completed_requests[i].user_data);
     691             :     }
                           /* Re-take the lock to clear `completing`, then go through unlock()
                            * again (recursively) so completions queued in the meantime are
                            * picked up, and finally drop the "completing" ref taken above. */
     692     6928403 :     lock(call);
     693     6930515 :     call->completing = 0;
     694     6930515 :     unlock(exec_ctx, call);
     695     6931324 :     GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing");
     696             :   }
     697    26492799 : }
     698             : 
     699     2702089 : static void get_final_status(grpc_call *call, grpc_ioreq_data out) {
                         /* Report the call's final status code through out.recv_status.set_value.
                          * Status sources are scanned in index order and the first one that has a
                          * code set wins. */
     700             :   int i;
     701     8575586 :   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     702     8575586 :     if (call->status[i].is_set) {
     703     2702089 :       out.recv_status.set_value(call->status[i].code,
     704             :                                 out.recv_status.user_data);
     705     5407384 :       return;
     706             :     }
     707             :   }
                         /* No source recorded a status: report UNKNOWN on a client call and OK
                          * on a server call. */
     708           0 :   if (call->is_client) {
     709           0 :     out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data);
     710             :   } else {
     711           0 :     out.recv_status.set_value(GRPC_STATUS_OK, out.recv_status.user_data);
     712             :   }
     713             : }
     714             : 
     715     1402818 : static void get_final_details(grpc_call *call, grpc_ioreq_data out) {
                         /* Copy the status details string of the first status source that has a
                          * code set into the caller's buffer (*out.recv_status_details.details)
                          * as a NUL-terminated string, growing the buffer if needed. If that
                          * source has no details, or no source is set at all, an empty string is
                          * written instead. */
     716             :   int i;
     717     4207386 :   for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
     718     4207345 :     if (call->status[i].is_set) {
     719     1402777 :       if (call->status[i].details) {
     720      103885 :         gpr_slice details = call->status[i].details->slice;
     721      103885 :         size_t len = GPR_SLICE_LENGTH(details);
                               /* Need len bytes plus the terminator; grow to the larger of that
                                * and 1.5x the current capacity. */
     722      103885 :         if (len + 1 > *out.recv_status_details.details_capacity) {
     723        2886 :           *out.recv_status_details.details_capacity = GPR_MAX(
     724             :               len + 1, *out.recv_status_details.details_capacity * 3 / 2);
     725        5831 :           *out.recv_status_details.details =
     726        2886 :               gpr_realloc(*out.recv_status_details.details,
     727        2886 :                           *out.recv_status_details.details_capacity);
     728             :         }
     729      103944 :         memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details),
     730             :                len);
     731      103944 :         (*out.recv_status_details.details)[len] = 0;
     732             :       } else {
     733     1298892 :         goto no_details;
     734             :       }
     735     1507045 :       return;
     736             :     }
     737             :   }
     738             : 
     739             : no_details:
                         /* Empty-string result. A buffer is only allocated (8 bytes) when the
                          * caller's capacity is 0; otherwise the existing buffer is reused. */
     740     1298933 :   if (0 == *out.recv_status_details.details_capacity) {
     741     1298927 :     *out.recv_status_details.details_capacity = 8;
     742     2598078 :     *out.recv_status_details.details =
     743     1298927 :         gpr_malloc(*out.recv_status_details.details_capacity);
     744             :   }
     745     1299157 :   **out.recv_status_details.details = 0;
     746             : }
     747             : 
     748    24789204 : static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
     749             :                                  int success) {
                         /* Mark `op` complete in the request set (master) it belongs to. When
                          * that completes the whole set, finalise every op of the set and queue
                          * the set's completion callback in call->completed_requests.
                          *
                          * call->request_set[op] is used as an index into call->masters without
                          * a range check here; finish_ioreq_op() calls this only after
                          * is_op_live() has returned true. */
     750             :   completed_request *cr;
     751    24789204 :   gpr_uint8 master_set = call->request_set[op];
     752             :   reqinfo_master *master;
     753             :   size_t i;
     754             :   /* ioreq is live: we need to do something */
     755    24789204 :   master = &call->masters[master_set];
     756             :   /* NB: the following integer arithmetic operation needs to be in its
     757             :    * expanded form due to the "integral promotion" performed (see section
     758             :    * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
     759             :    * is then required to avoid the compiler warning */
     760    24789204 :   master->complete_mask = (gpr_uint16)(master->complete_mask | (1u << op));
                         /* A single failed op marks the whole set as failed. */
     761    24789204 :   if (!success) {
     762         489 :     master->success = 0;
     763             :   }
     764    24789204 :   if (master->complete_mask == master->need_mask) {
                           /* Every op of the set is done: walk all op slots and finalise the
                            * ones that belong to this set. */
     765    82914140 :     for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
     766    75993161 :       if (call->request_set[i] != master_set) {
     767    51300248 :         continue;
     768             :       }
     769    24692913 :       call->request_set[i] = REQSET_DONE;
     770    24692913 :       switch ((grpc_ioreq_op)i) {
     771             :         case GRPC_IOREQ_RECV_MESSAGE:
     772             :         case GRPC_IOREQ_SEND_MESSAGE:
                                 /* Message ops go back to REQSET_EMPTY instead of staying DONE
                                  * (presumably so they can be requested again - confirm).
                                  * NOTE(review): a failed set closes the write side for both
                                  * cases, including RECV_MESSAGE - confirm intended. */
     773     5988929 :           call->request_set[i] = REQSET_EMPTY;
     774     5988929 :           if (!master->success) {
     775         195 :             call->write_state = WRITE_STATE_WRITE_CLOSED;
     776             :           }
     777     5988929 :           break;
     778             :         case GRPC_IOREQ_SEND_STATUS:
                                 /* Drop the reference held on the outgoing status details. */
     779     1301234 :           if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
     780             :               NULL) {
     781           8 :             GRPC_MDSTR_UNREF(
     782             :                 call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
     783           8 :             call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
     784             :                 NULL;
     785             :           }
     786     1301234 :           break;
     787             :         case GRPC_IOREQ_RECV_CLOSE:
     788             :         case GRPC_IOREQ_SEND_INITIAL_METADATA:
     789             :         case GRPC_IOREQ_SEND_TRAILING_METADATA:
     790             :         case GRPC_IOREQ_SEND_CLOSE:
     791     9392359 :           break;
     792             :         case GRPC_IOREQ_RECV_STATUS:
     793     2661991 :           get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]);
     794     2702352 :           break;
     795             :         case GRPC_IOREQ_RECV_STATUS_DETAILS:
     796     1402818 :           get_final_details(call,
     797             :                             call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]);
     798     1403097 :           break;
     799             :         case GRPC_IOREQ_RECV_INITIAL_METADATA:
                                 /* Exchange the call's buffered initial metadata array with the
                                  * array supplied in the request. */
     800     2702880 :           GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0],
     801             :                    *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
     802             :                         .recv_metadata);
     803     2702880 :           break;
     804             :         case GRPC_IOREQ_RECV_TRAILING_METADATA:
                                 /* Same exchange for trailing metadata (slot 1). */
     805     1402823 :           GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1],
     806             :                    *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
     807             :                         .recv_metadata);
     808     1402823 :           break;
     809             :         case GRPC_IOREQ_OP_COUNT:
                                 /* Not reachable from the loop above, which stops before
                                  * GRPC_IOREQ_OP_COUNT. */
     810           0 :           abort();
     811             :           break;
     812             :       }
     813             :     }
                           /* Queue the completion; unlock() copies this array out and invokes
                            * on_complete after releasing the mutex. */
     814     6920979 :     cr = &call->completed_requests[call->num_completed_requests++];
     815     6920979 :     cr->success = master->success;
     816     6920979 :     cr->on_complete = master->on_complete;
     817     6920979 :     cr->user_data = master->user_data;
     818             :   }
     819    24829844 : }
     820             : 
     821    57977240 : static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
                         /* Complete `op` with the given success flag if it is currently live;
                          * does nothing otherwise. */
     822    57977240 :   if (is_op_live(call, op)) {
     823    21813360 :     finish_live_ioreq_op(call, op, success);
     824             :   }
     825    57947537 : }
     826             : 
     827     6918375 : static void early_out_write_ops(grpc_call *call) {
                         /* Complete pending send-side ops according to the current write state.
                          * The cases fall through on purpose:
                          *  - WRITE_CLOSED: message, status and trailing metadata sends are
                          *    completed as failures and SEND_CLOSE as a success, followed by
                          *    everything the STARTED case does;
                          *  - STARTED: a pending initial-metadata send is completed as a failure;
                          *  - INITIAL: nothing to do. */
     828     6918375 :   switch (call->write_state) {
     829             :     case WRITE_STATE_WRITE_CLOSED:
     830     1088267 :       finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
     831     1088247 :       finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
     832     1088143 :       finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
     833     1087991 :       finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
     834             :     /* fallthrough */
     835             :     case WRITE_STATE_STARTED:
     836     1589614 :       finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
     837             :     /* fallthrough */
     838             :     case WRITE_STATE_INITIAL:
     839             :       /* do nothing */
     840     6937384 :       break;
     841             :   }
     842     6918062 : }
     843             : 
     844     3006948 : static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *pc, int success) {
                         /* Completion handler for a send; `pc` is the grpc_call. Under the call
                          * lock it completes the ioreq ops recorded in call->last_send_contains
                          * with `success`, advances write_state, resets the send bookkeeping and
                          * finally drops the "sending" reference. */
     845     3006948 :   grpc_call *call = pc;
     846     3006948 :   lock(call);
     847     3007292 :   if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
     848     2704589 :     finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
     849     2704510 :     call->write_state = WRITE_STATE_STARTED;
     850             :   }
     851     3007213 :   if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
     852     2999048 :     finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
     853             :   }
                         /* A close completes trailing metadata and status with `success`, while
                          * SEND_CLOSE itself is always reported as successful. */
     854     3006586 :   if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
     855     2703420 :     finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
     856     2702588 :     finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
     857     2702046 :     finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
     858     2700937 :     call->write_state = WRITE_STATE_WRITE_CLOSED;
     859             :   }
                         /* A failed send closes the write side and completes whatever send ops
                          * are still pending. */
     860     3004103 :   if (!success) {
     861         282 :     call->write_state = WRITE_STATE_WRITE_CLOSED;
     862         282 :     early_out_write_ops(call);
     863             :   }
     864     3004103 :   call->send_ops.nops = 0;
     865     3004103 :   call->last_send_contains = 0;
     866     3004103 :   call->sending = 0;
     867     3004103 :   unlock(exec_ctx, call);
     868     3007430 :   GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "sending");
     869     3007046 : }
     870             : 
     871     3001213 : static void finish_message(grpc_call *call) {
                         /* Finish the incoming message being assembled. Unless an error status
                          * has been recorded, the accumulated slices are wrapped in a byte buffer
                          * and pushed onto call->incoming_queue. In every case the accumulation
                          * buffer is reset and reading_message is cleared. */
     872     3001213 :   if (call->error_status_set == 0) {
     873             :     /* TODO(ctiller): this could be a lot faster if coded directly */
     874             :     grpc_byte_buffer *byte_buffer;
     875             :     /* some aliases for readability */
     876     3001226 :     gpr_slice *slices = call->incoming_message.slices;
     877     3001226 :     const size_t nslices = call->incoming_message.count;
     878             : 
                           /* Use the compressed byte buffer variant only when the message is
                            * flagged GRPC_WRITE_INTERNAL_COMPRESS and the call's algorithm is
                            * above GRPC_COMPRESS_NONE. */
     879     3001292 :     if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
     880          66 :         (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
     881          66 :       byte_buffer = grpc_raw_compressed_byte_buffer_create(
     882             :           slices, nslices, call->compression_algorithm);
     883             :     } else {
     884     3001160 :       byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
     885             :     }
     886     3000079 :     grpc_bbq_push(&call->incoming_queue, byte_buffer);
     887             :   }
     888     3000705 :   gpr_slice_buffer_reset_and_unref(&call->incoming_message);
     889     3000676 :   GPR_ASSERT(call->incoming_message.count == 0);
     890     3000676 :   call->reading_message = 0;
     891     3000676 : }
     892             : 
     893     3001131 : static int begin_message(grpc_call *call, grpc_begin_message msg) {
                         /* Handle a begin-message marker from the receive stream. Returns 1 when
                          * the message was accepted; returns 0 after cancel_with_status() when it
                          * was rejected. Rejection reasons: a previous message is still being
                          * read, the compressed flag is set with no call-level compression
                          * algorithm, or the length exceeds the channel maximum. */
     894             :   /* can't begin a message when we're still reading a message */
     895     3001131 :   if (call->reading_message) {
     896           0 :     char *message = NULL;
     897           0 :     gpr_asprintf(
     898             :         &message, "Message terminated early; read %d bytes, expected %d",
     899           0 :         (int)call->incoming_message.length, (int)call->incoming_message_length);
     900           0 :     cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
     901           0 :     gpr_free(message);
     902           0 :     return 0;
     903             :   }
     904             :   /* sanity check: if message flags indicate a compressed message, the
     905             :    * compression level should already be present in the call, as parsed off its
     906             :    * corresponding metadata. */
     907     3001210 :   if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
     908          79 :       (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
     909          13 :     char *message = NULL;
     910             :     char *alg_name;
                           /* In this branch the algorithm is always GRPC_COMPRESS_NONE, so the
                            * name looked up for the error text is the name of NONE. */
     911          13 :     if (!grpc_compression_algorithm_name(call->compression_algorithm,
     912             :                                          &alg_name)) {
     913             :       /* This shouldn't happen, other than due to data corruption */
     914           0 :       alg_name = "<unknown>";
     915             :     }
     916          13 :     gpr_asprintf(&message,
     917             :                  "Invalid compression algorithm (%s) for compressed message.",
     918             :                  alg_name);
     919          13 :     cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
     920          13 :     gpr_free(message);
     921          13 :     return 0;
     922             :   }
     923             :   /* stash away parameters, and prepare for incoming slices */
     924     3001118 :   if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
     925          25 :     char *message = NULL;
                           /* NOTE(review): both arguments are formatted with %d without a cast,
                            * unlike the (int) casts used in the first branch; if these values
                            * are unsigned the specifier does not match - confirm the types. */
     926          25 :     gpr_asprintf(
     927             :         &message,
     928             :         "Maximum message length of %d exceeded by a message of length %d",
     929             :         grpc_channel_get_max_message_length(call->channel), msg.length);
     930          25 :     cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
     931          25 :     gpr_free(message);
     932          25 :     return 0;
     933     3000877 :   } else if (msg.length > 0) {
     934     3000871 :     call->reading_message = 1;
     935     3000871 :     call->incoming_message_length = msg.length;
     936     3000871 :     call->incoming_message_flags = msg.flags;
     937     3000871 :     return 1;
     938             :   } else {
                           /* Zero-length message: there are no slices to wait for, so finish it
                            * immediately. incoming_message_flags is not updated on this path. */
     939           6 :     finish_message(call);
     940           6 :     return 1;
     941             :   }
     942             : }
     943             : 
     944     7042052 : static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
                         /* Append one payload slice to the message being read. Returns 1 on
                          * success and 0 after cancel_with_status() on a protocol error. The
                          * slice is either unref'd here (empty slice, or no message in progress)
                          * or added to call->incoming_message. */
     945     7042052 :   if (GPR_SLICE_LENGTH(slice) == 0) {
     946           6 :     gpr_slice_unref(slice);
     947           6 :     return 1;
     948             :   }
     949             :   /* we have to be reading a message to know what to do here */
     950     7042046 :   if (!call->reading_message) {
     951           1 :     gpr_slice_unref(slice);
     952           1 :     cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
     953             :                        "Received payload data while not reading a message");
     954           1 :     return 0;
     955             :   }
     956             :   /* append the slice to the incoming buffer */
     957     7042045 :   gpr_slice_buffer_add(&call->incoming_message, slice);
     958     7041702 :   if (call->incoming_message.length > call->incoming_message_length) {
     959             :     /* if we got too many bytes, complain */
     960           0 :     char *message = NULL;
     961           0 :     gpr_asprintf(
     962             :         &message, "Receiving message overflow; read %d bytes, expected %d",
     963           0 :         (int)call->incoming_message.length, (int)call->incoming_message_length);
     964           0 :     cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
     965           0 :     gpr_free(message);
     966           0 :     return 0;
     967     7041702 :   } else if (call->incoming_message.length == call->incoming_message_length) {
                           /* Exactly the announced length has arrived: the message is complete. */
     968     3000850 :     finish_message(call);
     969     3000674 :     return 1;
     970             :   } else {
                           /* More slices are still expected for this message. */
     971     4040852 :     return 1;
     972             :   }
     973             : }
     974             : 
     975     5346044 : static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) {
                         /* Completion handler for a receive; `pc` is the grpc_call. Under the
                          * call lock it processes the received stream ops, updates read_state
                          * from recv_state, propagates cancellation to child calls when the
                          * stream has closed, and completes read ops. On failure all receive ops
                          * are completed as failed. Drops the "receiving" reference at the end. */
     976     5346044 :   grpc_call *call = pc;
     977             :   grpc_call *child_call;
     978             :   grpc_call *next_child_call;
     979             :   size_t i;
     980             :   GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
     981     5346044 :   lock(call);
     982     5347218 :   call->receiving = 0;
     983     5347218 :   if (success) {
                           /* The loop stops at the first op whose handler clears `success`. */
     984    19485532 :     for (i = 0; success && i < call->recv_ops.nops; i++) {
     985    14137496 :       grpc_stream_op *op = &call->recv_ops.ops[i];
     986    14137496 :       switch (op->type) {
     987             :         case GRPC_NO_OP:
     988           0 :           break;
     989             :         case GRPC_OP_METADATA:
     990     4120363 :           recv_metadata(exec_ctx, call, &op->data.metadata);
     991     4121819 :           break;
     992             :         case GRPC_OP_BEGIN_MESSAGE:
     993     3001396 :           success = begin_message(call, op->data.begin_message);
     994     3001308 :           break;
     995             :         case GRPC_OP_SLICE:
     996     7042073 :           success = add_slice_to_message(call, op->data.slice);
     997     7041523 :           break;
     998             :       }
     999             :     }
                           /* On failure `i` has already been advanced past the failing op, so
                            * only the ops that were never processed are released here. */
    1000     5348036 :     if (!success) {
    1001          39 :       grpc_stream_ops_unref_owned_objects(&call->recv_ops.ops[i],
    1002          39 :                                           call->recv_ops.nops - i);
    1003             :     }
    1004     5347507 :     if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
    1005     1310234 :       GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
    1006     1310234 :       call->read_state = READ_STATE_READ_CLOSED;
    1007             :     }
    1008     5347507 :     if (call->recv_state == GRPC_STREAM_CLOSED) {
    1009     2704308 :       GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
    1010     2704308 :       call->read_state = READ_STATE_STREAM_CLOSED;
    1011     2704308 :       if (call->have_alarm) {
    1012        5477 :         grpc_alarm_cancel(exec_ctx, &call->alarm);
    1013             :       }
    1014             :       /* propagate cancellation to any interested children */
                             /* Children are walked via sibling_next until the walk returns to
                              * first_child. The next pointer is read before cancelling, and each
                              * child is held by a temporary ref across grpc_call_cancel(). */
    1015     2704309 :       child_call = call->first_child;
    1016     2704309 :       if (child_call != NULL) {
    1017             :         do {
    1018         288 :           next_child_call = child_call->sibling_next;
    1019         288 :           if (child_call->cancellation_is_inherited) {
    1020         288 :             GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
    1021         288 :             grpc_call_cancel(child_call, NULL);
    1022         288 :             GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
    1023             :           }
    1024         288 :           child_call = next_child_call;
    1025         288 :         } while (child_call != call->first_child);
    1026             :       }
                             /* Drops the reference tagged "closed"; the matching ref is taken
                              * outside this part of the file. */
    1027     2704309 :       GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "closed");
    1028             :     }
    1029     5348389 :     finish_read_ops(call);
    1030             :   } else {
                           /* The receive itself failed: complete every receive op as failed. */
    1031           0 :     finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 0);
    1032           0 :     finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 0);
    1033           0 :     finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 0);
    1034           0 :     finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 0);
    1035           0 :     finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 0);
    1036           0 :     finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0);
    1037             :   }
    1038     5343031 :   call->recv_ops.nops = 0;
    1039     5343031 :   unlock(exec_ctx, call);
    1040             : 
    1041     5348325 :   GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "receiving");
    1042             :   GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
    1043     5348135 : }
    1044             : 
    1045     4004110 : static int prepare_application_metadata(grpc_call *call, size_t count,
    1046             :                                         grpc_metadata *metadata) {
                         /* Turn `count` application-supplied metadata entries into a doubly
                          * linked list of grpc_linked_mdelem nodes, stored in place inside each
                          * entry's internal_data field. Returns 1 on success, or 0 (after
                          * logging) when a key is not a legal header or a value of a key without
                          * the binary suffix is not a legal non-binary header value.
                          *
                          * NOTE(review): the `return 0` paths leave without releasing the mdelems
                          * created for this and earlier entries - looks like a leak unless the
                          * caller cleans up; confirm. */
    1047             :   size_t i;
    1048     4005303 :   for (i = 0; i < count; i++) {
    1049        1193 :     grpc_metadata *md = &metadata[i];
    1050        1193 :     grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
    1051        1193 :     grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
                           /* The entry's internal_data storage is reinterpreted as the list
                            * node; the assert below checks that the sizes agree. */
    1052        1193 :     grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
    1053             :     GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
    1054        2386 :     l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
    1055        1193 :                                                (const gpr_uint8 *)md->value,
    1056             :                                                md->value_length);
    1057        1193 :     if (!grpc_mdstr_is_legal_header(l->md->key)) {
    1058           0 :       gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
    1059           0 :               grpc_mdstr_as_c_string(l->md->key));
    1060           0 :       return 0;
    1061        2276 :     } else if (!grpc_mdstr_is_bin_suffixed(l->md->key) &&
    1062        1083 :                !grpc_mdstr_is_legal_nonbin_header(l->md->value)) {
    1063           0 :       gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
    1064           0 :       return 0;
    1065             :     }
                           /* Link to the neighbouring entries; the ends of the list get NULL. */
    1066        1193 :     l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
    1067        1193 :     l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
    1068             :   }
    1069     4004110 :   return 1;
    1070             : }
    1071             : 
    1072     4003089 : static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
    1073             :                                                 grpc_metadata *metadata) {
    1074             :   grpc_mdelem_list out;
    1075     4003089 :   if (count == 0) {
    1076     4002114 :     out.head = out.tail = NULL;
    1077     4002114 :     return out;
    1078             :   }
    1079         975 :   out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
    1080         975 :   out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
    1081         975 :   return out;
    1082             : }
    1083             : 
    1084             : /* Copy the contents of a byte buffer into stream ops */
    1085     2998777 : static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
    1086             :                                            grpc_stream_op_buffer *sopb) {
    1087             :   size_t i;
    1088             : 
    1089     2998777 :   switch (byte_buffer->type) {
    1090             :     case GRPC_BB_RAW:
    1091     6000033 :       for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
    1092     3000640 :         gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
    1093     3000640 :         gpr_slice_ref(slice);
    1094     3000878 :         grpc_sopb_add_slice(sopb, slice);
    1095             :       }
    1096     2999393 :       break;
    1097             :   }
    1098     2998009 : }
    1099             : 
    1100    26101760 : static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
    1101             :   grpc_ioreq_data data;
    1102             :   gpr_uint32 flags;
    1103             :   grpc_metadata_batch mdb;
    1104             :   size_t i;
    1105    26101760 :   GPR_ASSERT(op->send_ops == NULL);
    1106             : 
    1107    26101760 :   switch (call->write_state) {
    1108             :     case WRITE_STATE_INITIAL:
    1109     9165149 :       if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
    1110     6473368 :         break;
    1111             :       }
    1112     2703878 :       data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
    1113     2703878 :       mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
    1114             :                                          data.send_metadata.metadata);
    1115     2701874 :       mdb.garbage.head = mdb.garbage.tail = NULL;
    1116     2701874 :       mdb.deadline = call->send_deadline;
    1117     5507499 :       for (i = 0; i < call->send_initial_metadata_count; i++) {
    1118     2804632 :         grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
    1119             :       }
    1120     2702867 :       grpc_sopb_add_metadata(&call->send_ops, mdb);
    1121     2702553 :       op->send_ops = &call->send_ops;
    1122     2702553 :       call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
    1123     2702553 :       call->send_initial_metadata_count = 0;
    1124             :     /* fall through intended */
    1125             :     case WRITE_STATE_STARTED:
    1126     4704632 :       if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
    1127             :         size_t length;
    1128     2999588 :         data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
    1129     2999588 :         flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
    1130     2999588 :         length = grpc_byte_buffer_length(data.send_message);
    1131     2999233 :         GPR_ASSERT(length <= GPR_UINT32_MAX);
    1132     2999233 :         grpc_sopb_add_begin_message(&call->send_ops, (gpr_uint32)length, flags);
    1133     2999009 :         copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
    1134     2999249 :         op->send_ops = &call->send_ops;
    1135     2999249 :         call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
    1136             :       }
    1137     4715860 :       if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
    1138     2702048 :         op->is_last_send = 1;
    1139     2702048 :         op->send_ops = &call->send_ops;
    1140     2702048 :         call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
    1141     2702048 :         if (!call->is_client) {
    1142             :           /* send trailing metadata */
    1143     1301350 :           data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
    1144     1301350 :           mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
    1145             :                                              data.send_metadata.metadata);
    1146     1301362 :           mdb.garbage.head = mdb.garbage.tail = NULL;
    1147     1301362 :           mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
    1148             :           /* send status */
    1149             :           /* TODO(ctiller): cache common status values */
    1150     1301558 :           data = call->request_data[GRPC_IOREQ_SEND_STATUS];
    1151     1301558 :           grpc_metadata_batch_add_tail(
    1152             :               &mdb, &call->status_link,
    1153             :               grpc_channel_get_reffed_status_elem(call->channel,
    1154     1301558 :                                                   data.send_status.code));
    1155     1301420 :           if (data.send_status.details) {
    1156        2423 :             grpc_metadata_batch_add_tail(
    1157             :                 &mdb, &call->details_link,
    1158             :                 grpc_mdelem_from_metadata_strings(
    1159             :                     call->metadata_context,
    1160             :                     GRPC_MDSTR_REF(
    1161             :                         grpc_channel_get_message_string(call->channel)),
    1162             :                     data.send_status.details));
    1163        2423 :             call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
    1164             :                 NULL;
    1165             :           }
    1166     1301420 :           grpc_sopb_add_metadata(&call->send_ops, mdb);
    1167             :         }
    1168             :       }
    1169     4717555 :       break;
    1170             :     case WRITE_STATE_WRITE_CLOSED:
    1171    15031928 :       break;
    1172             :   }
    1173    26125455 :   if (op->send_ops) {
    1174     3006735 :     op->on_done_send = &call->on_done_send;
    1175             :   }
    1176    26125455 :   return op->send_ops != NULL;
    1177             : }
    1178             : 
    1179           0 : static grpc_call_error start_ioreq_error(grpc_call *call,
    1180             :                                          gpr_uint32 mutated_ops,
    1181             :                                          grpc_call_error ret) {
    1182             :   size_t i;
    1183           0 :   for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
    1184           0 :     if (mutated_ops & (1u << i)) {
    1185           0 :       call->request_set[i] = REQSET_EMPTY;
    1186             :     }
    1187             :   }
    1188           0 :   return ret;
    1189             : }
    1190             : 
    1191    12257604 : static void finish_read_ops(grpc_call *call) {
    1192             :   int empty;
    1193             : 
    1194    12257604 :   if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) {
    1195     6694528 :     empty =
    1196    13389370 :         (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
    1197     6694842 :                       grpc_bbq_pop(&call->incoming_queue)));
    1198     6694528 :     if (!empty) {
    1199     2997897 :       finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
    1200     2998466 :       empty = grpc_bbq_empty(&call->incoming_queue);
    1201             :     }
    1202             :   } else {
    1203     5592939 :     empty = grpc_bbq_empty(&call->incoming_queue);
    1204             :   }
    1205             : 
    1206    12270398 :   switch (call->read_state) {
    1207             :     case READ_STATE_STREAM_CLOSED:
    1208     2707484 :       if (empty && !call->have_alarm) {
    1209     2702042 :         finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
    1210             :       }
    1211             :     /* fallthrough */
    1212             :     case READ_STATE_READ_CLOSED:
    1213     6882575 :       if (empty) {
    1214     6607231 :         finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
    1215             :       }
    1216     6883651 :       finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 1);
    1217     6879849 :       finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 1);
    1218     6876468 :       finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 1);
    1219             :     /* fallthrough */
    1220             :     case READ_STATE_GOT_INITIAL_METADATA:
    1221     8706685 :       finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 1);
    1222             :     /* fallthrough */
    1223             :     case READ_STATE_INITIAL:
    1224             :       /* do nothing */
    1225    12267534 :       break;
    1226             :   }
    1227    12267534 : }
    1228             : 
    1229     6940053 : static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
    1230             :                                    size_t nreqs,
    1231             :                                    grpc_ioreq_completion_func completion,
    1232             :                                    void *user_data) {
    1233             :   size_t i;
    1234     6940053 :   gpr_uint16 have_ops = 0;
    1235             :   grpc_ioreq_op op;
    1236             :   reqinfo_master *master;
    1237             :   grpc_ioreq_data data;
    1238             :   gpr_uint8 set;
    1239             : 
    1240     6940053 :   if (nreqs == 0) {
    1241           0 :     return GRPC_CALL_OK;
    1242             :   }
    1243             : 
    1244     6940053 :   set = reqs[0].op;
    1245             : 
    1246    31803378 :   for (i = 0; i < nreqs; i++) {
    1247    24867596 :     op = reqs[i].op;
    1248    24867596 :     if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) {
    1249           0 :       return start_ioreq_error(call, have_ops,
    1250             :                                GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
    1251    24869627 :     } else if (call->request_set[op] == REQSET_DONE) {
    1252           0 :       return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
    1253             :     }
    1254    24869627 :     data = reqs[i].data;
    1255    24869627 :     if (op == GRPC_IOREQ_SEND_INITIAL_METADATA ||
    1256             :         op == GRPC_IOREQ_SEND_TRAILING_METADATA) {
    1257     4011339 :       if (!prepare_application_metadata(call, data.send_metadata.count,
    1258             :                                         data.send_metadata.metadata)) {
    1259           0 :         return start_ioreq_error(call, have_ops,
    1260             :                                  GRPC_CALL_ERROR_INVALID_METADATA);
    1261             :       }
    1262             :     }
    1263    24863282 :     if (op == GRPC_IOREQ_SEND_STATUS) {
    1264     1301389 :       set_status_code(call, STATUS_FROM_SERVER_STATUS,
    1265     1301389 :                       (gpr_uint32)reqs[i].data.send_status.code);
    1266     1301432 :       if (reqs[i].data.send_status.details) {
    1267        2431 :         set_status_details(call, STATUS_FROM_SERVER_STATUS,
    1268        2431 :                            GRPC_MDSTR_REF(reqs[i].data.send_status.details));
    1269             :       }
    1270             :     }
    1271             :     /* NB: the following integer arithmetic operation needs to be in its
    1272             :      * expanded form due to the "integral promotion" performed (see section
    1273             :      * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
    1274             :      * is then required to avoid the compiler warning */
    1275    24863325 :     have_ops = (gpr_uint16)(have_ops | (1u << op));
    1276             : 
    1277    24863325 :     call->request_data[op] = data;
    1278    24863325 :     call->request_flags[op] = reqs[i].flags;
    1279    24863325 :     call->request_set[op] = set;
    1280             :   }
    1281             : 
    1282     6935782 :   master = &call->masters[set];
    1283     6935782 :   master->success = 1;
    1284     6935782 :   master->need_mask = have_ops;
    1285     6935782 :   master->complete_mask = 0;
    1286     6935782 :   master->on_complete = completion;
    1287     6935782 :   master->user_data = user_data;
    1288             : 
    1289     6935782 :   finish_read_ops(call);
    1290     6938428 :   early_out_write_ops(call);
    1291             : 
    1292     6938777 :   return GRPC_CALL_OK;
    1293             : }
    1294             : 
    1295     6940451 : grpc_call_error grpc_call_start_ioreq_and_call_back(
    1296             :     grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_ioreq *reqs,
    1297             :     size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) {
    1298             :   grpc_call_error err;
    1299     6940451 :   lock(call);
    1300     6945719 :   err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
    1301     6939074 :   unlock(exec_ctx, call);
    1302     6940686 :   return err;
    1303             : }
    1304             : 
    1305     2703957 : void grpc_call_destroy(grpc_call *c) {
    1306             :   int cancel;
    1307     2703957 :   grpc_call *parent = c->parent;
    1308     2703957 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1309             : 
    1310     2703957 :   GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
    1311             : 
    1312     2704334 :   if (parent) {
    1313         501 :     gpr_mu_lock(&parent->mu);
    1314         502 :     if (c == parent->first_child) {
    1315         502 :       parent->first_child = c->sibling_next;
    1316         502 :       if (c == parent->first_child) {
    1317         502 :         parent->first_child = NULL;
    1318             :       }
    1319         502 :       c->sibling_prev->sibling_next = c->sibling_next;
    1320         502 :       c->sibling_next->sibling_prev = c->sibling_prev;
    1321             :     }
    1322         502 :     gpr_mu_unlock(&parent->mu);
    1323         502 :     GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
    1324             :   }
    1325             : 
    1326     2704335 :   lock(c);
    1327     2704762 :   GPR_ASSERT(!c->destroy_called);
    1328     2704762 :   c->destroy_called = 1;
    1329     2704762 :   if (c->have_alarm) {
    1330         221 :     grpc_alarm_cancel(&exec_ctx, &c->alarm);
    1331             :   }
    1332     2704762 :   cancel = c->read_state != READ_STATE_STREAM_CLOSED;
    1333     2704762 :   unlock(&exec_ctx, c);
    1334     2704986 :   if (cancel) grpc_call_cancel(c, NULL);
    1335     2704983 :   GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
    1336     2704492 :   grpc_exec_ctx_finish(&exec_ctx);
    1337     2704672 : }
    1338             : 
    1339      278732 : grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
    1340      278732 :   GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
    1341      278732 :   GPR_ASSERT(!reserved);
    1342      278732 :   return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled",
    1343             :                                       NULL);
    1344             : }
    1345             : 
    1346      278733 : grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
    1347             :                                              grpc_status_code status,
    1348             :                                              const char *description,
    1349             :                                              void *reserved) {
    1350             :   grpc_call_error r;
    1351      278733 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1352      278733 :   GRPC_API_TRACE(
    1353             :       "grpc_call_cancel_with_status("
    1354             :       "c=%p, status=%d, description=%s, reserved=%p)",
    1355             :       4, (c, (int)status, description, reserved));
    1356      278733 :   GPR_ASSERT(reserved == NULL);
    1357      278733 :   lock(c);
    1358      278736 :   r = cancel_with_status(c, status, description);
    1359      278735 :   unlock(&exec_ctx, c);
    1360      278737 :   grpc_exec_ctx_finish(&exec_ctx);
    1361      278738 :   return r;
    1362             : }
    1363             : 
    1364      279118 : static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
    1365             :                                           const char *description) {
    1366      279119 :   grpc_mdstr *details =
    1367      279118 :       description ? grpc_mdstr_from_string(c->metadata_context, description)
    1368      558236 :                   : NULL;
    1369             : 
    1370      279119 :   GPR_ASSERT(status != GRPC_STATUS_OK);
    1371             : 
    1372      279119 :   set_status_code(c, STATUS_FROM_API_OVERRIDE, (gpr_uint32)status);
    1373      279120 :   set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
    1374             : 
    1375      279117 :   c->cancel_with_status = status;
    1376             : 
    1377      279117 :   return GRPC_CALL_OK;
    1378             : }
    1379             : 
    1380     2982132 : static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call,
    1381             :                               int success_ignored) {
    1382     2982132 :   GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "loose-op");
    1383     2983683 : }
    1384             : 
    1385             : typedef struct {
    1386             :   grpc_call *call;
    1387             :   grpc_closure closure;
    1388             : } finished_loose_op_allocated_args;
    1389             : 
    1390      278955 : static void finished_loose_op_allocated(grpc_exec_ctx *exec_ctx, void *alloc,
    1391             :                                         int success) {
    1392      278955 :   finished_loose_op_allocated_args *args = alloc;
    1393      278955 :   finished_loose_op(exec_ctx, args->call, success);
    1394      278956 :   gpr_free(args);
    1395      278956 : }
    1396             : 
    1397     8268023 : static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
    1398             :                        grpc_transport_stream_op *op) {
    1399             :   grpc_call_element *elem;
    1400             : 
    1401     8268023 :   GPR_ASSERT(op->on_consumed == NULL);
    1402     8268023 :   if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
    1403     2982030 :     GRPC_CALL_INTERNAL_REF(call, "loose-op");
    1404     2983997 :     if (op->bind_pollset) {
    1405     2705041 :       op->on_consumed = &call->on_done_bind;
    1406             :     } else {
    1407      278956 :       finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
    1408      278956 :       args->call = call;
    1409      278956 :       grpc_closure_init(&args->closure, finished_loose_op_allocated, args);
    1410      278956 :       op->on_consumed = &args->closure;
    1411             :     }
    1412             :   }
    1413             : 
    1414     8269990 :   elem = CALL_ELEM_FROM_CALL(call, 0);
    1415     8261110 :   op->context = call->context;
    1416     8261110 :   elem->filter->start_transport_stream_op(exec_ctx, elem, op);
    1417     8260953 : }
    1418             : 
    1419        1434 : char *grpc_call_get_peer(grpc_call *call) {
    1420        1434 :   grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
    1421        1434 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1422        1434 :   char *result = elem->filter->get_peer(&exec_ctx, elem);
    1423        1434 :   GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
    1424        1434 :   grpc_exec_ctx_finish(&exec_ctx);
    1425        1434 :   return result;
    1426             : }
    1427             : 
    1428     1302018 : grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
    1429     1302018 :   return CALL_FROM_TOP_ELEM(elem);
    1430             : }
    1431             : 
    1432        5858 : static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) {
    1433        5858 :   grpc_call *call = arg;
    1434        5858 :   lock(call);
    1435        5858 :   call->have_alarm = 0;
    1436        5858 :   if (success) {
    1437         343 :     cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
    1438             :                        "Deadline Exceeded");
    1439             :   }
    1440        5858 :   finish_read_ops(call);
    1441        5858 :   unlock(exec_ctx, call);
    1442        5858 :   GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
    1443        5858 : }
    1444             : 
    1445        5858 : static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
    1446             :                                gpr_timespec deadline) {
    1447        5858 :   if (call->have_alarm) {
    1448           0 :     gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
    1449           0 :     assert(0);
    1450             :     return;
    1451             :   }
    1452        5858 :   GRPC_CALL_INTERNAL_REF(call, "alarm");
    1453        5858 :   call->have_alarm = 1;
    1454        5858 :   call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
    1455        5858 :   grpc_alarm_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
    1456             :                   gpr_now(GPR_CLOCK_MONOTONIC));
    1457        5858 : }
    1458             : 
    1459             : /* we offset status by a small amount when storing it into transport metadata
    1460             :    as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
    1461             :    */
    1462             : #define STATUS_OFFSET 1
    1463        2451 : static void destroy_status(void *ignored) {}
    1464             : 
    1465     1417621 : static gpr_uint32 decode_status(grpc_mdelem *md) {
    1466             :   gpr_uint32 status;
    1467     1417621 :   void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
    1468     1417796 :   if (user_data) {
    1469     1415345 :     status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
    1470             :   } else {
    1471        4902 :     if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
    1472        4902 :                                    GPR_SLICE_LENGTH(md->value->slice),
    1473             :                                    &status)) {
    1474           0 :       status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
    1475             :     }
    1476        2451 :     grpc_mdelem_set_user_data(md, destroy_status,
    1477        2451 :                               (void *)(gpr_intptr)(status + STATUS_OFFSET));
    1478             :   }
    1479     1417796 :   return status;
    1480             : }
    1481             : 
    1482             : /* just as for status above, we need to offset: metadata userdata can't hold a
    1483             :  * zero (null), which in this case is used to signal no compression */
    1484             : #define COMPRESS_OFFSET 1
    1485        2953 : static void destroy_compression(void *ignored) {}
    1486             : 
    1487     2703026 : static gpr_uint32 decode_compression(grpc_mdelem *md) {
    1488             :   grpc_compression_algorithm algorithm;
    1489     2703026 :   void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
    1490     2703467 :   if (user_data) {
    1491     5401028 :     algorithm =
    1492     5401028 :         ((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET;
    1493             :   } else {
    1494        2953 :     const char *md_c_str = grpc_mdstr_as_c_string(md->value);
    1495        2953 :     if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
    1496             :                                           &algorithm)) {
    1497           0 :       gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
    1498           0 :       assert(0);
    1499             :     }
    1500        2953 :     grpc_mdelem_set_user_data(
    1501             :         md, destroy_compression,
    1502        2953 :         (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
    1503             :   }
    1504     2703631 :   return algorithm;
    1505             : }
    1506             : 
    1507     4112395 : static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
    1508             :                           grpc_metadata_batch *md) {
    1509             :   grpc_linked_mdelem *l;
    1510             :   grpc_metadata_array *dest;
    1511             :   grpc_metadata *mdusr;
    1512             :   int is_trailing;
    1513     4112395 :   grpc_mdctx *mdctx = call->metadata_context;
    1514             : 
    1515     4112395 :   is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
    1516    12329996 :   for (l = md->list.head; l != NULL; l = l->next) {
    1517     8210729 :     grpc_mdelem *mdel = l->md;
    1518     8210729 :     grpc_mdstr *key = mdel->key;
    1519     8210729 :     if (key == grpc_channel_get_status_string(call->channel)) {
    1520     1417622 :       set_status_code(call, STATUS_FROM_WIRE, decode_status(mdel));
    1521     6804424 :     } else if (key == grpc_channel_get_message_string(call->channel)) {
    1522      103743 :       set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(mdel->value));
    1523     6706908 :     } else if (key ==
    1524     6700282 :                grpc_channel_get_compression_algorithm_string(call->channel)) {
    1525     2703415 :       set_compression_algorithm(call, decode_compression(mdel));
    1526     4003493 :     } else if (key == grpc_channel_get_encodings_accepted_by_peer_string(
    1527             :                           call->channel)) {
    1528     2703562 :       set_encodings_accepted_by_peer(call, mdel->value->slice);
    1529             :     } else {
    1530     1302235 :       dest = &call->buffered_metadata[is_trailing];
    1531     1302235 :       if (dest->count == dest->capacity) {
    1532     1301890 :         dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
    1533     1301911 :         dest->metadata =
    1534     1301890 :             gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
    1535             :       }
    1536     1302256 :       mdusr = &dest->metadata[dest->count++];
    1537     1302256 :       mdusr->key = grpc_mdstr_as_c_string(mdel->key);
    1538     1302247 :       mdusr->value = grpc_mdstr_as_c_string(mdel->value);
    1539     1302317 :       mdusr->value_length = GPR_SLICE_LENGTH(mdel->value->slice);
    1540     1302317 :       if (call->owned_metadata_count == call->owned_metadata_capacity) {
    1541     1301840 :         call->owned_metadata_capacity =
    1542     1301840 :             GPR_MAX(call->owned_metadata_capacity + 8,
    1543             :                     call->owned_metadata_capacity * 2);
    1544     1301736 :         call->owned_metadata =
    1545     1301840 :             gpr_realloc(call->owned_metadata,
    1546     1301840 :                         sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
    1547             :       }
    1548     1302213 :       call->owned_metadata[call->owned_metadata_count++] = mdel;
    1549     1302213 :       l->md = NULL;
    1550             :     }
    1551             :   }
    1552     4119267 :   if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
    1553        5182 :           0 &&
    1554        5182 :       !call->is_client) {
    1555        2698 :     set_deadline_alarm(exec_ctx, call, md->deadline);
    1556             :   }
    1557     4122234 :   if (!is_trailing) {
    1558     2705113 :     call->read_state = READ_STATE_GOT_INITIAL_METADATA;
    1559             :   }
    1560             : 
    1561     4122234 :   grpc_mdctx_lock(mdctx);
    1562    12349591 :   for (l = md->list.head; l; l = l->next) {
    1563     8227511 :     if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
    1564             :   }
    1565    14730288 :   for (l = md->garbage.head; l; l = l->next) {
    1566    10609232 :     GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
    1567             :   }
    1568     4121056 :   grpc_mdctx_unlock(mdctx);
    1569     4121957 : }
    1570             : 
    1571     1301861 : grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
    1572     1301861 :   return CALL_STACK_FROM_CALL(call);
    1573             : }
    1574             : 
    1575             : /*
    1576             :  * BATCH API IMPLEMENTATION
    1577             :  */
    1578             : 
    1579     1402773 : static void set_status_value_directly(grpc_status_code status, void *dest) {
    1580     1402773 :   *(grpc_status_code *)dest = status;
    1581     1402773 : }
    1582             : 
    1583     1301646 : static void set_cancelled_value(grpc_status_code status, void *dest) {
    1584     1301646 :   *(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
    1585     1301646 : }
    1586             : 
    1587     2941877 : static void finish_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, int success,
    1588             :                          void *tag) {
    1589     2941877 :   grpc_cq_end_op(exec_ctx, call->cq, tag, success, done_completion, call,
    1590             :                  allocate_completion(call));
    1591     2942097 : }
    1592             : 
    1593     2704804 : static void finish_batch_with_close(grpc_exec_ctx *exec_ctx, grpc_call *call,
    1594             :                                     int success, void *tag) {
    1595     2704804 :   grpc_cq_end_op(exec_ctx, call->cq, tag, 1, done_completion, call,
    1596             :                  allocate_completion(call));
    1597     2703767 : }
    1598             : 
    1599     3000850 : static int are_write_flags_valid(gpr_uint32 flags) {
    1600             :   /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
    1601     3000850 :   const gpr_uint32 allowed_write_positions =
    1602             :       (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
    1603     3000850 :   const gpr_uint32 invalid_positions = ~allowed_write_positions;
    1604     3000850 :   return !(flags & invalid_positions);
    1605             : }
    1606             : 
    1607     5636116 : grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
    1608             :                                       size_t nops, void *tag, void *reserved) {
    1609             :   grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
    1610             :   size_t in;
    1611             :   size_t out;
    1612             :   const grpc_op *op;
    1613             :   grpc_ioreq *req;
    1614     5636116 :   void (*finish_func)(grpc_exec_ctx *, grpc_call *, int, void *) = finish_batch;
    1615             :   grpc_call_error error;
    1616     5636116 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1617             : 
    1618     5636116 :   GRPC_API_TRACE(
    1619             :       "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
    1620             :       5, (call, ops, (unsigned long)nops, tag, reserved));
    1621             : 
    1622     5638652 :   if (reserved != NULL) {
    1623           0 :     error = GRPC_CALL_ERROR;
    1624           0 :     goto done;
    1625             :   }
    1626             : 
    1627     5638652 :   GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
    1628             : 
    1629     5626296 :   if (nops == 0) {
    1630          23 :     grpc_cq_begin_op(call->cq);
    1631          23 :     GRPC_CALL_INTERNAL_REF(call, "completion");
    1632          23 :     grpc_cq_end_op(&exec_ctx, call->cq, tag, 1, done_completion, call,
    1633             :                    allocate_completion(call));
    1634          23 :     error = GRPC_CALL_OK;
    1635          23 :     goto done;
    1636             :   }
    1637             : 
    1638             :   /* rewrite batch ops into ioreq ops */
    1639    19807898 :   for (in = 0, out = 0; in < nops; in++) {
    1640    14169142 :     op = &ops[in];
    1641    14169142 :     if (op->reserved != NULL) {
    1642           0 :       error = GRPC_CALL_ERROR;
    1643           0 :       goto done;
    1644             :     }
    1645    14182430 :     switch (op->op) {
    1646             :       case GRPC_OP_SEND_INITIAL_METADATA:
    1647             :         /* Flag validation: currently allow no flags */
    1648     2702572 :         if (op->flags != 0) {
    1649          20 :           error = GRPC_CALL_ERROR_INVALID_FLAGS;
    1650          20 :           goto done;
    1651             :         }
    1652     2702552 :         req = &reqs[out++];
    1653     2702552 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1654           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1655           0 :           goto done;
    1656             :         }
    1657     2702552 :         req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
    1658     2702552 :         req->data.send_metadata.count = op->data.send_initial_metadata.count;
    1659     2702552 :         req->data.send_metadata.metadata =
    1660     2702552 :             op->data.send_initial_metadata.metadata;
    1661     2702552 :         req->flags = op->flags;
    1662     2702552 :         break;
    1663             :       case GRPC_OP_SEND_MESSAGE:
    1664     3000562 :         if (!are_write_flags_valid(op->flags)) {
    1665          20 :           error = GRPC_CALL_ERROR_INVALID_FLAGS;
    1666          20 :           goto done;
    1667             :         }
    1668     2999917 :         if (op->data.send_message == NULL) {
    1669           0 :           error = GRPC_CALL_ERROR_INVALID_MESSAGE;
    1670           0 :           goto done;
    1671             :         }
    1672     2999917 :         req = &reqs[out++];
    1673     2999917 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1674           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1675           0 :           goto done;
    1676             :         }
    1677     2999917 :         req->op = GRPC_IOREQ_SEND_MESSAGE;
    1678     2999917 :         req->data.send_message = op->data.send_message;
    1679     2999917 :         req->flags = op->flags;
    1680     2999917 :         break;
    1681             :       case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
    1682             :         /* Flag validation: currently allow no flags */
    1683     1402671 :         if (op->flags != 0) {
    1684          20 :           error = GRPC_CALL_ERROR_INVALID_FLAGS;
    1685          20 :           goto done;
    1686             :         }
    1687     1402651 :         if (!call->is_client) {
    1688           0 :           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
    1689           0 :           goto done;
    1690             :         }
    1691     1402651 :         req = &reqs[out++];
    1692     1402651 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1693           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1694           0 :           goto done;
    1695             :         }
    1696     1402651 :         req->op = GRPC_IOREQ_SEND_CLOSE;
    1697     1402651 :         req->flags = op->flags;
    1698     1402651 :         break;
    1699             :       case GRPC_OP_SEND_STATUS_FROM_SERVER:
    1700             :         /* Flag validation: currently allow no flags */
    1701     1301350 :         if (op->flags != 0) {
    1702           0 :           error = GRPC_CALL_ERROR_INVALID_FLAGS;
    1703           0 :           goto done;
    1704             :         }
    1705     1301350 :         if (call->is_client) {
    1706           0 :           error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
    1707           0 :           goto done;
    1708             :         }
    1709     1301350 :         req = &reqs[out++];
    1710     1301350 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1711           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1712           0 :           goto done;
    1713             :         }
    1714     1301350 :         req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
    1715     1301350 :         req->flags = op->flags;
    1716     1301350 :         req->data.send_metadata.count =
    1717     1301350 :             op->data.send_status_from_server.trailing_metadata_count;
    1718     1301350 :         req->data.send_metadata.metadata =
    1719     1301350 :             op->data.send_status_from_server.trailing_metadata;
    1720     1301350 :         req = &reqs[out++];
    1721     1301350 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1722           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1723           0 :           goto done;
    1724             :         }
    1725     1301350 :         req->op = GRPC_IOREQ_SEND_STATUS;
    1726     1301350 :         req->data.send_status.code = op->data.send_status_from_server.status;
    1727     1301270 :         req->data.send_status.details =
    1728     1301350 :             op->data.send_status_from_server.status_details != NULL
    1729        2431 :                 ? grpc_mdstr_from_string(
    1730             :                       call->metadata_context,
    1731             :                       op->data.send_status_from_server.status_details)
    1732     1303781 :                 : NULL;
    1733     1301270 :         req = &reqs[out++];
    1734     1301270 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1735           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1736           0 :           goto done;
    1737             :         }
    1738     1301270 :         req->op = GRPC_IOREQ_SEND_CLOSE;
    1739     1301270 :         break;
    1740             :       case GRPC_OP_RECV_INITIAL_METADATA:
    1741             :         /* Flag validation: currently allow no flags */
    1742     1402567 :         if (op->flags != 0) {
    1743          20 :           error = GRPC_CALL_ERROR_INVALID_FLAGS;
    1744          20 :           goto done;
    1745             :         }
    1746     1402547 :         if (!call->is_client) {
    1747           0 :           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
    1748           0 :           goto done;
    1749             :         }
    1750     1402547 :         req = &reqs[out++];
    1751     1402547 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1752           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1753           0 :           goto done;
    1754             :         }
    1755     1402547 :         req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
    1756     1402547 :         req->data.recv_metadata = op->data.recv_initial_metadata;
    1757     1402547 :         req->data.recv_metadata->count = 0;
    1758     1402547 :         req->flags = op->flags;
    1759     1402547 :         break;
    1760             :       case GRPC_OP_RECV_MESSAGE:
    1761             :         /* Flag validation: currently allow no flags */
    1762     1702865 :         if (op->flags != 0) {
    1763           0 :           error = GRPC_CALL_ERROR_INVALID_FLAGS;
    1764           0 :           goto done;
    1765             :         }
    1766     1702865 :         req = &reqs[out++];
    1767     1702865 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1768           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1769           0 :           goto done;
    1770             :         }
    1771     1702865 :         req->op = GRPC_IOREQ_RECV_MESSAGE;
    1772     1702865 :         req->data.recv_message = op->data.recv_message;
    1773     1702865 :         req->flags = op->flags;
    1774     1702865 :         break;
    1775             :       case GRPC_OP_RECV_STATUS_ON_CLIENT:
    1776             :         /* Flag validation: currently allow no flags */
    1777     1402244 :         if (op->flags != 0) {
    1778          20 :           error = GRPC_CALL_ERROR_INVALID_FLAGS;
    1779          20 :           goto done;
    1780             :         }
    1781     1402224 :         if (!call->is_client) {
    1782           0 :           error = GRPC_CALL_ERROR_NOT_ON_SERVER;
    1783           0 :           goto done;
    1784             :         }
    1785     1402224 :         req = &reqs[out++];
    1786     1402224 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1787           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1788           0 :           goto done;
    1789             :         }
    1790     1402224 :         req->op = GRPC_IOREQ_RECV_STATUS;
    1791     1402224 :         req->flags = op->flags;
    1792     1402224 :         req->data.recv_status.set_value = set_status_value_directly;
    1793     1402224 :         req->data.recv_status.user_data = op->data.recv_status_on_client.status;
    1794     1402224 :         req = &reqs[out++];
    1795     1402224 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1796           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1797           0 :           goto done;
    1798             :         }
    1799     1402224 :         req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
    1800     1402224 :         req->data.recv_status_details.details =
    1801     1402224 :             op->data.recv_status_on_client.status_details;
    1802     1402224 :         req->data.recv_status_details.details_capacity =
    1803     1402224 :             op->data.recv_status_on_client.status_details_capacity;
    1804     1402224 :         req = &reqs[out++];
    1805     1402224 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1806           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1807           0 :           goto done;
    1808             :         }
    1809     1402224 :         req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
    1810     1402224 :         req->data.recv_metadata =
    1811     1402224 :             op->data.recv_status_on_client.trailing_metadata;
    1812     1402224 :         req->data.recv_metadata->count = 0;
    1813     1402224 :         req = &reqs[out++];
    1814     1402224 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1815           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1816           0 :           goto done;
    1817             :         }
    1818     1402224 :         req->op = GRPC_IOREQ_RECV_CLOSE;
    1819     1402224 :         finish_func = finish_batch_with_close;
    1820     1402224 :         break;
    1821             :       case GRPC_OP_RECV_CLOSE_ON_SERVER:
    1822             :         /* Flag validation: currently allow no flags */
    1823     1301306 :         if (op->flags != 0) {
    1824           0 :           error = GRPC_CALL_ERROR_INVALID_FLAGS;
    1825           0 :           goto done;
    1826             :         }
    1827     1301306 :         req = &reqs[out++];
    1828     1301306 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1829           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1830           0 :           goto done;
    1831             :         }
    1832     1301306 :         req->op = GRPC_IOREQ_RECV_STATUS;
    1833     1301306 :         req->flags = op->flags;
    1834     1301306 :         req->data.recv_status.set_value = set_cancelled_value;
    1835     1301306 :         req->data.recv_status.user_data =
    1836     1301306 :             op->data.recv_close_on_server.cancelled;
    1837     1301306 :         req = &reqs[out++];
    1838     1301306 :         if (out > GRPC_IOREQ_OP_COUNT) {
    1839           0 :           error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
    1840           0 :           goto done;
    1841             :         }
    1842     1301306 :         req->op = GRPC_IOREQ_RECV_CLOSE;
    1843     1301306 :         finish_func = finish_batch_with_close;
    1844     1301306 :         break;
    1845             :     }
    1846             :   }
    1847             : 
    1848     5638756 :   GRPC_CALL_INTERNAL_REF(call, "completion");
    1849     5643282 :   grpc_cq_begin_op(call->cq);
    1850             : 
    1851     5645403 :   error = grpc_call_start_ioreq_and_call_back(&exec_ctx, call, reqs, out,
    1852             :                                               finish_func, tag);
    1853             : done:
    1854     5629918 :   grpc_exec_ctx_finish(&exec_ctx);
    1855     5643613 :   return error;
    1856             : }
    1857             : 
    1858         530 : void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
    1859             :                            void *value, void (*destroy)(void *value)) {
    1860         530 :   if (call->context[elem].destroy) {
    1861           0 :     call->context[elem].destroy(call->context[elem].value);
    1862             :   }
    1863         530 :   call->context[elem].value = value;
    1864         530 :   call->context[elem].destroy = destroy;
    1865         530 : }
    1866             : 
    1867     1299171 : void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
    1868     1299171 :   return call->context[elem].value;
    1869             : }
    1870             : 
    1871         325 : gpr_uint8 grpc_call_is_client(grpc_call *call) { return call->is_client; }

Generated by: LCOV version 1.10