Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 : #include <assert.h>
34 : #include <stdio.h>
35 : #include <stdlib.h>
36 : #include <string.h>
37 :
38 : #include <grpc/compression.h>
39 : #include <grpc/support/alloc.h>
40 : #include <grpc/support/log.h>
41 : #include <grpc/support/string_util.h>
42 : #include <grpc/support/useful.h>
43 :
44 : #include "src/core/channel/channel_stack.h"
45 : #include "src/core/iomgr/alarm.h"
46 : #include "src/core/profiling/timers.h"
47 : #include "src/core/support/string.h"
48 : #include "src/core/surface/api_trace.h"
49 : #include "src/core/surface/byte_buffer_queue.h"
50 : #include "src/core/surface/call.h"
51 : #include "src/core/surface/channel.h"
52 : #include "src/core/surface/completion_queue.h"
53 :
54 : /** The maximum number of completions possible.
55 : Based upon the maximum number of individually queueable ops in the batch
56 : api:
57 : - initial metadata send
58 : - message send
59 : - status/close send (depending on client/server)
60 : - initial metadata recv
61 : - message recv
62 : - status/close recv (depending on client/server) */
/* Slots are tracked in the gpr_uint8 bitmask grpc_call.allocated_completions
   (one bit per slot), so this value must not exceed 8. */
63 : #define MAX_CONCURRENT_COMPLETIONS 6
64 :
/* A finished group of ioreq ops waiting to have its callback run.
   Entries are appended to grpc_call.completed_requests while the call lock
   is held and are invoked from unlock() after the lock has been released. */
65 : typedef struct {
/* callback to invoke for the finished group */
66 : grpc_ioreq_completion_func on_complete;
/* opaque pointer handed back to on_complete */
67 : void *user_data;
/* copied from the group's master: zero if any op in the group failed */
68 : int success;
69 : } completed_request;
70 :
71 : /* See request_set in grpc_call below for a description */
/* Sentinel values stored in grpc_call.request_set[op] instead of a master
   index. is_op_live() treats any value >= GRPC_IOREQ_OP_COUNT as "not live",
   so both sentinels rely on being at least that large. */
72 : #define REQSET_EMPTY 'X'
73 : #define REQSET_DONE 'Y'
74 :
/* Capacity of grpc_call.send_initial_metadata. grpc_call_create() asserts
   that the caller-supplied element count is strictly below this value. */
75 : #define MAX_SEND_INITIAL_METADATA_COUNT 3
76 :
/* Bookkeeping for one group of ioreq ops that were started together and
   complete together. Instances live in grpc_call.masters and are located via
   grpc_call.request_set[op]. The group is finished once complete_mask equals
   need_mask. */
77 : typedef struct {
78 : /* Overall status of the operation: starts OK, may degrade to
79 : non-OK */
80 : gpr_uint8 success;
81 : /* a bit mask of which request ops are needed (1u << opid) */
82 : gpr_uint16 need_mask;
83 : /* a bit mask of which request ops are now completed */
84 : gpr_uint16 complete_mask;
85 : /* Completion function to call at the end of the operation */
86 : grpc_ioreq_completion_func on_complete;
/* opaque pointer passed through to on_complete */
87 : void *user_data;
88 : } reqinfo_master;
89 :
90 : /* Status data for a request can come from several sources; this
91 : enumerates them all, and acts as a priority sorting for which
92 : status to return to the application - earlier entries override
93 : later ones */
/* The priority is implemented by get_final_status() and get_final_details(),
   which scan grpc_call.status[] in ascending order and use the first entry
   whose is_set flag is non-zero. */
94 : typedef enum {
95 : /* Status came from the application layer overriding whatever
96 : the wire says */
97 : STATUS_FROM_API_OVERRIDE = 0,
98 : /* Status was created by some internal channel stack operation */
99 : STATUS_FROM_CORE,
100 : /* Status came from 'the wire' - or somewhere below the surface
101 : layer */
102 : STATUS_FROM_WIRE,
103 : /* Status came from the server sending status */
104 : STATUS_FROM_SERVER_STATUS,
/* number of sources; used as the length of grpc_call.status[] */
105 : STATUS_SOURCE_COUNT
106 : } status_source;
107 :
/* Status reported by one status_source (one slot of grpc_call.status[]). */
108 : typedef struct {
/* non-zero once set_status_code() has recorded a code for this source;
   later attempts to set the code for the same source are ignored */
109 : gpr_uint8 is_set;
/* the recorded status code; meaningful only when is_set is non-zero */
110 : grpc_status_code code;
/* optional detail string, NULL when absent; the reference held here is
   released when the value is replaced and when the call is destroyed */
111 : grpc_mdstr *details;
112 : } received_status;
113 :
114 : /* How far through the GRPC stream have we read? */
/* Stored in grpc_call.read_state. need_more_data() stops asking for further
   reads once READ_STATE_STREAM_CLOSED has been reached. */
115 : typedef enum {
116 : /* We are still waiting for initial metadata to complete */
117 : READ_STATE_INITIAL = 0,
118 : /* We have gotten initial metadata, and are reading either
119 : messages or trailing metadata */
120 : READ_STATE_GOT_INITIAL_METADATA,
121 : /* The stream is closed for reading */
122 : READ_STATE_READ_CLOSED,
123 : /* The stream is closed for reading & writing */
124 : READ_STATE_STREAM_CLOSED
125 : } read_state;
126 :
/* How far through the GRPC stream have we written? Stored in
   grpc_call.write_state. */
127 : typedef enum {
/* Zero value, hence the state of a freshly created (memset) call.
   need_more_data() keeps a server call reading while it is in this state.
   NOTE(review): presumably means "nothing written yet" - confirm. */
128 : WRITE_STATE_INITIAL = 0,
/* NOTE(review): presumably entered once initial metadata has been queued for
   sending; the transition is not visible in this part of the file. */
129 : WRITE_STATE_STARTED,
/* No further writes. finish_live_ioreq_op() forces this state when a group
   containing a message op completes unsuccessfully. */
130 : WRITE_STATE_WRITE_CLOSED
131 : } write_state;
132 :
/* Surface-layer state for a single call.
   grpc_call_create() allocates this struct and the channel's call stack in
   one block: the call stack starts immediately after the struct (see
   CALL_STACK_FROM_CALL). Only the struct portion is zeroed at creation. */
133 : struct grpc_call {
/* completion queue bound to this call (may be NULL until
   grpc_call_set_completion_queue); a "bind" ref is held while non-NULL */
134 : grpc_completion_queue *cq;
/* channel the call was created on; the call holds a "call" ref on it */
135 : grpc_channel *channel;
/* parent call supplied at creation, or NULL */
136 : grpc_call *parent;
/* head of the circular sibling list of child calls, or NULL */
137 : grpc_call *first_child;
/* metadata context obtained from the channel at creation */
138 : grpc_mdctx *metadata_context;
139 : /* TODO(ctiller): share with cq if possible? */
/* main call lock: taken by lock(), released by unlock() */
140 : gpr_mu mu;
/* guards allocated_completions / completions only */
141 : gpr_mu completion_mu;
142 :
143 : /* how far through the stream have we read? */
144 : read_state read_state;
145 : /* how far through the stream have we written? */
146 : write_state write_state;
147 : /* client or server call */
148 : gpr_uint8 is_client;
149 : /* is the alarm set */
150 : gpr_uint8 have_alarm;
151 : /* are we currently performing a send operation */
152 : gpr_uint8 sending;
153 : /* are we currently performing a recv operation */
154 : gpr_uint8 receiving;
155 : /* are we currently completing requests */
156 : gpr_uint8 completing;
157 : /** has grpc_call_destroy been called */
158 : gpr_uint8 destroy_called;
159 : /* pairs with completed_requests */
160 : gpr_uint8 num_completed_requests;
161 : /* are we currently reading a message? */
162 : gpr_uint8 reading_message;
163 : /* have we bound a pollset yet? */
164 : gpr_uint8 bound_pollset;
165 : /* is an error status set */
166 : gpr_uint8 error_status_set;
167 : /** bitmask of allocated completion events in completions */
168 : gpr_uint8 allocated_completions;
169 : /** flag indicating that cancellation is inherited */
170 : gpr_uint8 cancellation_is_inherited;
171 :
172 : /* flags with bits corresponding to write states allowing us to determine
173 : what was sent */
174 : gpr_uint16 last_send_contains;
175 : /* cancel with this status on the next outgoing transport op */
176 : grpc_status_code cancel_with_status;
177 :
178 : /* Active ioreqs.
179 : request_set and request_data contain one element per active ioreq
180 : operation.
181 :
182 : request_set[op] is an integer specifying a set of operations to which
183 : the request belongs:
184 : - if it is < GRPC_IOREQ_OP_COUNT, then this operation is pending
185 : completion, and the integer represents to which group of operations
186 : the ioreq belongs. Each group is represented by one master, and the
187 : integer in request_set is an index into masters to find the master
188 : data.
189 : - if it is REQSET_EMPTY, the ioreq op is inactive and available to be
190 : started
191 : - finally, if request_set[op] is REQSET_DONE, then the operation is
192 : complete and unavailable to be started again
193 :
194 : request_data[op] is the request data as supplied by the initiator of
195 : a request, and is valid iff request_set[op] < GRPC_IOREQ_OP_COUNT.
196 : The set fields are as per the request type specified by op.
197 :
198 : Finally, one element of masters is set per active _set_ of ioreq
199 : operations. It describes work left outstanding, result status, and
200 : what work to perform upon operation completion. As one ioreq of each
201 : op type can be active at once, by convention we choose the first element
202 : of the group to be the master -- ie the master of in-progress operation
203 : op is masters[request_set[op]]. This allows constant time allocation
204 : and a strong upper bound of a count of masters to be calculated. */
205 : gpr_uint8 request_set[GRPC_IOREQ_OP_COUNT];
206 : grpc_ioreq_data request_data[GRPC_IOREQ_OP_COUNT];
207 : gpr_uint32 request_flags[GRPC_IOREQ_OP_COUNT];
208 : reqinfo_master masters[GRPC_IOREQ_OP_COUNT];
209 :
210 : /* Dynamic array of ioreq's that have completed: the count of
211 : elements is queued in num_completed_requests.
212 : This list is built up under lock(), and flushed entirely during
213 : unlock().
214 : We know the upper bound of the number of elements as we can only
215 : have one ioreq of each type active at once. */
216 : completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
217 : /* Incoming buffer of messages */
218 : grpc_byte_buffer_queue incoming_queue;
219 : /* Buffered read metadata waiting to be returned to the application.
220 : Element 0 is initial metadata, element 1 is trailing metadata. */
221 : grpc_metadata_array buffered_metadata[2];
222 : /* All metadata received - unreffed at once at the end of the call */
223 : grpc_mdelem **owned_metadata;
224 : size_t owned_metadata_count;
225 : size_t owned_metadata_capacity;
226 :
227 : /* Received call statuses from various sources */
228 : received_status status[STATUS_SOURCE_COUNT];
229 :
230 : /* Compression algorithm for the call */
231 : grpc_compression_algorithm compression_algorithm;
232 :
233 : /* Supported encodings (compression algorithms), a bitset */
234 : gpr_uint32 encodings_accepted_by_peer;
235 :
236 : /* Contexts for various subsystems (security, tracing, ...). */
237 : grpc_call_context_element context[GRPC_CONTEXT_COUNT];
238 :
239 : /* Deadline alarm - if have_alarm is non-zero */
240 : grpc_alarm alarm;
241 :
242 : /* Call refcount - to keep the call alive during asynchronous operations */
243 : gpr_refcount internal_refcount;
244 :
/* Metadata elements supplied to grpc_call_create(); the first
   send_initial_metadata_count entries are in use and are unreffed by
   destroy_call() if still present */
245 : grpc_linked_mdelem send_initial_metadata[MAX_SEND_INITIAL_METADATA_COUNT];
246 : grpc_linked_mdelem status_link;
247 : grpc_linked_mdelem details_link;
248 : size_t send_initial_metadata_count;
249 : gpr_timespec send_deadline;
250 :
/* stream op buffers handed to the transport for sending / receiving */
251 : grpc_stream_op_buffer send_ops;
252 : grpc_stream_op_buffer recv_ops;
253 : grpc_stream_state recv_state;
254 :
/* slices of the message currently being received, with its declared total
   length and flags */
255 : gpr_slice_buffer incoming_message;
256 : gpr_uint32 incoming_message_length;
257 : gpr_uint32 incoming_message_flags;
258 : grpc_closure destroy_closure;
/* closures initialised in grpc_call_create(): on_done_recv runs
   call_on_done_recv, on_done_send runs call_on_done_send and on_done_bind
   runs finished_loose_op, each with this call as argument */
259 : grpc_closure on_done_recv;
260 : grpc_closure on_done_send;
261 : grpc_closure on_done_bind;
262 :
263 : /** completion events - for completion queue use */
264 : grpc_cq_completion completions[MAX_CONCURRENT_COMPLETIONS];
265 :
266 : /** siblings: children of the same parent form a list, and this list is
267 : protected under
268 : parent->mu */
269 : grpc_call *sibling_next;
270 : grpc_call *sibling_prev;
271 : };
272 :
/* Conversions between a grpc_call and its call stack. Both live in a single
   allocation made by grpc_call_create(), with the call stack placed directly
   after the grpc_call struct, so the conversions are plain pointer
   arithmetic in units of sizeof(grpc_call). */
273 : #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
274 : #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
/* the idx-th element of the call's stack */
275 : #define CALL_ELEM_FROM_CALL(call, idx) \
276 : grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
/* the grpc_call that owns the stack whose top element is top_elem */
277 : #define CALL_FROM_TOP_ELEM(top_elem) \
278 : CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
279 :
/* Forward declarations of file-local helpers that are referenced before
   their definitions. */
280 : static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
281 : gpr_timespec deadline);
282 : static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *call, int success);
283 : static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *call, int success);
284 : static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op);
285 : static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
286 : grpc_transport_stream_op *op);
287 : static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
288 : grpc_metadata_batch *metadata);
289 : static void finish_read_ops(grpc_call *call);
290 : static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
291 : const char *description);
292 : static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call, int success);
293 :
/* lock() acquires call->mu; unlock() releases it and then starts any work
   that became possible while the lock was held. */
294 : static void lock(grpc_call *call);
295 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call);
296 :
/* Creates a call on `channel`.

   channel                    - channel to create the call on; gains a "call"
                                internal ref.
   parent_call                - optional server-side parent; when non-NULL the
                                new call must be a client call and is linked
                                into the parent's child list.
   propagation_mask           - GRPC_PROPAGATE_* bits selecting what is
                                inherited from parent_call.
   cq                         - optional completion queue; gains a "bind" ref
                                when non-NULL.
   server_transport_data      - NULL for a client call, non-NULL for a server
                                call.
   add_initial_metadata/count - metadata elements stored for sending with the
                                initial metadata; count must be strictly less
                                than MAX_SEND_INITIAL_METADATA_COUNT.
   send_deadline              - deadline for the call; an alarm is armed unless
                                it equals the infinite-future value.

   Returns the new call. The call starts with an internal refcount of 2. */
297 2703891 : grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
298 : gpr_uint32 propagation_mask,
299 : grpc_completion_queue *cq,
300 : const void *server_transport_data,
301 : grpc_mdelem **add_initial_metadata,
302 : size_t add_initial_metadata_count,
303 : gpr_timespec send_deadline) {
304 : size_t i;
305 : grpc_transport_stream_op initial_op;
306 2703891 : grpc_transport_stream_op *initial_op_ptr = NULL;
307 2703891 : grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
308 2704453 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
/* one allocation holds the grpc_call followed by its call stack; only the
   grpc_call part is zeroed here */
309 2704453 : grpc_call *call =
310 2704453 : gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
311 2704088 : memset(call, 0, sizeof(grpc_call));
312 2704088 : gpr_mu_init(&call->mu);
313 2704145 : gpr_mu_init(&call->completion_mu);
314 2703747 : call->channel = channel;
315 2703747 : call->cq = cq;
316 2703747 : if (cq != NULL) {
317 1403260 : GRPC_CQ_INTERNAL_REF(cq, "bind");
318 : }
319 2704535 : call->parent = parent_call;
320 2704535 : call->is_client = server_transport_data == NULL;
/* every ioreq op starts out available... */
321 32428029 : for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
322 29723494 : call->request_set[i] = REQSET_EMPTY;
323 : }
/* ...except that a client call can never start the trailing-metadata and
   status send ops, so they are marked as already done */
324 2704535 : if (call->is_client) {
325 1403299 : call->request_set[GRPC_IOREQ_SEND_TRAILING_METADATA] = REQSET_DONE;
326 1403299 : call->request_set[GRPC_IOREQ_SEND_STATUS] = REQSET_DONE;
327 : }
/* NOTE(review): strict '<' means at most MAX_SEND_INITIAL_METADATA_COUNT - 1
   elements are accepted although the array has room for one more; presumably
   a slot is reserved for an element added elsewhere - confirm. */
328 2704535 : GPR_ASSERT(add_initial_metadata_count < MAX_SEND_INITIAL_METADATA_COUNT);
329 5510183 : for (i = 0; i < add_initial_metadata_count; i++) {
330 2805648 : call->send_initial_metadata[i].md = add_initial_metadata[i];
331 : }
332 2704535 : call->send_initial_metadata_count = add_initial_metadata_count;
/* NOTE(review): this stores the caller's deadline before any parent deadline
   is propagated below; only the local variable is tightened afterwards and
   then passed to set_deadline_alarm(). Whether call->send_deadline is
   refreshed depends on set_deadline_alarm(), which is not visible here. */
333 2704535 : call->send_deadline = send_deadline;
334 2704535 : GRPC_CHANNEL_INTERNAL_REF(channel, "call");
335 2705047 : call->metadata_context = grpc_channel_get_metadata_context(channel);
336 2705042 : grpc_sopb_init(&call->send_ops);
337 2704983 : grpc_sopb_init(&call->recv_ops);
338 2704852 : gpr_slice_buffer_init(&call->incoming_message);
339 2704792 : grpc_closure_init(&call->on_done_recv, call_on_done_recv, call);
340 2704755 : grpc_closure_init(&call->on_done_send, call_on_done_send, call);
341 2704477 : grpc_closure_init(&call->on_done_bind, finished_loose_op, call);
342 : /* dropped in destroy and when READ_STATE_STREAM_CLOSED received */
343 2703932 : gpr_ref_init(&call->internal_refcount, 2);
344 : /* server hack: start reads immediately so we can get initial metadata.
345 : TODO(ctiller): figure out a cleaner solution */
346 2703730 : if (!call->is_client) {
347 1301630 : memset(&initial_op, 0, sizeof(initial_op));
348 1301630 : initial_op.recv_ops = &call->recv_ops;
349 1301630 : initial_op.recv_state = &call->recv_state;
350 1301630 : initial_op.on_done_recv = &call->on_done_recv;
351 1301630 : initial_op.context = call->context;
/* mark the receive as in flight and keep the call alive for it */
352 1301630 : call->receiving = 1;
353 1301630 : GRPC_CALL_INTERNAL_REF(call, "receiving");
354 1301853 : initial_op_ptr = &initial_op;
355 : }
/* initial_op_ptr is NULL for client calls, so no op accompanies stack init */
356 2703953 : grpc_call_stack_init(&exec_ctx, channel_stack, server_transport_data,
357 : initial_op_ptr, CALL_STACK_FROM_CALL(call));
358 2703925 : if (parent_call != NULL) {
/* the child keeps its parent alive; only client calls may be children and
   only server calls may be parents */
359 502 : GRPC_CALL_INTERNAL_REF(parent_call, "child");
360 502 : GPR_ASSERT(call->is_client);
361 502 : GPR_ASSERT(!parent_call->is_client);
362 :
363 502 : gpr_mu_lock(&parent_call->mu);
364 :
/* use the earlier of the requested and the parent's deadline, compared in
   the parent's clock type */
365 502 : if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
366 502 : send_deadline = gpr_time_min(
367 : gpr_convert_clock_type(send_deadline,
368 : parent_call->send_deadline.clock_type),
369 : parent_call->send_deadline);
370 : }
371 : /* for now GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT *MUST* be passed with
372 : * GRPC_PROPAGATE_CENSUS_STATS_CONTEXT */
373 : /* TODO(ctiller): This should change to use the appropriate census start_op
374 : * call. */
/* note: both branches assert GRPC_PROPAGATE_CENSUS_STATS_CONTEXT, so that
   bit is required whenever a parent call is given */
375 502 : if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
376 502 : GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
377 502 : grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
378 : parent_call->context[GRPC_CONTEXT_TRACING].value,
379 : NULL);
380 : } else {
381 0 : GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
382 : }
383 502 : if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
384 502 : call->cancellation_is_inherited = 1;
385 : }
386 :
/* link into the parent's circular, doubly linked child list: a first child
   points at itself, later children are inserted just before first_child */
387 502 : if (parent_call->first_child == NULL) {
388 502 : parent_call->first_child = call;
389 502 : call->sibling_next = call->sibling_prev = call;
390 : } else {
391 0 : call->sibling_next = parent_call->first_child;
392 0 : call->sibling_prev = parent_call->first_child->sibling_prev;
393 0 : call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
394 : call;
395 : }
396 :
397 502 : gpr_mu_unlock(&parent_call->mu);
398 : }
/* arm the deadline alarm only for a finite deadline */
399 2703925 : if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
400 : 0) {
401 3160 : set_deadline_alarm(&exec_ctx, call, send_deadline);
402 : }
403 2703513 : grpc_exec_ctx_finish(&exec_ctx);
404 2704697 : return call;
405 : }
406 :
407 1301717 : void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
408 : grpc_completion_queue *cq) {
409 1301717 : lock(call);
410 1301594 : call->cq = cq;
411 1301594 : if (cq) {
412 1301733 : GRPC_CQ_INTERNAL_REF(cq, "bind");
413 : }
414 1301603 : unlock(exec_ctx, call);
415 1301525 : }
416 :
417 0 : grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
418 0 : return call->cq;
419 : }
420 :
421 5644836 : static grpc_cq_completion *allocate_completion(grpc_call *call) {
422 : gpr_uint8 i;
423 5644836 : gpr_mu_lock(&call->completion_mu);
424 13283240 : for (i = 0; i < GPR_ARRAY_SIZE(call->completions); i++) {
425 6641620 : if (call->allocated_completions & (1u << i)) {
426 994996 : continue;
427 : }
428 : /* NB: the following integer arithmetic operation needs to be in its
429 : * expanded form due to the "integral promotion" performed (see section
430 : * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
431 : * is then required to avoid the compiler warning */
432 5646624 : call->allocated_completions =
433 5646624 : (gpr_uint8)(call->allocated_completions | (1u << i));
434 5646624 : gpr_mu_unlock(&call->completion_mu);
435 5646159 : return &call->completions[i];
436 : }
437 0 : GPR_UNREACHABLE_CODE(return NULL);
438 : return NULL;
439 : }
440 :
441 5641180 : static void done_completion(grpc_exec_ctx *exec_ctx, void *call,
442 : grpc_cq_completion *completion) {
443 5641180 : grpc_call *c = call;
444 5641180 : gpr_mu_lock(&c->completion_mu);
445 11290624 : c->allocated_completions &=
446 5645312 : (gpr_uint8) ~(1u << (completion - c->completions));
447 5645312 : gpr_mu_unlock(&c->completion_mu);
448 5643445 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, c, "completion");
449 5644369 : }
450 :
/* Takes one internal reference on call `c`.
   When GRPC_CALL_REF_COUNT_DEBUG is defined the function takes an extra
   `reason` string and logs the refcount transition before incrementing;
   otherwise it only increments. Both variants share the body after #endif. */
451 : #ifdef GRPC_CALL_REF_COUNT_DEBUG
452 : void grpc_call_internal_ref(grpc_call *c, const char *reason) {
453 : gpr_log(GPR_DEBUG, "CALL: ref %p %d -> %d [%s]", c,
454 : c->internal_refcount.count, c->internal_refcount.count + 1, reason);
455 : #else
456 25005364 : void grpc_call_internal_ref(grpc_call *c) {
457 : #endif
458 25005364 : gpr_ref(&c->internal_refcount);
459 25120528 : }
460 :
461 2705085 : static void destroy_call(grpc_exec_ctx *exec_ctx, grpc_call *call) {
462 : size_t i;
463 2705085 : grpc_call *c = call;
464 2705085 : grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
465 2703236 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
466 2705039 : gpr_mu_destroy(&c->mu);
467 2704850 : gpr_mu_destroy(&c->completion_mu);
468 13519042 : for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
469 10815128 : if (c->status[i].details) {
470 385254 : GRPC_MDSTR_UNREF(c->status[i].details);
471 : }
472 : }
473 4006230 : for (i = 0; i < c->owned_metadata_count; i++) {
474 1302197 : GRPC_MDELEM_UNREF(c->owned_metadata[i]);
475 : }
476 2704033 : gpr_free(c->owned_metadata);
477 8109120 : for (i = 0; i < GPR_ARRAY_SIZE(c->buffered_metadata); i++) {
478 5406473 : gpr_free(c->buffered_metadata[i].metadata);
479 : }
480 2703041 : for (i = 0; i < c->send_initial_metadata_count; i++) {
481 394 : GRPC_MDELEM_UNREF(c->send_initial_metadata[i].md);
482 : }
483 8108433 : for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
484 5405566 : if (c->context[i].destroy) {
485 1740 : c->context[i].destroy(c->context[i].value);
486 : }
487 : }
488 2702867 : grpc_sopb_destroy(&c->send_ops);
489 2702723 : grpc_sopb_destroy(&c->recv_ops);
490 2702591 : grpc_bbq_destroy(&c->incoming_queue);
491 2703987 : gpr_slice_buffer_destroy(&c->incoming_message);
492 2703939 : if (c->cq) {
493 2703823 : GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
494 : }
495 2705124 : gpr_free(c);
496 2704632 : }
497 :
/* Drops one internal reference on call `c` and destroys the call when
   gpr_unref() reports that it was the last one.
   When GRPC_CALL_REF_COUNT_DEBUG is defined the function takes an extra
   `reason` string and logs the refcount transition first. Both variants
   share the body after #endif. */
498 : #ifdef GRPC_CALL_REF_COUNT_DEBUG
499 : void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c,
500 : const char *reason) {
501 : gpr_log(GPR_DEBUG, "CALL: unref %p %d -> %d [%s]", c,
502 : c->internal_refcount.count, c->internal_refcount.count - 1, reason);
503 : #else
504 30421284 : void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) {
505 : #endif
506 30421284 : if (gpr_unref(&c->internal_refcount)) {
507 2705089 : destroy_call(exec_ctx, c);
508 : }
509 30583198 : }
510 :
511 2997059 : static void set_status_code(grpc_call *call, status_source source,
512 : gpr_uint32 status) {
513 5994117 : if (call->status[source].is_set) return;
514 :
515 2991661 : call->status[source].is_set = 1;
516 2991661 : call->status[source].code = (grpc_status_code)status;
517 2991661 : call->error_status_set = status != GRPC_STATUS_OK;
518 :
519 2991661 : if (status != GRPC_STATUS_OK && !grpc_bbq_empty(&call->incoming_queue)) {
520 131 : grpc_bbq_flush(&call->incoming_queue);
521 : }
522 : }
523 :
524 2703435 : static void set_compression_algorithm(grpc_call *call,
525 : grpc_compression_algorithm algo) {
526 2703435 : call->compression_algorithm = algo;
527 2703435 : }
528 :
529 4 : grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
530 : grpc_call *call) {
531 : grpc_compression_algorithm algorithm;
532 4 : gpr_mu_lock(&call->mu);
533 4 : algorithm = call->compression_algorithm;
534 4 : gpr_mu_unlock(&call->mu);
535 4 : return algorithm;
536 : }
537 :
538 2703127 : static void set_encodings_accepted_by_peer(
539 : grpc_call *call, const gpr_slice accept_encoding_slice) {
540 : size_t i;
541 : grpc_compression_algorithm algorithm;
542 : gpr_slice_buffer accept_encoding_parts;
543 :
544 2703127 : gpr_slice_buffer_init(&accept_encoding_parts);
545 2703544 : gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
546 :
547 : /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
548 : * zeroes the whole grpc_call */
549 : /* Always support no compression */
550 2700330 : GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
551 8103114 : for (i = 0; i < accept_encoding_parts.count; i++) {
552 5399894 : const gpr_slice *accept_encoding_entry_slice =
553 5399894 : &accept_encoding_parts.slices[i];
554 16200387 : if (grpc_compression_algorithm_parse(
555 5399894 : (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice),
556 10800493 : GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) {
557 5402464 : GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
558 : } else {
559 0 : char *accept_encoding_entry_str =
560 : gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII);
561 0 : gpr_log(GPR_ERROR,
562 : "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
563 : accept_encoding_entry_str);
564 0 : gpr_free(accept_encoding_entry_str);
565 : }
566 : }
567 2703220 : }
568 :
569 1320 : gpr_uint32 grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
570 : gpr_uint32 encodings_accepted_by_peer;
571 1320 : gpr_mu_lock(&call->mu);
572 1320 : encodings_accepted_by_peer = call->encodings_accepted_by_peer;
573 1320 : gpr_mu_unlock(&call->mu);
574 1320 : return encodings_accepted_by_peer;
575 : }
576 :
577 4 : gpr_uint32 grpc_call_test_only_get_message_flags(grpc_call *call) {
578 : gpr_uint32 flags;
579 4 : gpr_mu_lock(&call->mu);
580 4 : flags = call->incoming_message_flags;
581 4 : gpr_mu_unlock(&call->mu);
582 4 : return flags;
583 : }
584 :
585 385293 : static void set_status_details(grpc_call *call, status_source source,
586 : grpc_mdstr *status) {
587 385293 : if (call->status[source].details != NULL) {
588 35 : GRPC_MDSTR_UNREF(call->status[source].details);
589 : }
590 385293 : call->status[source].details = status;
591 385293 : }
592 :
593 124556254 : static int is_op_live(grpc_call *call, grpc_ioreq_op op) {
594 124556254 : gpr_uint8 set = call->request_set[op];
595 : reqinfo_master *master;
596 124556254 : if (set >= GRPC_IOREQ_OP_COUNT) return 0;
597 44509805 : master = &call->masters[set];
598 44509805 : return (master->complete_mask & (1u << op)) == 0;
599 : }
600 :
601 26340839 : static void lock(grpc_call *call) { gpr_mu_lock(&call->mu); }
602 :
603 15974155 : static int need_more_data(grpc_call *call) {
604 15974155 : if (call->read_state == READ_STATE_STREAM_CLOSED) return 0;
605 : /* TODO(ctiller): this needs some serious cleanup */
606 22920025 : return is_op_live(call, GRPC_IOREQ_RECV_INITIAL_METADATA) ||
607 7900690 : (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE) &&
608 6712465 : grpc_bbq_empty(&call->incoming_queue)) ||
609 11044874 : is_op_live(call, GRPC_IOREQ_RECV_TRAILING_METADATA) ||
610 10845861 : is_op_live(call, GRPC_IOREQ_RECV_STATUS) ||
611 10652444 : is_op_live(call, GRPC_IOREQ_RECV_STATUS_DETAILS) ||
612 5336191 : (is_op_live(call, GRPC_IOREQ_RECV_CLOSE) &&
613 5329456 : grpc_bbq_empty(&call->incoming_queue)) ||
614 11780257 : (call->write_state == WRITE_STATE_INITIAL && !call->is_client) ||
615 16082117 : (call->cancel_with_status != GRPC_STATUS_OK) || call->destroy_called;
616 : }
617 :
/* Releases call->mu and performs the work that was queued up under it.

   While still holding the lock this decides what has to happen next and
   gathers it into a single transport op: a pending cancellation, a new
   receive, a new send and the one-time pollset binding. It also takes a
   private snapshot of the completed requests. The mutex is then released,
   the op (if any) is executed and the completion callbacks are run, so no
   callback runs with call->mu held. */
618 26265701 : static void unlock(grpc_exec_ctx *exec_ctx, grpc_call *call) {
619 : grpc_transport_stream_op op;
620 : completed_request completed_requests[GRPC_IOREQ_OP_COUNT];
621 26265701 : int completing_requests = 0;
622 26265701 : int start_op = 0;
623 : int i;
/* upper bound on how many bytes may be requested beyond what is buffered */
624 26265701 : const size_t MAX_RECV_PEEK_AHEAD = 65536;
625 : size_t buffered_bytes;
626 :
627 26265701 : memset(&op, 0, sizeof(op));
628 :
/* move any pending cancellation into the op and clear it on the call */
629 26265701 : op.cancel_with_status = call->cancel_with_status;
630 26265701 : start_op = op.cancel_with_status != GRPC_STATUS_OK;
631 26265701 : call->cancel_with_status = GRPC_STATUS_OK; /* reset */
632 :
/* start a receive only if none is in flight and more data is wanted */
633 26265701 : if (!call->receiving && need_more_data(call)) {
/* mid-message with nothing queued: allow the rest of the current message
   plus the peek-ahead allowance */
634 4133603 : if (grpc_bbq_empty(&call->incoming_queue) && call->reading_message) {
635 3030303 : op.max_recv_bytes = call->incoming_message_length -
636 2020202 : call->incoming_message.length + MAX_RECV_PEEK_AHEAD;
637 : } else {
/* otherwise allow the peek-ahead allowance minus what is already queued,
   never going below zero */
638 3123498 : buffered_bytes = grpc_bbq_bytes(&call->incoming_queue);
639 3123572 : if (buffered_bytes > MAX_RECV_PEEK_AHEAD) {
640 88502 : op.max_recv_bytes = 0;
641 : } else {
642 3035070 : op.max_recv_bytes = MAX_RECV_PEEK_AHEAD - buffered_bytes;
643 : }
644 : }
645 : /* TODO(ctiller): 1024 is basically to cover a bug
646 : I don't understand yet */
647 4133673 : if (op.max_recv_bytes > 1024) {
648 4045062 : op.recv_ops = &call->recv_ops;
649 4045062 : op.recv_state = &call->recv_state;
650 4045062 : op.on_done_recv = &call->on_done_recv;
/* mark the receive as in flight and keep the call alive for it */
651 4045062 : call->receiving = 1;
652 4045062 : GRPC_CALL_INTERNAL_REF(call, "receiving");
653 4046429 : start_op = 1;
654 : }
655 : }
656 :
/* start a send only if none is in flight and fill_send_ops() reports that
   it put something into the op */
657 26258619 : if (!call->sending) {
658 26125853 : if (fill_send_ops(call, &op)) {
659 3006765 : call->sending = 1;
660 3006765 : GRPC_CALL_INTERNAL_REF(call, "sending");
661 3007494 : start_op = 1;
662 : }
663 : }
664 :
/* bind the completion queue's pollset exactly once; a client call waits
   until some other op is being started, a server call binds as soon as a
   queue is set */
665 26290565 : if (!call->bound_pollset && call->cq && (!call->is_client || start_op)) {
666 2595821 : call->bound_pollset = 1;
667 2595821 : op.bind_pollset = grpc_cq_pollset(call->cq);
668 2703657 : start_op = 1;
669 : }
670 :
/* snapshot the completed requests so the callbacks can run unlocked; the
   `completing` flag ensures only one unlock() at a time drains the list */
671 26398401 : if (!call->completing && call->num_completed_requests != 0) {
672 6921240 : completing_requests = call->num_completed_requests;
673 6921240 : memcpy(completed_requests, call->completed_requests,
674 : sizeof(completed_requests));
675 6921240 : call->num_completed_requests = 0;
676 6921240 : call->completing = 1;
677 6921240 : GRPC_CALL_INTERNAL_REF(call, "completing");
678 : }
679 :
680 26406718 : gpr_mu_unlock(&call->mu);
681 :
/* from here on call->mu is no longer held */
682 26448252 : if (start_op) {
683 8268252 : execute_op(exec_ctx, call, &op);
684 : }
685 :
686 26491981 : if (completing_requests > 0) {
687 13873935 : for (i = 0; i < completing_requests; i++) {
688 6945532 : completed_requests[i].on_complete(exec_ctx, call,
689 : completed_requests[i].success,
690 : completed_requests[i].user_data);
691 : }
/* clear `completing` under the lock and run unlock() again so the checks
   above are repeated, then drop the ref taken for the drain */
692 6928403 : lock(call);
693 6930515 : call->completing = 0;
694 6930515 : unlock(exec_ctx, call);
695 6931324 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completing");
696 : }
697 26492799 : }
698 :
699 2702089 : static void get_final_status(grpc_call *call, grpc_ioreq_data out) {
700 : int i;
701 8575586 : for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
702 8575586 : if (call->status[i].is_set) {
703 2702089 : out.recv_status.set_value(call->status[i].code,
704 : out.recv_status.user_data);
705 5407384 : return;
706 : }
707 : }
708 0 : if (call->is_client) {
709 0 : out.recv_status.set_value(GRPC_STATUS_UNKNOWN, out.recv_status.user_data);
710 : } else {
711 0 : out.recv_status.set_value(GRPC_STATUS_OK, out.recv_status.user_data);
712 : }
713 : }
714 :
715 1402818 : static void get_final_details(grpc_call *call, grpc_ioreq_data out) {
716 : int i;
717 4207386 : for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
718 4207345 : if (call->status[i].is_set) {
719 1402777 : if (call->status[i].details) {
720 103885 : gpr_slice details = call->status[i].details->slice;
721 103885 : size_t len = GPR_SLICE_LENGTH(details);
722 103885 : if (len + 1 > *out.recv_status_details.details_capacity) {
723 2886 : *out.recv_status_details.details_capacity = GPR_MAX(
724 : len + 1, *out.recv_status_details.details_capacity * 3 / 2);
725 5831 : *out.recv_status_details.details =
726 2886 : gpr_realloc(*out.recv_status_details.details,
727 2886 : *out.recv_status_details.details_capacity);
728 : }
729 103944 : memcpy(*out.recv_status_details.details, GPR_SLICE_START_PTR(details),
730 : len);
731 103944 : (*out.recv_status_details.details)[len] = 0;
732 : } else {
733 1298892 : goto no_details;
734 : }
735 1507045 : return;
736 : }
737 : }
738 :
739 : no_details:
740 1298933 : if (0 == *out.recv_status_details.details_capacity) {
741 1298927 : *out.recv_status_details.details_capacity = 8;
742 2598078 : *out.recv_status_details.details =
743 1298927 : gpr_malloc(*out.recv_status_details.details_capacity);
744 : }
745 1299157 : **out.recv_status_details.details = 0;
746 : }
747 :
/* Record completion of `op`, which the caller has already established is live
 * (call->request_set[op] indexes an entry in call->masters).
 *
 * The op's bit is added to its master's complete_mask; a zero `success` marks
 * the whole master as failed. Once complete_mask equals need_mask, every op
 * slot that belongs to this master is retired, per-op results are written to
 * the destinations stored in call->request_data, and a completed_request
 * record is appended to call->completed_requests. */
static void finish_live_ioreq_op(grpc_call *call, grpc_ioreq_op op,
                                 int success) {
  completed_request *cr;
  gpr_uint8 master_set = call->request_set[op];
  reqinfo_master *master;
  size_t i;
  /* ioreq is live: we need to do something */
  master = &call->masters[master_set];
  /* NB: the following integer arithmetic operation needs to be in its
   * expanded form due to the "integral promotion" performed (see section
   * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
   * is then required to avoid the compiler warning */
  master->complete_mask = (gpr_uint16)(master->complete_mask | (1u << op));
  if (!success) {
    master->success = 0;
  }
  /* all ops required by this master have reported in: retire the set */
  if (master->complete_mask == master->need_mask) {
    for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
      /* only touch op slots owned by the master being retired */
      if (call->request_set[i] != master_set) {
        continue;
      }
      call->request_set[i] = REQSET_DONE;
      switch ((grpc_ioreq_op)i) {
        case GRPC_IOREQ_RECV_MESSAGE:
        case GRPC_IOREQ_SEND_MESSAGE:
          /* message ops go back to REQSET_EMPTY instead of staying
           * REQSET_DONE (presumably so they can be requested again) */
          call->request_set[i] = REQSET_EMPTY;
          /* NOTE(review): both message cases share this body, so a failed
           * master containing only RECV_MESSAGE also closes writes -
           * confirm that is intended */
          if (!master->success) {
            call->write_state = WRITE_STATE_WRITE_CLOSED;
          }
          break;
        case GRPC_IOREQ_SEND_STATUS:
          /* drop the details string if it is still attached to the request
           * (fill_send_ops clears this field when it consumes the string) */
          if (call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details !=
              NULL) {
            GRPC_MDSTR_UNREF(
                call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details);
            call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
                NULL;
          }
          break;
        case GRPC_IOREQ_RECV_CLOSE:
        case GRPC_IOREQ_SEND_INITIAL_METADATA:
        case GRPC_IOREQ_SEND_TRAILING_METADATA:
        case GRPC_IOREQ_SEND_CLOSE:
          /* nothing to publish for these ops */
          break;
        case GRPC_IOREQ_RECV_STATUS:
          get_final_status(call, call->request_data[GRPC_IOREQ_RECV_STATUS]);
          break;
        case GRPC_IOREQ_RECV_STATUS_DETAILS:
          get_final_details(call,
                            call->request_data[GRPC_IOREQ_RECV_STATUS_DETAILS]);
          break;
        case GRPC_IOREQ_RECV_INITIAL_METADATA:
          /* exchange the buffered initial metadata (slot 0) with the array
           * supplied in the request */
          GPR_SWAP(grpc_metadata_array, call->buffered_metadata[0],
                   *call->request_data[GRPC_IOREQ_RECV_INITIAL_METADATA]
                        .recv_metadata);
          break;
        case GRPC_IOREQ_RECV_TRAILING_METADATA:
          /* same exchange for the trailing metadata (slot 1) */
          GPR_SWAP(grpc_metadata_array, call->buffered_metadata[1],
                   *call->request_data[GRPC_IOREQ_RECV_TRAILING_METADATA]
                        .recv_metadata);
          break;
        case GRPC_IOREQ_OP_COUNT:
          /* not a real op: reaching here is a programming error */
          abort();
          break;
      }
    }
    /* append a completion record carrying the master's outcome and callback */
    cr = &call->completed_requests[call->num_completed_requests++];
    cr->success = master->success;
    cr->on_complete = master->on_complete;
    cr->user_data = master->user_data;
  }
}
820 :
821 57977240 : static void finish_ioreq_op(grpc_call *call, grpc_ioreq_op op, int success) {
822 57977240 : if (is_op_live(call, op)) {
823 21813360 : finish_live_ioreq_op(call, op, success);
824 : }
825 57947537 : }
826 :
827 6918375 : static void early_out_write_ops(grpc_call *call) {
828 6918375 : switch (call->write_state) {
829 : case WRITE_STATE_WRITE_CLOSED:
830 1088267 : finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, 0);
831 1088247 : finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, 0);
832 1088143 : finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, 0);
833 1087991 : finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
834 : /* fallthrough */
835 : case WRITE_STATE_STARTED:
836 1589614 : finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, 0);
837 : /* fallthrough */
838 : case WRITE_STATE_INITIAL:
839 : /* do nothing */
840 6937384 : break;
841 : }
842 6918062 : }
843 :
844 3006948 : static void call_on_done_send(grpc_exec_ctx *exec_ctx, void *pc, int success) {
845 3006948 : grpc_call *call = pc;
846 3006948 : lock(call);
847 3007292 : if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_INITIAL_METADATA)) {
848 2704589 : finish_ioreq_op(call, GRPC_IOREQ_SEND_INITIAL_METADATA, success);
849 2704510 : call->write_state = WRITE_STATE_STARTED;
850 : }
851 3007213 : if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_MESSAGE)) {
852 2999048 : finish_ioreq_op(call, GRPC_IOREQ_SEND_MESSAGE, success);
853 : }
854 3006586 : if (call->last_send_contains & (1 << GRPC_IOREQ_SEND_CLOSE)) {
855 2703420 : finish_ioreq_op(call, GRPC_IOREQ_SEND_TRAILING_METADATA, success);
856 2702588 : finish_ioreq_op(call, GRPC_IOREQ_SEND_STATUS, success);
857 2702046 : finish_ioreq_op(call, GRPC_IOREQ_SEND_CLOSE, 1);
858 2700937 : call->write_state = WRITE_STATE_WRITE_CLOSED;
859 : }
860 3004103 : if (!success) {
861 282 : call->write_state = WRITE_STATE_WRITE_CLOSED;
862 282 : early_out_write_ops(call);
863 : }
864 3004103 : call->send_ops.nops = 0;
865 3004103 : call->last_send_contains = 0;
866 3004103 : call->sending = 0;
867 3004103 : unlock(exec_ctx, call);
868 3007430 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "sending");
869 3007046 : }
870 :
871 3001213 : static void finish_message(grpc_call *call) {
872 3001213 : if (call->error_status_set == 0) {
873 : /* TODO(ctiller): this could be a lot faster if coded directly */
874 : grpc_byte_buffer *byte_buffer;
875 : /* some aliases for readability */
876 3001226 : gpr_slice *slices = call->incoming_message.slices;
877 3001226 : const size_t nslices = call->incoming_message.count;
878 :
879 3001292 : if ((call->incoming_message_flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
880 66 : (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
881 66 : byte_buffer = grpc_raw_compressed_byte_buffer_create(
882 : slices, nslices, call->compression_algorithm);
883 : } else {
884 3001160 : byte_buffer = grpc_raw_byte_buffer_create(slices, nslices);
885 : }
886 3000079 : grpc_bbq_push(&call->incoming_queue, byte_buffer);
887 : }
888 3000705 : gpr_slice_buffer_reset_and_unref(&call->incoming_message);
889 3000676 : GPR_ASSERT(call->incoming_message.count == 0);
890 3000676 : call->reading_message = 0;
891 3000676 : }
892 :
893 3001131 : static int begin_message(grpc_call *call, grpc_begin_message msg) {
894 : /* can't begin a message when we're still reading a message */
895 3001131 : if (call->reading_message) {
896 0 : char *message = NULL;
897 0 : gpr_asprintf(
898 : &message, "Message terminated early; read %d bytes, expected %d",
899 0 : (int)call->incoming_message.length, (int)call->incoming_message_length);
900 0 : cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
901 0 : gpr_free(message);
902 0 : return 0;
903 : }
904 : /* sanity check: if message flags indicate a compressed message, the
905 : * compression level should already be present in the call, as parsed off its
906 : * corresponding metadata. */
907 3001210 : if ((msg.flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
908 79 : (call->compression_algorithm == GRPC_COMPRESS_NONE)) {
909 13 : char *message = NULL;
910 : char *alg_name;
911 13 : if (!grpc_compression_algorithm_name(call->compression_algorithm,
912 : &alg_name)) {
913 : /* This shouldn't happen, other than due to data corruption */
914 0 : alg_name = "<unknown>";
915 : }
916 13 : gpr_asprintf(&message,
917 : "Invalid compression algorithm (%s) for compressed message.",
918 : alg_name);
919 13 : cancel_with_status(call, GRPC_STATUS_INTERNAL, message);
920 13 : gpr_free(message);
921 13 : return 0;
922 : }
923 : /* stash away parameters, and prepare for incoming slices */
924 3001118 : if (msg.length > grpc_channel_get_max_message_length(call->channel)) {
925 25 : char *message = NULL;
926 25 : gpr_asprintf(
927 : &message,
928 : "Maximum message length of %d exceeded by a message of length %d",
929 : grpc_channel_get_max_message_length(call->channel), msg.length);
930 25 : cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
931 25 : gpr_free(message);
932 25 : return 0;
933 3000877 : } else if (msg.length > 0) {
934 3000871 : call->reading_message = 1;
935 3000871 : call->incoming_message_length = msg.length;
936 3000871 : call->incoming_message_flags = msg.flags;
937 3000871 : return 1;
938 : } else {
939 6 : finish_message(call);
940 6 : return 1;
941 : }
942 : }
943 :
944 7042052 : static int add_slice_to_message(grpc_call *call, gpr_slice slice) {
945 7042052 : if (GPR_SLICE_LENGTH(slice) == 0) {
946 6 : gpr_slice_unref(slice);
947 6 : return 1;
948 : }
949 : /* we have to be reading a message to know what to do here */
950 7042046 : if (!call->reading_message) {
951 1 : gpr_slice_unref(slice);
952 1 : cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT,
953 : "Received payload data while not reading a message");
954 1 : return 0;
955 : }
956 : /* append the slice to the incoming buffer */
957 7042045 : gpr_slice_buffer_add(&call->incoming_message, slice);
958 7041702 : if (call->incoming_message.length > call->incoming_message_length) {
959 : /* if we got too many bytes, complain */
960 0 : char *message = NULL;
961 0 : gpr_asprintf(
962 : &message, "Receiving message overflow; read %d bytes, expected %d",
963 0 : (int)call->incoming_message.length, (int)call->incoming_message_length);
964 0 : cancel_with_status(call, GRPC_STATUS_INVALID_ARGUMENT, message);
965 0 : gpr_free(message);
966 0 : return 0;
967 7041702 : } else if (call->incoming_message.length == call->incoming_message_length) {
968 3000850 : finish_message(call);
969 3000674 : return 1;
970 : } else {
971 4040852 : return 1;
972 : }
973 : }
974 :
/* Callback run when a batch of received stream ops is available.
 * `pc` is the grpc_call; `success` reports whether the receive succeeded.
 *
 * On success the ops in call->recv_ops are processed in order (metadata,
 * begin-message, payload slices), the read state is advanced according to
 * call->recv_state, cancellation is propagated to inheriting children when the
 * stream has closed, and any read ops that can now complete are finished.
 * On failure every receive-side op is completed with success == 0.
 * In both cases the recv op buffer is emptied and the "receiving" ref is
 * dropped. */
static void call_on_done_recv(grpc_exec_ctx *exec_ctx, void *pc, int success) {
  grpc_call *call = pc;
  grpc_call *child_call;
  grpc_call *next_child_call;
  size_t i;
  GRPC_TIMER_BEGIN(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
  lock(call);
  call->receiving = 0;
  if (success) {
    /* process ops in order; stop at the first op that gets rejected */
    for (i = 0; success && i < call->recv_ops.nops; i++) {
      grpc_stream_op *op = &call->recv_ops.ops[i];
      switch (op->type) {
        case GRPC_NO_OP:
          break;
        case GRPC_OP_METADATA:
          recv_metadata(exec_ctx, call, &op->data.metadata);
          break;
        case GRPC_OP_BEGIN_MESSAGE:
          success = begin_message(call, op->data.begin_message);
          break;
        case GRPC_OP_SLICE:
          success = add_slice_to_message(call, op->data.slice);
          break;
      }
    }
    if (!success) {
      /* i has already stepped past the rejected op, so this releases only the
       * ops that were never looked at */
      grpc_stream_ops_unref_owned_objects(&call->recv_ops.ops[i],
                                          call->recv_ops.nops - i);
    }
    if (call->recv_state == GRPC_STREAM_RECV_CLOSED) {
      /* the read state may only move forward */
      GPR_ASSERT(call->read_state <= READ_STATE_READ_CLOSED);
      call->read_state = READ_STATE_READ_CLOSED;
    }
    if (call->recv_state == GRPC_STREAM_CLOSED) {
      GPR_ASSERT(call->read_state <= READ_STATE_STREAM_CLOSED);
      call->read_state = READ_STATE_STREAM_CLOSED;
      if (call->have_alarm) {
        grpc_alarm_cancel(exec_ctx, &call->alarm);
      }
      /* propagate cancellation to any interested children */
      child_call = call->first_child;
      if (child_call != NULL) {
        /* children form a ring through sibling_next; walk it exactly once */
        do {
          /* fetch the successor before cancelling the current child */
          next_child_call = child_call->sibling_next;
          if (child_call->cancellation_is_inherited) {
            /* hold a ref on the child for the duration of the cancel */
            GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
            grpc_call_cancel(child_call, NULL);
            GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
          }
          child_call = next_child_call;
        } while (child_call != call->first_child);
      }
      GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "closed");
    }
    finish_read_ops(call);
  } else {
    /* the receive itself failed: fail every receive-side op */
    finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 0);
    finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 0);
    finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 0);
    finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 0);
    finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 0);
    finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 0);
  }
  call->recv_ops.nops = 0;
  unlock(exec_ctx, call);

  GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "receiving");
  GRPC_TIMER_END(GRPC_PTAG_CALL_ON_DONE_RECV, 0);
}
1044 :
1045 4004110 : static int prepare_application_metadata(grpc_call *call, size_t count,
1046 : grpc_metadata *metadata) {
1047 : size_t i;
1048 4005303 : for (i = 0; i < count; i++) {
1049 1193 : grpc_metadata *md = &metadata[i];
1050 1193 : grpc_metadata *next_md = (i == count - 1) ? NULL : &metadata[i + 1];
1051 1193 : grpc_metadata *prev_md = (i == 0) ? NULL : &metadata[i - 1];
1052 1193 : grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
1053 : GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
1054 2386 : l->md = grpc_mdelem_from_string_and_buffer(call->metadata_context, md->key,
1055 1193 : (const gpr_uint8 *)md->value,
1056 : md->value_length);
1057 1193 : if (!grpc_mdstr_is_legal_header(l->md->key)) {
1058 0 : gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
1059 0 : grpc_mdstr_as_c_string(l->md->key));
1060 0 : return 0;
1061 2276 : } else if (!grpc_mdstr_is_bin_suffixed(l->md->key) &&
1062 1083 : !grpc_mdstr_is_legal_nonbin_header(l->md->value)) {
1063 0 : gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
1064 0 : return 0;
1065 : }
1066 1193 : l->next = next_md ? (grpc_linked_mdelem *)&next_md->internal_data : NULL;
1067 1193 : l->prev = prev_md ? (grpc_linked_mdelem *)&prev_md->internal_data : NULL;
1068 : }
1069 4004110 : return 1;
1070 : }
1071 :
1072 4003089 : static grpc_mdelem_list chain_metadata_from_app(grpc_call *call, size_t count,
1073 : grpc_metadata *metadata) {
1074 : grpc_mdelem_list out;
1075 4003089 : if (count == 0) {
1076 4002114 : out.head = out.tail = NULL;
1077 4002114 : return out;
1078 : }
1079 975 : out.head = (grpc_linked_mdelem *)&(metadata[0].internal_data);
1080 975 : out.tail = (grpc_linked_mdelem *)&(metadata[count - 1].internal_data);
1081 975 : return out;
1082 : }
1083 :
1084 : /* Copy the contents of a byte buffer into stream ops */
1085 2998777 : static void copy_byte_buffer_to_stream_ops(grpc_byte_buffer *byte_buffer,
1086 : grpc_stream_op_buffer *sopb) {
1087 : size_t i;
1088 :
1089 2998777 : switch (byte_buffer->type) {
1090 : case GRPC_BB_RAW:
1091 6000033 : for (i = 0; i < byte_buffer->data.raw.slice_buffer.count; i++) {
1092 3000640 : gpr_slice slice = byte_buffer->data.raw.slice_buffer.slices[i];
1093 3000640 : gpr_slice_ref(slice);
1094 3000878 : grpc_sopb_add_slice(sopb, slice);
1095 : }
1096 2999393 : break;
1097 : }
1098 2998009 : }
1099 :
/* Gather whatever send-side work is currently possible into call->send_ops
 * and attach it to the transport op `op`.
 *
 * Depending on the write state this may add initial metadata, one message
 * and/or the closing batch (on the server: trailing metadata plus status).
 * Every op that was included is recorded as a bit in call->last_send_contains.
 * Returns non-zero iff something was queued, in which case op->send_ops and
 * op->on_done_send are set. */
static int fill_send_ops(grpc_call *call, grpc_transport_stream_op *op) {
  grpc_ioreq_data data;
  gpr_uint32 flags;
  grpc_metadata_batch mdb;
  size_t i;
  GPR_ASSERT(op->send_ops == NULL);

  switch (call->write_state) {
    case WRITE_STATE_INITIAL:
      /* nothing can be sent before initial metadata has been requested */
      if (!is_op_live(call, GRPC_IOREQ_SEND_INITIAL_METADATA)) {
        break;
      }
      data = call->request_data[GRPC_IOREQ_SEND_INITIAL_METADATA];
      mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
                                         data.send_metadata.metadata);
      mdb.garbage.head = mdb.garbage.tail = NULL;
      mdb.deadline = call->send_deadline;
      /* put the call's own initial metadata elements in front of the
       * application's list */
      for (i = 0; i < call->send_initial_metadata_count; i++) {
        grpc_metadata_batch_link_head(&mdb, &call->send_initial_metadata[i]);
      }
      grpc_sopb_add_metadata(&call->send_ops, mdb);
      op->send_ops = &call->send_ops;
      call->last_send_contains |= 1 << GRPC_IOREQ_SEND_INITIAL_METADATA;
      call->send_initial_metadata_count = 0;
    /* fall through intended */
    case WRITE_STATE_STARTED:
      if (is_op_live(call, GRPC_IOREQ_SEND_MESSAGE)) {
        size_t length;
        data = call->request_data[GRPC_IOREQ_SEND_MESSAGE];
        flags = call->request_flags[GRPC_IOREQ_SEND_MESSAGE];
        length = grpc_byte_buffer_length(data.send_message);
        /* the begin-message op carries the length as a 32-bit value */
        GPR_ASSERT(length <= GPR_UINT32_MAX);
        grpc_sopb_add_begin_message(&call->send_ops, (gpr_uint32)length, flags);
        copy_byte_buffer_to_stream_ops(data.send_message, &call->send_ops);
        op->send_ops = &call->send_ops;
        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_MESSAGE;
      }
      if (is_op_live(call, GRPC_IOREQ_SEND_CLOSE)) {
        op->is_last_send = 1;
        op->send_ops = &call->send_ops;
        call->last_send_contains |= 1 << GRPC_IOREQ_SEND_CLOSE;
        /* only the server side attaches trailing metadata and status */
        if (!call->is_client) {
          /* send trailing metadata */
          data = call->request_data[GRPC_IOREQ_SEND_TRAILING_METADATA];
          mdb.list = chain_metadata_from_app(call, data.send_metadata.count,
                                             data.send_metadata.metadata);
          mdb.garbage.head = mdb.garbage.tail = NULL;
          mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
          /* send status */
          /* TODO(ctiller): cache common status values */
          data = call->request_data[GRPC_IOREQ_SEND_STATUS];
          grpc_metadata_batch_add_tail(
              &mdb, &call->status_link,
              grpc_channel_get_reffed_status_elem(call->channel,
                                                  data.send_status.code));
          if (data.send_status.details) {
            grpc_metadata_batch_add_tail(
                &mdb, &call->details_link,
                grpc_mdelem_from_metadata_strings(
                    call->metadata_context,
                    GRPC_MDSTR_REF(
                        grpc_channel_get_message_string(call->channel)),
                    data.send_status.details));
            /* the details string now belongs to the metadata element; clear
             * the request copy so it is not unref'd again when the op
             * completes */
            call->request_data[GRPC_IOREQ_SEND_STATUS].send_status.details =
                NULL;
          }
          grpc_sopb_add_metadata(&call->send_ops, mdb);
        }
      }
      break;
    case WRITE_STATE_WRITE_CLOSED:
      /* writes are closed: nothing more may be sent */
      break;
  }
  if (op->send_ops) {
    op->on_done_send = &call->on_done_send;
  }
  return op->send_ops != NULL;
}
1178 :
1179 0 : static grpc_call_error start_ioreq_error(grpc_call *call,
1180 : gpr_uint32 mutated_ops,
1181 : grpc_call_error ret) {
1182 : size_t i;
1183 0 : for (i = 0; i < GRPC_IOREQ_OP_COUNT; i++) {
1184 0 : if (mutated_ops & (1u << i)) {
1185 0 : call->request_set[i] = REQSET_EMPTY;
1186 : }
1187 : }
1188 0 : return ret;
1189 : }
1190 :
1191 12257604 : static void finish_read_ops(grpc_call *call) {
1192 : int empty;
1193 :
1194 12257604 : if (is_op_live(call, GRPC_IOREQ_RECV_MESSAGE)) {
1195 6694528 : empty =
1196 13389370 : (NULL == (*call->request_data[GRPC_IOREQ_RECV_MESSAGE].recv_message =
1197 6694842 : grpc_bbq_pop(&call->incoming_queue)));
1198 6694528 : if (!empty) {
1199 2997897 : finish_live_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
1200 2998466 : empty = grpc_bbq_empty(&call->incoming_queue);
1201 : }
1202 : } else {
1203 5592939 : empty = grpc_bbq_empty(&call->incoming_queue);
1204 : }
1205 :
1206 12270398 : switch (call->read_state) {
1207 : case READ_STATE_STREAM_CLOSED:
1208 2707484 : if (empty && !call->have_alarm) {
1209 2702042 : finish_ioreq_op(call, GRPC_IOREQ_RECV_CLOSE, 1);
1210 : }
1211 : /* fallthrough */
1212 : case READ_STATE_READ_CLOSED:
1213 6882575 : if (empty) {
1214 6607231 : finish_ioreq_op(call, GRPC_IOREQ_RECV_MESSAGE, 1);
1215 : }
1216 6883651 : finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS, 1);
1217 6879849 : finish_ioreq_op(call, GRPC_IOREQ_RECV_STATUS_DETAILS, 1);
1218 6876468 : finish_ioreq_op(call, GRPC_IOREQ_RECV_TRAILING_METADATA, 1);
1219 : /* fallthrough */
1220 : case READ_STATE_GOT_INITIAL_METADATA:
1221 8706685 : finish_ioreq_op(call, GRPC_IOREQ_RECV_INITIAL_METADATA, 1);
1222 : /* fallthrough */
1223 : case READ_STATE_INITIAL:
1224 : /* do nothing */
1225 12267534 : break;
1226 : }
1227 12267534 : }
1228 :
/* Register a batch of `nreqs` io requests on `call` as one request set.
 *
 * All ops in the batch share a single master, identified by the op of the
 * first request; `completion` is invoked with `user_data` once every op in
 * the set has finished. Returns GRPC_CALL_OK on success (also for an empty
 * batch). On error, slots already claimed by this batch are released via
 * start_ioreq_error and one of these codes is returned:
 *   GRPC_CALL_ERROR_TOO_MANY_OPERATIONS - an op is already pending;
 *   GRPC_CALL_ERROR_ALREADY_INVOKED     - an op has already completed;
 *   GRPC_CALL_ERROR_INVALID_METADATA    - send metadata failed validation. */
static grpc_call_error start_ioreq(grpc_call *call, const grpc_ioreq *reqs,
                                   size_t nreqs,
                                   grpc_ioreq_completion_func completion,
                                   void *user_data) {
  size_t i;
  gpr_uint16 have_ops = 0;
  grpc_ioreq_op op;
  reqinfo_master *master;
  grpc_ioreq_data data;
  gpr_uint8 set;

  if (nreqs == 0) {
    return GRPC_CALL_OK;
  }

  /* the first op's value doubles as the id of this request set */
  set = reqs[0].op;

  for (i = 0; i < nreqs; i++) {
    op = reqs[i].op;
    /* a slot value below GRPC_IOREQ_OP_COUNT names a master, i.e. the op is
     * already part of a pending set */
    if (call->request_set[op] < GRPC_IOREQ_OP_COUNT) {
      return start_ioreq_error(call, have_ops,
                               GRPC_CALL_ERROR_TOO_MANY_OPERATIONS);
    } else if (call->request_set[op] == REQSET_DONE) {
      return start_ioreq_error(call, have_ops, GRPC_CALL_ERROR_ALREADY_INVOKED);
    }
    data = reqs[i].data;
    if (op == GRPC_IOREQ_SEND_INITIAL_METADATA ||
        op == GRPC_IOREQ_SEND_TRAILING_METADATA) {
      if (!prepare_application_metadata(call, data.send_metadata.count,
                                        data.send_metadata.metadata)) {
        return start_ioreq_error(call, have_ops,
                                 GRPC_CALL_ERROR_INVALID_METADATA);
      }
    }
    if (op == GRPC_IOREQ_SEND_STATUS) {
      /* record the status being sent as the call's own server-side status */
      set_status_code(call, STATUS_FROM_SERVER_STATUS,
                      (gpr_uint32)reqs[i].data.send_status.code);
      if (reqs[i].data.send_status.details) {
        set_status_details(call, STATUS_FROM_SERVER_STATUS,
                           GRPC_MDSTR_REF(reqs[i].data.send_status.details));
      }
    }
    /* NB: the following integer arithmetic operation needs to be in its
     * expanded form due to the "integral promotion" performed (see section
     * 3.2.1.1 of the C89 draft standard). A cast to the smaller container type
     * is then required to avoid the compiler warning */
    have_ops = (gpr_uint16)(have_ops | (1u << op));

    call->request_data[op] = data;
    call->request_flags[op] = reqs[i].flags;
    call->request_set[op] = set;
  }

  /* initialise the master that tracks completion of the whole set */
  master = &call->masters[set];
  master->success = 1;
  master->need_mask = have_ops;
  master->complete_mask = 0;
  master->on_complete = completion;
  master->user_data = user_data;

  /* some of the new ops may be completable straight away */
  finish_read_ops(call);
  early_out_write_ops(call);

  return GRPC_CALL_OK;
}
1294 :
1295 6940451 : grpc_call_error grpc_call_start_ioreq_and_call_back(
1296 : grpc_exec_ctx *exec_ctx, grpc_call *call, const grpc_ioreq *reqs,
1297 : size_t nreqs, grpc_ioreq_completion_func on_complete, void *user_data) {
1298 : grpc_call_error err;
1299 6940451 : lock(call);
1300 6945719 : err = start_ioreq(call, reqs, nreqs, on_complete, user_data);
1301 6939074 : unlock(exec_ctx, call);
1302 6940686 : return err;
1303 : }
1304 :
1305 2703957 : void grpc_call_destroy(grpc_call *c) {
1306 : int cancel;
1307 2703957 : grpc_call *parent = c->parent;
1308 2703957 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1309 :
1310 2703957 : GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
1311 :
1312 2704334 : if (parent) {
1313 501 : gpr_mu_lock(&parent->mu);
1314 502 : if (c == parent->first_child) {
1315 502 : parent->first_child = c->sibling_next;
1316 502 : if (c == parent->first_child) {
1317 502 : parent->first_child = NULL;
1318 : }
1319 502 : c->sibling_prev->sibling_next = c->sibling_next;
1320 502 : c->sibling_next->sibling_prev = c->sibling_prev;
1321 : }
1322 502 : gpr_mu_unlock(&parent->mu);
1323 502 : GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
1324 : }
1325 :
1326 2704335 : lock(c);
1327 2704762 : GPR_ASSERT(!c->destroy_called);
1328 2704762 : c->destroy_called = 1;
1329 2704762 : if (c->have_alarm) {
1330 221 : grpc_alarm_cancel(&exec_ctx, &c->alarm);
1331 : }
1332 2704762 : cancel = c->read_state != READ_STATE_STREAM_CLOSED;
1333 2704762 : unlock(&exec_ctx, c);
1334 2704986 : if (cancel) grpc_call_cancel(c, NULL);
1335 2704983 : GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
1336 2704492 : grpc_exec_ctx_finish(&exec_ctx);
1337 2704672 : }
1338 :
1339 278732 : grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
1340 278732 : GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
1341 278732 : GPR_ASSERT(!reserved);
1342 278732 : return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled",
1343 : NULL);
1344 : }
1345 :
1346 278733 : grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
1347 : grpc_status_code status,
1348 : const char *description,
1349 : void *reserved) {
1350 : grpc_call_error r;
1351 278733 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1352 278733 : GRPC_API_TRACE(
1353 : "grpc_call_cancel_with_status("
1354 : "c=%p, status=%d, description=%s, reserved=%p)",
1355 : 4, (c, (int)status, description, reserved));
1356 278733 : GPR_ASSERT(reserved == NULL);
1357 278733 : lock(c);
1358 278736 : r = cancel_with_status(c, status, description);
1359 278735 : unlock(&exec_ctx, c);
1360 278737 : grpc_exec_ctx_finish(&exec_ctx);
1361 278738 : return r;
1362 : }
1363 :
1364 279118 : static grpc_call_error cancel_with_status(grpc_call *c, grpc_status_code status,
1365 : const char *description) {
1366 279119 : grpc_mdstr *details =
1367 279118 : description ? grpc_mdstr_from_string(c->metadata_context, description)
1368 558236 : : NULL;
1369 :
1370 279119 : GPR_ASSERT(status != GRPC_STATUS_OK);
1371 :
1372 279119 : set_status_code(c, STATUS_FROM_API_OVERRIDE, (gpr_uint32)status);
1373 279120 : set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
1374 :
1375 279117 : c->cancel_with_status = status;
1376 :
1377 279117 : return GRPC_CALL_OK;
1378 : }
1379 :
1380 2982132 : static void finished_loose_op(grpc_exec_ctx *exec_ctx, void *call,
1381 : int success_ignored) {
1382 2982132 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "loose-op");
1383 2983683 : }
1384 :
/* Heap-allocated state for a loose op that needs its own closure: execute_op
 * allocates one per such op and finished_loose_op_allocated frees it. */
typedef struct {
  /* call on which the "loose-op" ref was taken */
  grpc_call *call;
  /* closure installed as the op's on_consumed callback */
  grpc_closure closure;
} finished_loose_op_allocated_args;
1389 :
1390 278955 : static void finished_loose_op_allocated(grpc_exec_ctx *exec_ctx, void *alloc,
1391 : int success) {
1392 278955 : finished_loose_op_allocated_args *args = alloc;
1393 278955 : finished_loose_op(exec_ctx, args->call, success);
1394 278956 : gpr_free(args);
1395 278956 : }
1396 :
1397 8268023 : static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
1398 : grpc_transport_stream_op *op) {
1399 : grpc_call_element *elem;
1400 :
1401 8268023 : GPR_ASSERT(op->on_consumed == NULL);
1402 8268023 : if (op->cancel_with_status != GRPC_STATUS_OK || op->bind_pollset) {
1403 2982030 : GRPC_CALL_INTERNAL_REF(call, "loose-op");
1404 2983997 : if (op->bind_pollset) {
1405 2705041 : op->on_consumed = &call->on_done_bind;
1406 : } else {
1407 278956 : finished_loose_op_allocated_args *args = gpr_malloc(sizeof(*args));
1408 278956 : args->call = call;
1409 278956 : grpc_closure_init(&args->closure, finished_loose_op_allocated, args);
1410 278956 : op->on_consumed = &args->closure;
1411 : }
1412 : }
1413 :
1414 8269990 : elem = CALL_ELEM_FROM_CALL(call, 0);
1415 8261110 : op->context = call->context;
1416 8261110 : elem->filter->start_transport_stream_op(exec_ctx, elem, op);
1417 8260953 : }
1418 :
1419 1434 : char *grpc_call_get_peer(grpc_call *call) {
1420 1434 : grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
1421 1434 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1422 1434 : char *result = elem->filter->get_peer(&exec_ctx, elem);
1423 1434 : GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
1424 1434 : grpc_exec_ctx_finish(&exec_ctx);
1425 1434 : return result;
1426 : }
1427 :
1428 1302018 : grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
1429 1302018 : return CALL_FROM_TOP_ELEM(elem);
1430 : }
1431 :
1432 5858 : static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) {
1433 5858 : grpc_call *call = arg;
1434 5858 : lock(call);
1435 5858 : call->have_alarm = 0;
1436 5858 : if (success) {
1437 343 : cancel_with_status(call, GRPC_STATUS_DEADLINE_EXCEEDED,
1438 : "Deadline Exceeded");
1439 : }
1440 5858 : finish_read_ops(call);
1441 5858 : unlock(exec_ctx, call);
1442 5858 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
1443 5858 : }
1444 :
1445 5858 : static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
1446 : gpr_timespec deadline) {
1447 5858 : if (call->have_alarm) {
1448 0 : gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
1449 0 : assert(0);
1450 : return;
1451 : }
1452 5858 : GRPC_CALL_INTERNAL_REF(call, "alarm");
1453 5858 : call->have_alarm = 1;
1454 5858 : call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
1455 5858 : grpc_alarm_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
1456 : gpr_now(GPR_CLOCK_MONOTONIC));
1457 5858 : }
1458 :
/* Decoded status codes are cached in metadata user data, which cannot hold a
   zero value. Zero is also the value of the OK status code, so every status
   is shifted by this offset before being stored and shifted back on read. */
#define STATUS_OFFSET 1

/* User-data destructor for cached status values: there is nothing to free,
   the "pointer" is just an encoded integer. */
static void destroy_status(void *ignored) { (void)ignored; }
1464 :
1465 1417621 : static gpr_uint32 decode_status(grpc_mdelem *md) {
1466 : gpr_uint32 status;
1467 1417621 : void *user_data = grpc_mdelem_get_user_data(md, destroy_status);
1468 1417796 : if (user_data) {
1469 1415345 : status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
1470 : } else {
1471 4902 : if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
1472 4902 : GPR_SLICE_LENGTH(md->value->slice),
1473 : &status)) {
1474 0 : status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
1475 : }
1476 2451 : grpc_mdelem_set_user_data(md, destroy_status,
1477 2451 : (void *)(gpr_intptr)(status + STATUS_OFFSET));
1478 : }
1479 1417796 : return status;
1480 : }
1481 :
/* Same trick as for status codes: metadata user data cannot hold a zero
 * (null), and zero is the value that signals "no compression", so cached
 * compression algorithms are stored shifted by this offset. */
#define COMPRESS_OFFSET 1

/* User-data destructor for cached compression values: nothing to free. */
static void destroy_compression(void *ignored) { (void)ignored; }
1486 :
1487 2703026 : static gpr_uint32 decode_compression(grpc_mdelem *md) {
1488 : grpc_compression_algorithm algorithm;
1489 2703026 : void *user_data = grpc_mdelem_get_user_data(md, destroy_compression);
1490 2703467 : if (user_data) {
1491 5401028 : algorithm =
1492 5401028 : ((grpc_compression_algorithm)(gpr_intptr)user_data) - COMPRESS_OFFSET;
1493 : } else {
1494 2953 : const char *md_c_str = grpc_mdstr_as_c_string(md->value);
1495 2953 : if (!grpc_compression_algorithm_parse(md_c_str, strlen(md_c_str),
1496 : &algorithm)) {
1497 0 : gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
1498 0 : assert(0);
1499 : }
1500 2953 : grpc_mdelem_set_user_data(
1501 : md, destroy_compression,
1502 2953 : (void *)(gpr_intptr)(algorithm + COMPRESS_OFFSET));
1503 : }
1504 2703631 : return algorithm;
1505 : }
1506 :
/* Process a batch of received metadata.
 *
 * Elements with keys the call handles itself (status, status message,
 * compression algorithm, accepted encodings) update call state. Every other
 * element is exposed to the application through
 * call->buffered_metadata[is_trailing] and retained in call->owned_metadata.
 * On the server side a finite batch deadline arms the deadline alarm.
 * The first batch moves the read state to READ_STATE_GOT_INITIAL_METADATA;
 * later batches are treated as trailing metadata. Elements that were not
 * retained, plus everything on the garbage list, are unref'd at the end. */
static void recv_metadata(grpc_exec_ctx *exec_ctx, grpc_call *call,
                          grpc_metadata_batch *md) {
  grpc_linked_mdelem *l;
  grpc_metadata_array *dest;
  grpc_metadata *mdusr;
  int is_trailing;
  grpc_mdctx *mdctx = call->metadata_context;

  /* anything arriving after the initial metadata counts as trailing */
  is_trailing = call->read_state >= READ_STATE_GOT_INITIAL_METADATA;
  for (l = md->list.head; l != NULL; l = l->next) {
    grpc_mdelem *mdel = l->md;
    grpc_mdstr *key = mdel->key;
    /* keys are compared by pointer against the channel's key strings */
    if (key == grpc_channel_get_status_string(call->channel)) {
      set_status_code(call, STATUS_FROM_WIRE, decode_status(mdel));
    } else if (key == grpc_channel_get_message_string(call->channel)) {
      set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(mdel->value));
    } else if (key ==
               grpc_channel_get_compression_algorithm_string(call->channel)) {
      set_compression_algorithm(call, decode_compression(mdel));
    } else if (key == grpc_channel_get_encodings_accepted_by_peer_string(
                          call->channel)) {
      set_encodings_accepted_by_peer(call, mdel->value->slice);
    } else {
      /* application-visible metadata: append to the buffered array, growing
       * it by at least 8 entries or doubling, whichever is larger */
      dest = &call->buffered_metadata[is_trailing];
      if (dest->count == dest->capacity) {
        dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
        dest->metadata =
            gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
      }
      mdusr = &dest->metadata[dest->count++];
      /* the user-visible entry takes its key and value pointers from the
       * element's strings */
      mdusr->key = grpc_mdstr_as_c_string(mdel->key);
      mdusr->value = grpc_mdstr_as_c_string(mdel->value);
      mdusr->value_length = GPR_SLICE_LENGTH(mdel->value->slice);
      if (call->owned_metadata_count == call->owned_metadata_capacity) {
        call->owned_metadata_capacity =
            GPR_MAX(call->owned_metadata_capacity + 8,
                    call->owned_metadata_capacity * 2);
        call->owned_metadata =
            gpr_realloc(call->owned_metadata,
                        sizeof(grpc_mdelem *) * call->owned_metadata_capacity);
      }
      /* the call keeps this element's ref; clearing l->md excludes it from
       * the unref pass below */
      call->owned_metadata[call->owned_metadata_count++] = mdel;
      l->md = NULL;
    }
  }
  /* a finite deadline received by a server arms the deadline alarm */
  if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
          0 &&
      !call->is_client) {
    set_deadline_alarm(exec_ctx, call, md->deadline);
  }
  if (!is_trailing) {
    call->read_state = READ_STATE_GOT_INITIAL_METADATA;
  }

  /* release every element that was not handed over to the call */
  grpc_mdctx_lock(mdctx);
  for (l = md->list.head; l; l = l->next) {
    if (l->md) GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
  }
  for (l = md->garbage.head; l; l = l->next) {
    GRPC_MDCTX_LOCKED_MDELEM_UNREF(mdctx, l->md);
  }
  grpc_mdctx_unlock(mdctx);
}
1570 :
1571 1301861 : grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
1572 1301861 : return CALL_STACK_FROM_CALL(call);
1573 : }
1574 :
1575 : /*
1576 : * BATCH API IMPLEMENTATION
1577 : */
1578 :
1579 1402773 : static void set_status_value_directly(grpc_status_code status, void *dest) {
1580 1402773 : *(grpc_status_code *)dest = status;
1581 1402773 : }
1582 :
1583 1301646 : static void set_cancelled_value(grpc_status_code status, void *dest) {
1584 1301646 : *(grpc_status_code *)dest = (status != GRPC_STATUS_OK);
1585 1301646 : }
1586 :
1587 2941877 : static void finish_batch(grpc_exec_ctx *exec_ctx, grpc_call *call, int success,
1588 : void *tag) {
1589 2941877 : grpc_cq_end_op(exec_ctx, call->cq, tag, success, done_completion, call,
1590 : allocate_completion(call));
1591 2942097 : }
1592 :
1593 2704804 : static void finish_batch_with_close(grpc_exec_ctx *exec_ctx, grpc_call *call,
1594 : int success, void *tag) {
1595 2704804 : grpc_cq_end_op(exec_ctx, call->cq, tag, 1, done_completion, call,
1596 : allocate_completion(call));
1597 2703767 : }
1598 :
1599 3000850 : static int are_write_flags_valid(gpr_uint32 flags) {
1600 : /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
1601 3000850 : const gpr_uint32 allowed_write_positions =
1602 : (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
1603 3000850 : const gpr_uint32 invalid_positions = ~allowed_write_positions;
1604 3000850 : return !(flags & invalid_positions);
1605 : }
1606 :
1607 5636116 : grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
1608 : size_t nops, void *tag, void *reserved) {
1609 : grpc_ioreq reqs[GRPC_IOREQ_OP_COUNT];
1610 : size_t in;
1611 : size_t out;
1612 : const grpc_op *op;
1613 : grpc_ioreq *req;
1614 5636116 : void (*finish_func)(grpc_exec_ctx *, grpc_call *, int, void *) = finish_batch;
1615 : grpc_call_error error;
1616 5636116 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1617 :
1618 5636116 : GRPC_API_TRACE(
1619 : "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
1620 : 5, (call, ops, (unsigned long)nops, tag, reserved));
1621 :
1622 5638652 : if (reserved != NULL) {
1623 0 : error = GRPC_CALL_ERROR;
1624 0 : goto done;
1625 : }
1626 :
1627 5638652 : GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, tag);
1628 :
1629 5626296 : if (nops == 0) {
1630 23 : grpc_cq_begin_op(call->cq);
1631 23 : GRPC_CALL_INTERNAL_REF(call, "completion");
1632 23 : grpc_cq_end_op(&exec_ctx, call->cq, tag, 1, done_completion, call,
1633 : allocate_completion(call));
1634 23 : error = GRPC_CALL_OK;
1635 23 : goto done;
1636 : }
1637 :
1638 : /* rewrite batch ops into ioreq ops */
1639 19807898 : for (in = 0, out = 0; in < nops; in++) {
1640 14169142 : op = &ops[in];
1641 14169142 : if (op->reserved != NULL) {
1642 0 : error = GRPC_CALL_ERROR;
1643 0 : goto done;
1644 : }
1645 14182430 : switch (op->op) {
1646 : case GRPC_OP_SEND_INITIAL_METADATA:
1647 : /* Flag validation: currently allow no flags */
1648 2702572 : if (op->flags != 0) {
1649 20 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1650 20 : goto done;
1651 : }
1652 2702552 : req = &reqs[out++];
1653 2702552 : if (out > GRPC_IOREQ_OP_COUNT) {
1654 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1655 0 : goto done;
1656 : }
1657 2702552 : req->op = GRPC_IOREQ_SEND_INITIAL_METADATA;
1658 2702552 : req->data.send_metadata.count = op->data.send_initial_metadata.count;
1659 2702552 : req->data.send_metadata.metadata =
1660 2702552 : op->data.send_initial_metadata.metadata;
1661 2702552 : req->flags = op->flags;
1662 2702552 : break;
1663 : case GRPC_OP_SEND_MESSAGE:
1664 3000562 : if (!are_write_flags_valid(op->flags)) {
1665 20 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1666 20 : goto done;
1667 : }
1668 2999917 : if (op->data.send_message == NULL) {
1669 0 : error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1670 0 : goto done;
1671 : }
1672 2999917 : req = &reqs[out++];
1673 2999917 : if (out > GRPC_IOREQ_OP_COUNT) {
1674 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1675 0 : goto done;
1676 : }
1677 2999917 : req->op = GRPC_IOREQ_SEND_MESSAGE;
1678 2999917 : req->data.send_message = op->data.send_message;
1679 2999917 : req->flags = op->flags;
1680 2999917 : break;
1681 : case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1682 : /* Flag validation: currently allow no flags */
1683 1402671 : if (op->flags != 0) {
1684 20 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1685 20 : goto done;
1686 : }
1687 1402651 : if (!call->is_client) {
1688 0 : error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1689 0 : goto done;
1690 : }
1691 1402651 : req = &reqs[out++];
1692 1402651 : if (out > GRPC_IOREQ_OP_COUNT) {
1693 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1694 0 : goto done;
1695 : }
1696 1402651 : req->op = GRPC_IOREQ_SEND_CLOSE;
1697 1402651 : req->flags = op->flags;
1698 1402651 : break;
1699 : case GRPC_OP_SEND_STATUS_FROM_SERVER:
1700 : /* Flag validation: currently allow no flags */
1701 1301350 : if (op->flags != 0) {
1702 0 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1703 0 : goto done;
1704 : }
1705 1301350 : if (call->is_client) {
1706 0 : error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1707 0 : goto done;
1708 : }
1709 1301350 : req = &reqs[out++];
1710 1301350 : if (out > GRPC_IOREQ_OP_COUNT) {
1711 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1712 0 : goto done;
1713 : }
1714 1301350 : req->op = GRPC_IOREQ_SEND_TRAILING_METADATA;
1715 1301350 : req->flags = op->flags;
1716 1301350 : req->data.send_metadata.count =
1717 1301350 : op->data.send_status_from_server.trailing_metadata_count;
1718 1301350 : req->data.send_metadata.metadata =
1719 1301350 : op->data.send_status_from_server.trailing_metadata;
1720 1301350 : req = &reqs[out++];
1721 1301350 : if (out > GRPC_IOREQ_OP_COUNT) {
1722 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1723 0 : goto done;
1724 : }
1725 1301350 : req->op = GRPC_IOREQ_SEND_STATUS;
1726 1301350 : req->data.send_status.code = op->data.send_status_from_server.status;
1727 1301270 : req->data.send_status.details =
1728 1301350 : op->data.send_status_from_server.status_details != NULL
1729 2431 : ? grpc_mdstr_from_string(
1730 : call->metadata_context,
1731 : op->data.send_status_from_server.status_details)
1732 1303781 : : NULL;
1733 1301270 : req = &reqs[out++];
1734 1301270 : if (out > GRPC_IOREQ_OP_COUNT) {
1735 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1736 0 : goto done;
1737 : }
1738 1301270 : req->op = GRPC_IOREQ_SEND_CLOSE;
1739 1301270 : break;
1740 : case GRPC_OP_RECV_INITIAL_METADATA:
1741 : /* Flag validation: currently allow no flags */
1742 1402567 : if (op->flags != 0) {
1743 20 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1744 20 : goto done;
1745 : }
1746 1402547 : if (!call->is_client) {
1747 0 : error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1748 0 : goto done;
1749 : }
1750 1402547 : req = &reqs[out++];
1751 1402547 : if (out > GRPC_IOREQ_OP_COUNT) {
1752 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1753 0 : goto done;
1754 : }
1755 1402547 : req->op = GRPC_IOREQ_RECV_INITIAL_METADATA;
1756 1402547 : req->data.recv_metadata = op->data.recv_initial_metadata;
1757 1402547 : req->data.recv_metadata->count = 0;
1758 1402547 : req->flags = op->flags;
1759 1402547 : break;
1760 : case GRPC_OP_RECV_MESSAGE:
1761 : /* Flag validation: currently allow no flags */
1762 1702865 : if (op->flags != 0) {
1763 0 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1764 0 : goto done;
1765 : }
1766 1702865 : req = &reqs[out++];
1767 1702865 : if (out > GRPC_IOREQ_OP_COUNT) {
1768 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1769 0 : goto done;
1770 : }
1771 1702865 : req->op = GRPC_IOREQ_RECV_MESSAGE;
1772 1702865 : req->data.recv_message = op->data.recv_message;
1773 1702865 : req->flags = op->flags;
1774 1702865 : break;
1775 : case GRPC_OP_RECV_STATUS_ON_CLIENT:
1776 : /* Flag validation: currently allow no flags */
1777 1402244 : if (op->flags != 0) {
1778 20 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1779 20 : goto done;
1780 : }
1781 1402224 : if (!call->is_client) {
1782 0 : error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1783 0 : goto done;
1784 : }
1785 1402224 : req = &reqs[out++];
1786 1402224 : if (out > GRPC_IOREQ_OP_COUNT) {
1787 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1788 0 : goto done;
1789 : }
1790 1402224 : req->op = GRPC_IOREQ_RECV_STATUS;
1791 1402224 : req->flags = op->flags;
1792 1402224 : req->data.recv_status.set_value = set_status_value_directly;
1793 1402224 : req->data.recv_status.user_data = op->data.recv_status_on_client.status;
1794 1402224 : req = &reqs[out++];
1795 1402224 : if (out > GRPC_IOREQ_OP_COUNT) {
1796 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1797 0 : goto done;
1798 : }
1799 1402224 : req->op = GRPC_IOREQ_RECV_STATUS_DETAILS;
1800 1402224 : req->data.recv_status_details.details =
1801 1402224 : op->data.recv_status_on_client.status_details;
1802 1402224 : req->data.recv_status_details.details_capacity =
1803 1402224 : op->data.recv_status_on_client.status_details_capacity;
1804 1402224 : req = &reqs[out++];
1805 1402224 : if (out > GRPC_IOREQ_OP_COUNT) {
1806 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1807 0 : goto done;
1808 : }
1809 1402224 : req->op = GRPC_IOREQ_RECV_TRAILING_METADATA;
1810 1402224 : req->data.recv_metadata =
1811 1402224 : op->data.recv_status_on_client.trailing_metadata;
1812 1402224 : req->data.recv_metadata->count = 0;
1813 1402224 : req = &reqs[out++];
1814 1402224 : if (out > GRPC_IOREQ_OP_COUNT) {
1815 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1816 0 : goto done;
1817 : }
1818 1402224 : req->op = GRPC_IOREQ_RECV_CLOSE;
1819 1402224 : finish_func = finish_batch_with_close;
1820 1402224 : break;
1821 : case GRPC_OP_RECV_CLOSE_ON_SERVER:
1822 : /* Flag validation: currently allow no flags */
1823 1301306 : if (op->flags != 0) {
1824 0 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1825 0 : goto done;
1826 : }
1827 1301306 : req = &reqs[out++];
1828 1301306 : if (out > GRPC_IOREQ_OP_COUNT) {
1829 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1830 0 : goto done;
1831 : }
1832 1301306 : req->op = GRPC_IOREQ_RECV_STATUS;
1833 1301306 : req->flags = op->flags;
1834 1301306 : req->data.recv_status.set_value = set_cancelled_value;
1835 1301306 : req->data.recv_status.user_data =
1836 1301306 : op->data.recv_close_on_server.cancelled;
1837 1301306 : req = &reqs[out++];
1838 1301306 : if (out > GRPC_IOREQ_OP_COUNT) {
1839 0 : error = GRPC_CALL_ERROR_BATCH_TOO_BIG;
1840 0 : goto done;
1841 : }
1842 1301306 : req->op = GRPC_IOREQ_RECV_CLOSE;
1843 1301306 : finish_func = finish_batch_with_close;
1844 1301306 : break;
1845 : }
1846 : }
1847 :
1848 5638756 : GRPC_CALL_INTERNAL_REF(call, "completion");
1849 5643282 : grpc_cq_begin_op(call->cq);
1850 :
1851 5645403 : error = grpc_call_start_ioreq_and_call_back(&exec_ctx, call, reqs, out,
1852 : finish_func, tag);
1853 : done:
1854 5629918 : grpc_exec_ctx_finish(&exec_ctx);
1855 5643613 : return error;
1856 : }
1857 :
1858 530 : void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
1859 : void *value, void (*destroy)(void *value)) {
1860 530 : if (call->context[elem].destroy) {
1861 0 : call->context[elem].destroy(call->context[elem].value);
1862 : }
1863 530 : call->context[elem].value = value;
1864 530 : call->context[elem].destroy = destroy;
1865 530 : }
1866 :
1867 1299171 : void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
1868 1299171 : return call->context[elem].value;
1869 : }
1870 :
1871 325 : gpr_uint8 grpc_call_is_client(grpc_call *call) { return call->is_client; }
|