Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 : #include <assert.h>
34 : #include <limits.h>
35 : #include <stdio.h>
36 : #include <stdlib.h>
37 : #include <string.h>
38 :
39 : #include <grpc/compression.h>
40 : #include <grpc/support/alloc.h>
41 : #include <grpc/support/log.h>
42 : #include <grpc/support/string_util.h>
43 : #include <grpc/support/useful.h>
44 :
45 : #include "src/core/channel/channel_stack.h"
46 : #include "src/core/compression/algorithm_metadata.h"
47 : #include "src/core/iomgr/timer.h"
48 : #include "src/core/profiling/timers.h"
49 : #include "src/core/support/string.h"
50 : #include "src/core/surface/api_trace.h"
51 : #include "src/core/surface/call.h"
52 : #include "src/core/surface/channel.h"
53 : #include "src/core/surface/completion_queue.h"
54 : #include "src/core/transport/static_metadata.h"
55 :
56 : /** The maximum number of concurrent batches possible.
57 : Based upon the maximum number of individually queueable ops in the batch
58 : api:
59 : - initial metadata send
60 : - message send
61 : - status/close send (depending on client/server)
62 : - initial metadata recv
63 : - message recv
64 : - status/close recv (depending on client/server) */
65 : #define MAX_CONCURRENT_BATCHES 6
66 :
67 : typedef struct {
68 : grpc_ioreq_completion_func on_complete;
69 : void *user_data;
70 : int success;
71 : } completed_request;
72 :
73 : #define MAX_SEND_EXTRA_METADATA_COUNT 3
74 :
75 : /* Status data for a request can come from several sources; this
76 : enumerates them all, and acts as a priority sorting for which
77 : status to return to the application - earlier entries override
78 : later ones */
79 : typedef enum {
80 : /* Status came from the application layer overriding whatever
81 : the wire says */
82 : STATUS_FROM_API_OVERRIDE = 0,
83 : /* Status was created by some internal channel stack operation */
84 : STATUS_FROM_CORE,
85 : /* Status came from 'the wire' - or somewhere below the surface
86 : layer */
87 : STATUS_FROM_WIRE,
88 : /* Status came from the server sending status */
89 : STATUS_FROM_SERVER_STATUS,
90 : STATUS_SOURCE_COUNT
91 : } status_source;
92 :
93 : typedef struct {
94 : gpr_uint8 is_set;
95 : grpc_status_code code;
96 : grpc_mdstr *details;
97 : } received_status;
98 :
99 : /* How far through the GRPC stream have we read? */
100 : typedef enum {
101 : /* We are still waiting for initial metadata to complete */
102 : READ_STATE_INITIAL = 0,
103 : /* We have gotten initial metadata, and are reading either
104 : messages or trailing metadata */
105 : READ_STATE_GOT_INITIAL_METADATA,
106 : /* The stream is closed for reading */
107 : READ_STATE_READ_CLOSED,
108 : /* The stream is closed for reading & writing */
109 : READ_STATE_STREAM_CLOSED
110 : } read_state;
111 :
112 : typedef enum {
113 : WRITE_STATE_INITIAL = 0,
114 : WRITE_STATE_STARTED,
115 : WRITE_STATE_WRITE_CLOSED
116 : } write_state;
117 :
118 : typedef struct batch_control {
119 : grpc_call *call;
120 : grpc_cq_completion cq_completion;
121 : grpc_closure finish_batch;
122 : void *notify_tag;
123 : gpr_refcount steps_to_complete;
124 :
125 : gpr_uint8 send_initial_metadata;
126 : gpr_uint8 send_message;
127 : gpr_uint8 send_final_op;
128 : gpr_uint8 recv_initial_metadata;
129 : gpr_uint8 recv_message;
130 : gpr_uint8 recv_final_op;
131 : gpr_uint8 is_notify_tag_closure;
132 : gpr_uint8 success;
133 : } batch_control;
134 :
135 : struct grpc_call {
136 : grpc_completion_queue *cq;
137 : grpc_channel *channel;
138 : grpc_call *parent;
139 : grpc_call *first_child;
140 : /* TODO(ctiller): share with cq if possible? */
141 : gpr_mu mu;
142 :
143 : /* client or server call */
144 : gpr_uint8 is_client;
145 : /* is the alarm set */
146 : gpr_uint8 have_alarm;
147 : /** has grpc_call_destroy been called */
148 : gpr_uint8 destroy_called;
149 : /** flag indicating that cancellation is inherited */
150 : gpr_uint8 cancellation_is_inherited;
151 : /** bitmask of live batches */
152 : gpr_uint8 used_batches;
153 : /** which ops are in-flight */
154 : gpr_uint8 sent_initial_metadata;
155 : gpr_uint8 sending_message;
156 : gpr_uint8 sent_final_op;
157 : gpr_uint8 received_initial_metadata;
158 : gpr_uint8 receiving_message;
159 : gpr_uint8 received_final_op;
160 :
161 : batch_control active_batches[MAX_CONCURRENT_BATCHES];
162 :
163 : /* first idx: is_receiving, second idx: is_trailing */
164 : grpc_metadata_batch metadata_batch[2][2];
165 :
166 : /* Buffered read metadata waiting to be returned to the application.
167 : Element 0 is initial metadata, element 1 is trailing metadata. */
168 : grpc_metadata_array *buffered_metadata[2];
169 :
170 : /* Received call statuses from various sources */
171 : received_status status[STATUS_SOURCE_COUNT];
172 :
173 : /* Compression algorithm for the call */
174 : grpc_compression_algorithm compression_algorithm;
175 : /* Supported encodings (compression algorithms), a bitset */
176 : gpr_uint32 encodings_accepted_by_peer;
177 :
178 : /* Contexts for various subsystems (security, tracing, ...). */
179 : grpc_call_context_element context[GRPC_CONTEXT_COUNT];
180 :
181 : /* Deadline alarm - if have_alarm is non-zero */
182 : grpc_timer alarm;
183 :
184 : /* for the client, extra metadata is initial metadata; for the
185 : server, it's trailing metadata */
186 : grpc_linked_mdelem send_extra_metadata[MAX_SEND_EXTRA_METADATA_COUNT];
187 : int send_extra_metadata_count;
188 : gpr_timespec send_deadline;
189 :
190 : /** siblings: children of the same parent form a list, and this list is
191 : protected under
192 : parent->mu */
193 : grpc_call *sibling_next;
194 : grpc_call *sibling_prev;
195 :
196 : grpc_slice_buffer_stream sending_stream;
197 : grpc_byte_stream *receiving_stream;
198 : grpc_byte_buffer **receiving_buffer;
199 : gpr_slice receiving_slice;
200 : grpc_closure receiving_slice_ready;
201 : grpc_closure receiving_stream_ready;
202 : gpr_uint32 test_only_last_message_flags;
203 :
204 : union {
205 : struct {
206 : grpc_status_code *status;
207 : char **status_details;
208 : size_t *status_details_capacity;
209 : } client;
210 : struct {
211 : int *cancelled;
212 : } server;
213 : } final_op;
214 : };
215 :
216 : #define CALL_STACK_FROM_CALL(call) ((grpc_call_stack *)((call) + 1))
217 : #define CALL_FROM_CALL_STACK(call_stack) (((grpc_call *)(call_stack)) - 1)
218 : #define CALL_ELEM_FROM_CALL(call, idx) \
219 : grpc_call_stack_element(CALL_STACK_FROM_CALL(call), idx)
220 : #define CALL_FROM_TOP_ELEM(top_elem) \
221 : CALL_FROM_CALL_STACK(grpc_call_stack_from_top_element(top_elem))
222 :
223 : static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
224 : gpr_timespec deadline);
225 : static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
226 : grpc_transport_stream_op *op);
227 : static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
228 : grpc_status_code status,
229 : const char *description);
230 : static void destroy_call(grpc_exec_ctx *exec_ctx, void *call_stack,
231 : int success);
232 : static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
233 : int success);
234 :
235 4437606 : grpc_call *grpc_call_create(grpc_channel *channel, grpc_call *parent_call,
236 : gpr_uint32 propagation_mask,
237 : grpc_completion_queue *cq,
238 : const void *server_transport_data,
239 : grpc_mdelem **add_initial_metadata,
240 : size_t add_initial_metadata_count,
241 : gpr_timespec send_deadline) {
242 : size_t i, j;
243 4437606 : grpc_channel_stack *channel_stack = grpc_channel_get_channel_stack(channel);
244 4440985 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
245 : grpc_call *call;
246 : GPR_TIMER_BEGIN("grpc_call_create", 0);
247 4440985 : call = gpr_malloc(sizeof(grpc_call) + channel_stack->call_stack_size);
248 4442034 : memset(call, 0, sizeof(grpc_call));
249 4442034 : gpr_mu_init(&call->mu);
250 4438801 : call->channel = channel;
251 4438801 : call->cq = cq;
252 4438801 : call->parent = parent_call;
253 4438801 : call->is_client = server_transport_data == NULL;
254 4438801 : if (call->is_client) {
255 2267807 : GPR_ASSERT(add_initial_metadata_count < MAX_SEND_EXTRA_METADATA_COUNT);
256 6812898 : for (i = 0; i < add_initial_metadata_count; i++) {
257 4545202 : call->send_extra_metadata[i].md = add_initial_metadata[i];
258 : }
259 2267807 : call->send_extra_metadata_count = (int)add_initial_metadata_count;
260 : } else {
261 2170994 : GPR_ASSERT(add_initial_metadata_count == 0);
262 2170994 : call->send_extra_metadata_count = 0;
263 : }
264 13322912 : for (i = 0; i < 2; i++) {
265 26643876 : for (j = 0; j < 2; j++) {
266 17759965 : call->metadata_batch[i][j].deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
267 : }
268 : }
269 4442898 : call->send_deadline = send_deadline;
270 4442898 : GRPC_CHANNEL_INTERNAL_REF(channel, "call");
271 : /* initial refcount dropped by grpc_call_destroy */
272 8886808 : grpc_call_stack_init(&exec_ctx, channel_stack, 1, destroy_call, call,
273 4443404 : call->context, server_transport_data,
274 : CALL_STACK_FROM_CALL(call));
275 4442114 : if (cq != NULL) {
276 2272583 : GRPC_CQ_INTERNAL_REF(cq, "bind");
277 2272847 : grpc_call_stack_set_pollset(&exec_ctx, CALL_STACK_FROM_CALL(call),
278 : grpc_cq_pollset(cq));
279 : }
280 4441346 : if (parent_call != NULL) {
281 494 : GRPC_CALL_INTERNAL_REF(parent_call, "child");
282 494 : GPR_ASSERT(call->is_client);
283 494 : GPR_ASSERT(!parent_call->is_client);
284 :
285 494 : gpr_mu_lock(&parent_call->mu);
286 :
287 494 : if (propagation_mask & GRPC_PROPAGATE_DEADLINE) {
288 491 : send_deadline = gpr_time_min(
289 : gpr_convert_clock_type(send_deadline,
290 : parent_call->send_deadline.clock_type),
291 : parent_call->send_deadline);
292 : }
293 : /* for now GRPC_PROPAGATE_TRACING_CONTEXT *MUST* be passed with
294 : * GRPC_PROPAGATE_STATS_CONTEXT */
295 : /* TODO(ctiller): This should change to use the appropriate census start_op
296 : * call. */
297 494 : if (propagation_mask & GRPC_PROPAGATE_CENSUS_TRACING_CONTEXT) {
298 494 : GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
299 494 : grpc_call_context_set(call, GRPC_CONTEXT_TRACING,
300 : parent_call->context[GRPC_CONTEXT_TRACING].value,
301 : NULL);
302 : } else {
303 0 : GPR_ASSERT(propagation_mask & GRPC_PROPAGATE_CENSUS_STATS_CONTEXT);
304 : }
305 494 : if (propagation_mask & GRPC_PROPAGATE_CANCELLATION) {
306 489 : call->cancellation_is_inherited = 1;
307 : }
308 :
309 494 : if (parent_call->first_child == NULL) {
310 494 : parent_call->first_child = call;
311 494 : call->sibling_next = call->sibling_prev = call;
312 : } else {
313 0 : call->sibling_next = parent_call->first_child;
314 0 : call->sibling_prev = parent_call->first_child->sibling_prev;
315 0 : call->sibling_next->sibling_prev = call->sibling_prev->sibling_next =
316 : call;
317 : }
318 :
319 494 : gpr_mu_unlock(&parent_call->mu);
320 : }
321 4441346 : if (gpr_time_cmp(send_deadline, gpr_inf_future(send_deadline.clock_type)) !=
322 : 0) {
323 524827 : set_deadline_alarm(&exec_ctx, call, send_deadline);
324 : }
325 4443987 : grpc_exec_ctx_finish(&exec_ctx);
326 : GPR_TIMER_END("grpc_call_create", 0);
327 4442279 : return call;
328 : }
329 :
330 2170886 : void grpc_call_set_completion_queue(grpc_exec_ctx *exec_ctx, grpc_call *call,
331 : grpc_completion_queue *cq) {
332 2170886 : GPR_ASSERT(cq);
333 2170886 : call->cq = cq;
334 2170886 : GRPC_CQ_INTERNAL_REF(cq, "bind");
335 2170912 : grpc_call_stack_set_pollset(exec_ctx, CALL_STACK_FROM_CALL(call),
336 : grpc_cq_pollset(cq));
337 2170942 : }
338 :
339 0 : grpc_completion_queue *grpc_call_get_completion_queue(grpc_call *call) {
340 0 : return call->cq;
341 : }
342 :
343 : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
344 : void grpc_call_internal_ref(grpc_call *c, const char *reason) {
345 : grpc_call_stack_ref(CALL_STACK_FROM_CALL(c), reason);
346 : }
347 : void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c,
348 : const char *reason) {
349 : grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c), reason);
350 : }
351 : #else
352 16044225 : void grpc_call_internal_ref(grpc_call *c) {
353 16044225 : grpc_call_stack_ref(CALL_STACK_FROM_CALL(c));
354 16110764 : }
355 20554286 : void grpc_call_internal_unref(grpc_exec_ctx *exec_ctx, grpc_call *c) {
356 20554286 : grpc_call_stack_unref(exec_ctx, CALL_STACK_FROM_CALL(c));
357 20545463 : }
358 : #endif
359 :
360 4434014 : static void destroy_call(grpc_exec_ctx *exec_ctx, void *call, int success) {
361 : size_t i;
362 : int ii;
363 4433949 : grpc_call *c = call;
364 : GPR_TIMER_BEGIN("destroy_call", 0);
365 13316678 : for (i = 0; i < 2; i++) {
366 8874719 : grpc_metadata_batch_destroy(
367 : &c->metadata_batch[1 /* is_receiving */][i /* is_initial */]);
368 : }
369 4441959 : if (c->receiving_stream != NULL) {
370 0 : grpc_byte_stream_destroy(c->receiving_stream);
371 : }
372 4441959 : grpc_call_stack_destroy(exec_ctx, CALL_STACK_FROM_CALL(c));
373 4439128 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, c->channel, "call");
374 4441907 : gpr_mu_destroy(&c->mu);
375 22210779 : for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
376 17768569 : if (c->status[i].details) {
377 1150505 : GRPC_MDSTR_UNREF(c->status[i].details);
378 : }
379 : }
380 11678160 : for (ii = 0; ii < c->send_extra_metadata_count; ii++) {
381 7236869 : GRPC_MDELEM_UNREF(c->send_extra_metadata[ii].md);
382 : }
383 13324747 : for (i = 0; i < GRPC_CONTEXT_COUNT; i++) {
384 8883540 : if (c->context[i].destroy) {
385 304561 : c->context[i].destroy(c->context[i].value);
386 : }
387 : }
388 4441272 : if (c->cq) {
389 4441737 : GRPC_CQ_INTERNAL_UNREF(c->cq, "bind");
390 : }
391 4442899 : gpr_free(c);
392 : GPR_TIMER_END("destroy_call", 0);
393 4443193 : }
394 :
395 4440529 : static void set_status_code(grpc_call *call, status_source source,
396 : gpr_uint32 status) {
397 8881301 : if (call->status[source].is_set) return;
398 :
399 4440587 : call->status[source].is_set = 1;
400 4440587 : call->status[source].code = (grpc_status_code)status;
401 :
402 : /* TODO(ctiller): what to do about the flush that was previously here */
403 : }
404 :
405 4336182 : static void set_compression_algorithm(grpc_call *call,
406 : grpc_compression_algorithm algo) {
407 4336345 : call->compression_algorithm = algo;
408 4336182 : }
409 :
410 4 : grpc_compression_algorithm grpc_call_test_only_get_compression_algorithm(
411 : grpc_call *call) {
412 : grpc_compression_algorithm algorithm;
413 4 : gpr_mu_lock(&call->mu);
414 4 : algorithm = call->compression_algorithm;
415 4 : gpr_mu_unlock(&call->mu);
416 4 : return algorithm;
417 : }
418 :
419 4 : gpr_uint32 grpc_call_test_only_get_message_flags(grpc_call *call) {
420 : gpr_uint32 flags;
421 4 : gpr_mu_lock(&call->mu);
422 4 : flags = call->test_only_last_message_flags;
423 4 : gpr_mu_unlock(&call->mu);
424 4 : return flags;
425 : }
426 :
427 3 : static void destroy_encodings_accepted_by_peer(void *p) { return; }
428 :
429 4336277 : static void set_encodings_accepted_by_peer(grpc_call *call, grpc_mdelem *mdel) {
430 : size_t i;
431 : grpc_compression_algorithm algorithm;
432 : gpr_slice_buffer accept_encoding_parts;
433 : gpr_slice accept_encoding_slice;
434 : void *accepted_user_data;
435 :
436 4336277 : accepted_user_data =
437 : grpc_mdelem_get_user_data(mdel, destroy_encodings_accepted_by_peer);
438 4335615 : if (accepted_user_data != NULL) {
439 4335612 : call->encodings_accepted_by_peer =
440 4335612 : (gpr_uint32)(((gpr_uintptr)accepted_user_data) - 1);
441 8671227 : return;
442 : }
443 :
444 3 : accept_encoding_slice = mdel->value->slice;
445 3 : gpr_slice_buffer_init(&accept_encoding_parts);
446 3 : gpr_slice_split(accept_encoding_slice, ",", &accept_encoding_parts);
447 :
448 : /* No need to zero call->encodings_accepted_by_peer: grpc_call_create already
449 : * zeroes the whole grpc_call */
450 : /* Always support no compression */
451 3 : GPR_BITSET(&call->encodings_accepted_by_peer, GRPC_COMPRESS_NONE);
452 12 : for (i = 0; i < accept_encoding_parts.count; i++) {
453 9 : const gpr_slice *accept_encoding_entry_slice =
454 9 : &accept_encoding_parts.slices[i];
455 27 : if (grpc_compression_algorithm_parse(
456 9 : (const char *)GPR_SLICE_START_PTR(*accept_encoding_entry_slice),
457 18 : GPR_SLICE_LENGTH(*accept_encoding_entry_slice), &algorithm)) {
458 9 : GPR_BITSET(&call->encodings_accepted_by_peer, algorithm);
459 : } else {
460 0 : char *accept_encoding_entry_str =
461 : gpr_dump_slice(*accept_encoding_entry_slice, GPR_DUMP_ASCII);
462 0 : gpr_log(GPR_ERROR,
463 : "Invalid entry in accept encoding metadata: '%s'. Ignoring.",
464 : accept_encoding_entry_str);
465 0 : gpr_free(accept_encoding_entry_str);
466 : }
467 : }
468 :
469 3 : gpr_slice_buffer_destroy(&accept_encoding_parts);
470 :
471 3 : grpc_mdelem_set_user_data(
472 : mdel, destroy_encodings_accepted_by_peer,
473 3 : (void *)(((gpr_uintptr)call->encodings_accepted_by_peer) + 1));
474 : }
475 :
476 1452 : gpr_uint32 grpc_call_test_only_get_encodings_accepted_by_peer(grpc_call *call) {
477 : gpr_uint32 encodings_accepted_by_peer;
478 1452 : gpr_mu_lock(&call->mu);
479 1452 : encodings_accepted_by_peer = call->encodings_accepted_by_peer;
480 1452 : gpr_mu_unlock(&call->mu);
481 1452 : return encodings_accepted_by_peer;
482 : }
483 :
484 1150614 : static void set_status_details(grpc_call *call, status_source source,
485 : grpc_mdstr *status) {
486 1150837 : if (call->status[source].details != NULL) {
487 181 : GRPC_MDSTR_UNREF(call->status[source].details);
488 : }
489 1150837 : call->status[source].details = status;
490 1150614 : }
491 :
492 4432483 : static void get_final_status(grpc_call *call,
493 : void (*set_value)(grpc_status_code code,
494 : void *user_data),
495 : void *set_value_user_data) {
496 : int i;
497 8975989 : for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
498 8975988 : if (call->status[i].is_set) {
499 4432482 : set_value(call->status[i].code, set_value_user_data);
500 8874129 : return;
501 : }
502 : }
503 1 : if (call->is_client) {
504 0 : set_value(GRPC_STATUS_UNKNOWN, set_value_user_data);
505 : } else {
506 1 : set_value(GRPC_STATUS_OK, set_value_user_data);
507 : }
508 : }
509 :
510 2272035 : static void get_final_details(grpc_call *call, char **out_details,
511 : size_t *out_details_capacity) {
512 : int i;
513 6813978 : for (i = 0; i < STATUS_SOURCE_COUNT; i++) {
514 6813430 : if (call->status[i].is_set) {
515 2271487 : if (call->status[i].details) {
516 625417 : gpr_slice details = call->status[i].details->slice;
517 625417 : size_t len = GPR_SLICE_LENGTH(details);
518 625417 : if (len + 1 > *out_details_capacity) {
519 524418 : *out_details_capacity =
520 524418 : GPR_MAX(len + 1, *out_details_capacity * 3 / 2);
521 524418 : *out_details = gpr_realloc(*out_details, *out_details_capacity);
522 : }
523 625808 : memcpy(*out_details, GPR_SLICE_START_PTR(details), len);
524 625808 : (*out_details)[len] = 0;
525 : } else {
526 1646070 : goto no_details;
527 : }
528 2898369 : return;
529 : }
530 : }
531 :
532 : no_details:
533 1646618 : if (0 == *out_details_capacity) {
534 1646616 : *out_details_capacity = 8;
535 1646616 : *out_details = gpr_malloc(*out_details_capacity);
536 : }
537 1646753 : **out_details = 0;
538 : }
539 :
540 5737901 : static grpc_linked_mdelem *linked_from_md(grpc_metadata *md) {
541 5738049 : return (grpc_linked_mdelem *)&md->internal_data;
542 : }
543 :
544 6599628 : static int prepare_application_metadata(grpc_call *call, int count,
545 : grpc_metadata *metadata,
546 : int is_trailing,
547 : int prepend_extra_metadata) {
548 : int i;
549 6599377 : grpc_metadata_batch *batch =
550 : &call->metadata_batch[0 /* is_receiving */][is_trailing];
551 6599628 : if (prepend_extra_metadata) {
552 4437291 : if (call->send_extra_metadata_count == 0) {
553 0 : prepend_extra_metadata = 0;
554 : } else {
555 11673996 : for (i = 0; i < call->send_extra_metadata_count; i++) {
556 7232107 : GRPC_MDELEM_REF(call->send_extra_metadata[i].md);
557 : }
558 7238324 : for (i = 1; i < call->send_extra_metadata_count; i++) {
559 2796435 : call->send_extra_metadata[i].prev = &call->send_extra_metadata[i - 1];
560 : }
561 7238484 : for (i = 0; i < call->send_extra_metadata_count - 1; i++) {
562 2796422 : call->send_extra_metadata[i].next = &call->send_extra_metadata[i + 1];
563 : }
564 : }
565 : }
566 8169543 : for (i = 0; i < count; i++) {
567 1565144 : grpc_metadata *md = &metadata[i];
568 1565070 : grpc_linked_mdelem *l = (grpc_linked_mdelem *)&md->internal_data;
569 : GPR_ASSERT(sizeof(grpc_linked_mdelem) == sizeof(md->internal_data));
570 3130288 : l->md = grpc_mdelem_from_string_and_buffer(
571 1565144 : md->key, (const gpr_uint8 *)md->value, md->value_length);
572 1565144 : if (!grpc_mdstr_is_legal_header(l->md->key)) {
573 0 : gpr_log(GPR_ERROR, "attempt to send invalid metadata key: %s",
574 0 : grpc_mdstr_as_c_string(l->md->key));
575 0 : return 0;
576 3130158 : } else if (!grpc_mdstr_is_bin_suffixed(l->md->key) &&
577 1565014 : !grpc_mdstr_is_legal_nonbin_header(l->md->value)) {
578 0 : gpr_log(GPR_ERROR, "attempt to send invalid metadata value");
579 0 : return 0;
580 : }
581 : }
582 7646965 : for (i = 1; i < count; i++) {
583 1042841 : linked_from_md(&metadata[i])->prev = linked_from_md(&metadata[i - 1]);
584 : }
585 7646965 : for (i = 0; i < count - 1; i++) {
586 1042841 : linked_from_md(&metadata[i])->next = linked_from_md(&metadata[i + 1]);
587 : }
588 6604399 : switch (prepend_extra_metadata * 2 + (count != 0)) {
589 : case 0:
590 : /* no prepend, no metadata => nothing to do */
591 2168802 : batch->list.head = batch->list.tail = NULL;
592 2168802 : break;
593 : case 1:
594 : /* metadata, but no prepend */
595 122 : batch->list.head = linked_from_md(&metadata[0]);
596 142 : batch->list.tail = linked_from_md(&metadata[count - 1]);
597 122 : batch->list.head->prev = NULL;
598 122 : batch->list.tail->next = NULL;
599 122 : break;
600 : case 2:
601 : /* prepend, but no md */
602 3919242 : batch->list.head = &call->send_extra_metadata[0];
603 3919242 : batch->list.tail =
604 3919242 : &call->send_extra_metadata[call->send_extra_metadata_count - 1];
605 3919242 : batch->list.head->prev = NULL;
606 3919242 : batch->list.tail->next = NULL;
607 3919242 : break;
608 : case 3:
609 : /* prepend AND md */
610 522205 : batch->list.head = &call->send_extra_metadata[0];
611 1044410 : call->send_extra_metadata[call->send_extra_metadata_count - 1].next =
612 522175 : linked_from_md(&metadata[0]);
613 1044380 : linked_from_md(&metadata[0])->prev =
614 522205 : &call->send_extra_metadata[call->send_extra_metadata_count - 1];
615 522235 : batch->list.tail = linked_from_md(&metadata[count - 1]);
616 522205 : batch->list.head->prev = NULL;
617 522205 : batch->list.tail->next = NULL;
618 522205 : break;
619 : default:
620 0 : GPR_UNREACHABLE_CODE(return 0);
621 : }
622 :
623 6604148 : return 1;
624 : }
625 :
626 4442123 : void grpc_call_destroy(grpc_call *c) {
627 : int cancel;
628 4442123 : grpc_call *parent = c->parent;
629 4442123 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
630 :
631 : GPR_TIMER_BEGIN("grpc_call_destroy", 0);
632 4442123 : GRPC_API_TRACE("grpc_call_destroy(c=%p)", 1, (c));
633 :
634 4442680 : if (parent) {
635 489 : gpr_mu_lock(&parent->mu);
636 489 : if (c == parent->first_child) {
637 489 : parent->first_child = c->sibling_next;
638 489 : if (c == parent->first_child) {
639 489 : parent->first_child = NULL;
640 : }
641 489 : c->sibling_prev->sibling_next = c->sibling_next;
642 489 : c->sibling_next->sibling_prev = c->sibling_prev;
643 : }
644 489 : gpr_mu_unlock(&parent->mu);
645 489 : GRPC_CALL_INTERNAL_UNREF(&exec_ctx, parent, "child");
646 : }
647 :
648 4442680 : gpr_mu_lock(&c->mu);
649 4442726 : GPR_ASSERT(!c->destroy_called);
650 4442726 : c->destroy_called = 1;
651 4442726 : if (c->have_alarm) {
652 965 : grpc_timer_cancel(&exec_ctx, &c->alarm);
653 : }
654 4442726 : cancel = !c->received_final_op;
655 4442726 : gpr_mu_unlock(&c->mu);
656 4442600 : if (cancel) grpc_call_cancel(c, NULL);
657 4442600 : GRPC_CALL_INTERNAL_UNREF(&exec_ctx, c, "destroy");
658 4441261 : grpc_exec_ctx_finish(&exec_ctx);
659 : GPR_TIMER_END("grpc_call_destroy", 0);
660 4441228 : }
661 :
662 1103 : grpc_call_error grpc_call_cancel(grpc_call *call, void *reserved) {
663 1103 : GRPC_API_TRACE("grpc_call_cancel(call=%p, reserved=%p)", 2, (call, reserved));
664 1103 : GPR_ASSERT(!reserved);
665 1103 : return grpc_call_cancel_with_status(call, GRPC_STATUS_CANCELLED, "Cancelled",
666 : NULL);
667 : }
668 :
669 1208 : grpc_call_error grpc_call_cancel_with_status(grpc_call *c,
670 : grpc_status_code status,
671 : const char *description,
672 : void *reserved) {
673 : grpc_call_error r;
674 1208 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
675 1208 : GRPC_API_TRACE(
676 : "grpc_call_cancel_with_status("
677 : "c=%p, status=%d, description=%s, reserved=%p)",
678 : 4, (c, (int)status, description, reserved));
679 1208 : GPR_ASSERT(reserved == NULL);
680 1208 : gpr_mu_lock(&c->mu);
681 1208 : r = cancel_with_status(&exec_ctx, c, status, description);
682 1208 : gpr_mu_unlock(&c->mu);
683 1208 : grpc_exec_ctx_finish(&exec_ctx);
684 1208 : return r;
685 : }
686 :
687 : typedef struct cancel_closure {
688 : grpc_closure closure;
689 : grpc_call *call;
690 : grpc_status_code status;
691 : } cancel_closure;
692 :
693 1745 : static void done_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
694 1681 : cancel_closure *cc = ccp;
695 1745 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, cc->call, "cancel");
696 1745 : gpr_free(cc);
697 1745 : }
698 :
699 1745 : static void send_cancel(grpc_exec_ctx *exec_ctx, void *ccp, int success) {
700 : grpc_transport_stream_op op;
701 1681 : cancel_closure *cc = ccp;
702 1745 : memset(&op, 0, sizeof(op));
703 1745 : op.cancel_with_status = cc->status;
704 : /* reuse closure to catch completion */
705 1745 : grpc_closure_init(&cc->closure, done_cancel, cc);
706 1745 : op.on_complete = &cc->closure;
707 1745 : execute_op(exec_ctx, cc->call, &op);
708 1745 : }
709 :
710 1745 : static grpc_call_error cancel_with_status(grpc_exec_ctx *exec_ctx, grpc_call *c,
711 : grpc_status_code status,
712 : const char *description) {
713 1681 : grpc_mdstr *details =
714 1745 : description ? grpc_mdstr_from_string(description) : NULL;
715 1745 : cancel_closure *cc = gpr_malloc(sizeof(*cc));
716 :
717 1745 : GPR_ASSERT(status != GRPC_STATUS_OK);
718 :
719 1681 : set_status_code(c, STATUS_FROM_API_OVERRIDE, (gpr_uint32)status);
720 1681 : set_status_details(c, STATUS_FROM_API_OVERRIDE, details);
721 :
722 1745 : grpc_closure_init(&cc->closure, send_cancel, cc);
723 1745 : cc->call = c;
724 1745 : cc->status = status;
725 1745 : GRPC_CALL_INTERNAL_REF(c, "cancel");
726 1745 : grpc_exec_ctx_enqueue(exec_ctx, &cc->closure, 1);
727 :
728 1745 : return GRPC_CALL_OK;
729 : }
730 :
731 12365401 : static void execute_op(grpc_exec_ctx *exec_ctx, grpc_call *call,
732 : grpc_transport_stream_op *op) {
733 : grpc_call_element *elem;
734 :
735 : GPR_TIMER_BEGIN("execute_op", 0);
736 12365401 : elem = CALL_ELEM_FROM_CALL(call, 0);
737 12378837 : op->context = call->context;
738 12378837 : elem->filter->start_transport_stream_op(exec_ctx, elem, op);
739 : GPR_TIMER_END("execute_op", 0);
740 12377546 : }
741 :
742 1628 : char *grpc_call_get_peer(grpc_call *call) {
743 1628 : grpc_call_element *elem = CALL_ELEM_FROM_CALL(call, 0);
744 1628 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
745 1628 : char *result = elem->filter->get_peer(&exec_ctx, elem);
746 1628 : GRPC_API_TRACE("grpc_call_get_peer(%p)", 1, (call));
747 1628 : grpc_exec_ctx_finish(&exec_ctx);
748 1628 : return result;
749 : }
750 :
751 2171296 : grpc_call *grpc_call_from_top_element(grpc_call_element *elem) {
752 2171296 : return CALL_FROM_TOP_ELEM(elem);
753 : }
754 :
755 1048944 : static void call_alarm(grpc_exec_ctx *exec_ctx, void *arg, int success) {
756 1048915 : grpc_call *call = arg;
757 1048944 : gpr_mu_lock(&call->mu);
758 1048944 : call->have_alarm = 0;
759 1048944 : if (success) {
760 510 : cancel_with_status(exec_ctx, call, GRPC_STATUS_DEADLINE_EXCEEDED,
761 : "Deadline Exceeded");
762 : }
763 1048944 : gpr_mu_unlock(&call->mu);
764 1048944 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "alarm");
765 1048944 : }
766 :
767 1048944 : static void set_deadline_alarm(grpc_exec_ctx *exec_ctx, grpc_call *call,
768 : gpr_timespec deadline) {
769 1048944 : if (call->have_alarm) {
770 0 : gpr_log(GPR_ERROR, "Attempt to set deadline alarm twice");
771 0 : assert(0);
772 : return;
773 : }
774 1048944 : GRPC_CALL_INTERNAL_REF(call, "alarm");
775 1048944 : call->have_alarm = 1;
776 1048944 : call->send_deadline = gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC);
777 1048944 : grpc_timer_init(exec_ctx, &call->alarm, call->send_deadline, call_alarm, call,
778 : gpr_now(GPR_CLOCK_MONOTONIC));
779 1048944 : }
780 :
781 : /* we offset status by a small amount when storing it into transport metadata
782 : as metadata cannot store a 0 value (which is used as OK for grpc_status_codes
783 : */
784 : #define STATUS_OFFSET 1
785 507 : static void destroy_status(void *ignored) {}
786 :
787 2272412 : static gpr_uint32 decode_status(grpc_mdelem *md) {
788 : gpr_uint32 status;
789 : void *user_data;
790 2272412 : if (md == GRPC_MDELEM_GRPC_STATUS_0) return 0;
791 524017 : if (md == GRPC_MDELEM_GRPC_STATUS_1) return 1;
792 523750 : if (md == GRPC_MDELEM_GRPC_STATUS_2) return 2;
793 523730 : user_data = grpc_mdelem_get_user_data(md, destroy_status);
794 523730 : if (user_data != NULL) {
795 523211 : status = ((gpr_uint32)(gpr_intptr)user_data) - STATUS_OFFSET;
796 : } else {
797 1038 : if (!gpr_parse_bytes_to_uint32(grpc_mdstr_as_c_string(md->value),
798 1038 : GPR_SLICE_LENGTH(md->value->slice),
799 : &status)) {
800 0 : status = GRPC_STATUS_UNKNOWN; /* could not parse status code */
801 : }
802 519 : grpc_mdelem_set_user_data(md, destroy_status,
803 519 : (void *)(gpr_intptr)(status + STATUS_OFFSET));
804 : }
805 523730 : return status;
806 : }
807 :
808 4337341 : static gpr_uint32 decode_compression(grpc_mdelem *md) {
809 4337341 : grpc_compression_algorithm algorithm =
810 4337341 : grpc_compression_algorithm_from_mdstr(md->value);
811 4337219 : if (algorithm == GRPC_COMPRESS_ALGORITHMS_COUNT) {
812 0 : const char *md_c_str = grpc_mdstr_as_c_string(md->value);
813 0 : gpr_log(GPR_ERROR, "Invalid compression algorithm: '%s'", md_c_str);
814 : }
815 4337156 : return algorithm;
816 : }
817 :
818 15294260 : static grpc_mdelem *recv_common_filter(grpc_call *call, grpc_mdelem *elem) {
819 15294260 : if (elem->key == GRPC_MDSTR_GRPC_STATUS) {
820 : GPR_TIMER_BEGIN("status", 0);
821 2272439 : set_status_code(call, STATUS_FROM_WIRE, decode_status(elem));
822 : GPR_TIMER_END("status", 0);
823 2272294 : return NULL;
824 13021821 : } else if (elem->key == GRPC_MDSTR_GRPC_MESSAGE) {
825 : GPR_TIMER_BEGIN("status-details", 0);
826 625106 : set_status_details(call, STATUS_FROM_WIRE, GRPC_MDSTR_REF(elem->value));
827 : GPR_TIMER_END("status-details", 0);
828 625106 : return NULL;
829 : }
830 12396216 : return elem;
831 : }
832 :
833 3735138 : static grpc_mdelem *publish_app_metadata(grpc_call *call, grpc_mdelem *elem,
834 : int is_trailing) {
835 : grpc_metadata_array *dest;
836 : grpc_metadata *mdusr;
837 : GPR_TIMER_BEGIN("publish_app_metadata", 0);
838 3735138 : dest = call->buffered_metadata[is_trailing];
839 3735138 : if (dest->count == dest->capacity) {
840 2170901 : dest->capacity = GPR_MAX(dest->capacity + 8, dest->capacity * 2);
841 2171035 : dest->metadata =
842 2170901 : gpr_realloc(dest->metadata, sizeof(grpc_metadata) * dest->capacity);
843 : }
844 3735272 : mdusr = &dest->metadata[dest->count++];
845 3735272 : mdusr->key = grpc_mdstr_as_c_string(elem->key);
846 3735271 : mdusr->value = grpc_mdstr_as_c_string(elem->value);
847 3735221 : mdusr->value_length = GPR_SLICE_LENGTH(elem->value->slice);
848 : GPR_TIMER_END("publish_app_metadata", 0);
849 3735221 : return elem;
850 : }
851 :
852 12397372 : static grpc_mdelem *recv_initial_filter(void *callp, grpc_mdelem *elem) {
853 12396894 : grpc_call *call = callp;
854 12397372 : elem = recv_common_filter(call, elem);
855 12408708 : if (elem == NULL) {
856 12 : return NULL;
857 12408696 : } else if (elem->key == GRPC_MDSTR_GRPC_ENCODING) {
858 : GPR_TIMER_BEGIN("compression_algorithm", 0);
859 4337349 : set_compression_algorithm(call, decode_compression(elem));
860 : GPR_TIMER_END("compression_algorithm", 0);
861 4337199 : return NULL;
862 8071347 : } else if (elem->key == GRPC_MDSTR_GRPC_ACCEPT_ENCODING) {
863 : GPR_TIMER_BEGIN("encodings_accepted_by_peer", 0);
864 4336298 : set_encodings_accepted_by_peer(call, elem);
865 : GPR_TIMER_END("encodings_accepted_by_peer", 0);
866 4335748 : return NULL;
867 : } else {
868 3735049 : return publish_app_metadata(call, elem, 0);
869 : }
870 : }
871 :
872 2897639 : static grpc_mdelem *recv_trailing_filter(void *callp, grpc_mdelem *elem) {
873 2897436 : grpc_call *call = callp;
874 2897639 : elem = recv_common_filter(call, elem);
875 2897575 : if (elem == NULL) {
876 2897294 : return NULL;
877 : } else {
878 99 : return publish_app_metadata(call, elem, 1);
879 : }
880 : }
881 :
882 4340842 : grpc_call_stack *grpc_call_get_call_stack(grpc_call *call) {
883 4340842 : return CALL_STACK_FROM_CALL(call);
884 : }
885 :
886 : /*
887 : * BATCH API IMPLEMENTATION
888 : */
889 :
890 2272422 : static void set_status_value_directly(grpc_status_code status, void *dest) {
891 2272422 : *(grpc_status_code *)dest = status;
892 2272422 : }
893 :
894 2170918 : static void set_cancelled_value(grpc_status_code status, void *dest) {
895 2170918 : *(int *)dest = (status != GRPC_STATUS_OK);
896 2170918 : }
897 :
898 3961156 : static int are_write_flags_valid(gpr_uint32 flags) {
899 : /* check that only bits in GRPC_WRITE_(INTERNAL?)_USED_MASK are set */
900 3961156 : const gpr_uint32 allowed_write_positions =
901 : (GRPC_WRITE_USED_MASK | GRPC_WRITE_INTERNAL_USED_MASK);
902 3961156 : const gpr_uint32 invalid_positions = ~allowed_write_positions;
903 3961347 : return !(flags & invalid_positions);
904 : }
905 :
906 12889542 : static batch_control *allocate_batch_control(grpc_call *call) {
907 : size_t i;
908 16463444 : for (i = 0; i < MAX_CONCURRENT_BATCHES; i++) {
909 16464279 : if ((call->used_batches & (1 << i)) == 0) {
910 12890377 : call->used_batches =
911 12890377 : (gpr_uint8)(call->used_batches | (gpr_uint8)(1 << i));
912 12890377 : return &call->active_batches[i];
913 : }
914 : }
915 0 : return NULL;
916 : }
917 :
918 8554170 : static void finish_batch_completion(grpc_exec_ctx *exec_ctx, void *user_data,
919 : grpc_cq_completion *storage) {
920 8553511 : batch_control *bctl = user_data;
921 8554170 : grpc_call *call = bctl->call;
922 8554170 : gpr_mu_lock(&call->mu);
923 17129086 : call->used_batches = (gpr_uint8)(
924 17129086 : call->used_batches & ~(gpr_uint8)(1 << (bctl - call->active_batches)));
925 8564543 : gpr_mu_unlock(&call->mu);
926 8564627 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
927 8562377 : }
928 :
929 12906176 : static void post_batch_completion(grpc_exec_ctx *exec_ctx,
930 : batch_control *bctl) {
931 12906176 : grpc_call *call = bctl->call;
932 12906176 : if (bctl->is_notify_tag_closure) {
933 4341520 : grpc_exec_ctx_enqueue(exec_ctx, bctl->notify_tag, bctl->success);
934 4341424 : gpr_mu_lock(&call->mu);
935 8683568 : bctl->call->used_batches =
936 8683744 : (gpr_uint8)(bctl->call->used_batches &
937 4341872 : ~(gpr_uint8)(1 << (bctl - bctl->call->active_batches)));
938 4341872 : gpr_mu_unlock(&call->mu);
939 4342058 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "completion");
940 : } else {
941 8564656 : grpc_cq_end_op(exec_ctx, bctl->call->cq, bctl->notify_tag, bctl->success,
942 : finish_batch_completion, bctl, &bctl->cq_completion);
943 : }
944 12903581 : }
945 :
946 4932037 : static void continue_receiving_slices(grpc_exec_ctx *exec_ctx,
947 : batch_control *bctl) {
948 4932037 : grpc_call *call = bctl->call;
949 : for (;;) {
950 16248414 : size_t remaining = call->receiving_stream->length -
951 8124207 : (*call->receiving_buffer)->data.raw.slice_buffer.length;
952 8124207 : if (remaining == 0) {
953 3957219 : call->receiving_message = 0;
954 3957219 : grpc_byte_stream_destroy(call->receiving_stream);
955 3963050 : call->receiving_stream = NULL;
956 3963050 : if (gpr_unref(&bctl->steps_to_complete)) {
957 2214871 : post_batch_completion(exec_ctx, bctl);
958 : }
959 3962741 : return;
960 : }
961 4166988 : if (grpc_byte_stream_next(exec_ctx, call->receiving_stream,
962 : &call->receiving_slice, remaining,
963 : &call->receiving_slice_ready)) {
964 3192181 : gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
965 : call->receiving_slice);
966 : } else {
967 975422 : return;
968 : }
969 3192170 : }
970 : }
971 :
972 975474 : static void receiving_slice_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
973 : int success) {
974 975422 : batch_control *bctl = bctlp;
975 975474 : grpc_call *call = bctl->call;
976 :
977 975474 : GPR_ASSERT(success);
978 975474 : gpr_slice_buffer_add(&(*call->receiving_buffer)->data.raw.slice_buffer,
979 : call->receiving_slice);
980 :
981 975474 : continue_receiving_slices(exec_ctx, bctl);
982 975474 : }
983 :
984 12352279 : static void finish_batch(grpc_exec_ctx *exec_ctx, void *bctlp, int success) {
985 12351532 : batch_control *bctl = bctlp;
986 12352279 : grpc_call *call = bctl->call;
987 : grpc_call *child_call;
988 : grpc_call *next_child_call;
989 :
990 12352279 : gpr_mu_lock(&call->mu);
991 12374038 : if (bctl->send_initial_metadata) {
992 4443171 : grpc_metadata_batch_destroy(
993 : &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */]);
994 : }
995 12376248 : if (bctl->send_message) {
996 3962147 : call->sending_message = 0;
997 : }
998 12376248 : if (bctl->send_final_op) {
999 4440861 : grpc_metadata_batch_destroy(
1000 : &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */]);
1001 : }
1002 12375844 : if (bctl->recv_initial_metadata) {
1003 4442897 : grpc_metadata_batch *md =
1004 : &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1005 4442897 : grpc_metadata_batch_filter(md, recv_initial_filter, call);
1006 :
1007 4440495 : if (gpr_time_cmp(md->deadline, gpr_inf_future(md->deadline.clock_type)) !=
1008 1047988 : 0 &&
1009 1047988 : !call->is_client) {
1010 : GPR_TIMER_BEGIN("set_deadline_alarm", 0);
1011 524117 : set_deadline_alarm(exec_ctx, call, md->deadline);
1012 : GPR_TIMER_END("set_deadline_alarm", 0);
1013 : }
1014 : }
1015 12376406 : if (bctl->recv_final_op) {
1016 4443158 : grpc_metadata_batch *md =
1017 : &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1018 4443158 : grpc_metadata_batch_filter(md, recv_trailing_filter, call);
1019 :
1020 4439247 : if (call->have_alarm) {
1021 1048172 : grpc_timer_cancel(exec_ctx, &call->alarm);
1022 : }
1023 : /* propagate cancellation to any interested children */
1024 4443372 : child_call = call->first_child;
1025 4443372 : if (child_call != NULL) {
1026 : do {
1027 277 : next_child_call = child_call->sibling_next;
1028 277 : if (child_call->cancellation_is_inherited) {
1029 275 : GRPC_CALL_INTERNAL_REF(child_call, "propagate_cancel");
1030 275 : grpc_call_cancel(child_call, NULL);
1031 275 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, child_call, "propagate_cancel");
1032 : }
1033 271 : child_call = next_child_call;
1034 277 : } while (child_call != call->first_child);
1035 : }
1036 :
1037 4443372 : if (call->is_client) {
1038 2272440 : get_final_status(call, set_status_value_directly,
1039 2272440 : call->final_op.client.status);
1040 2271759 : get_final_details(call, call->final_op.client.status_details,
1041 : call->final_op.client.status_details_capacity);
1042 : } else {
1043 2170932 : get_final_status(call, set_cancelled_value,
1044 2170932 : call->final_op.server.cancelled);
1045 : }
1046 :
1047 4438864 : success = 1;
1048 : }
1049 12372294 : bctl->success = success != 0;
1050 12372294 : gpr_mu_unlock(&call->mu);
1051 12370443 : if (gpr_unref(&bctl->steps_to_complete)) {
1052 10166777 : post_batch_completion(exec_ctx, bctl);
1053 : }
1054 12376711 : }
1055 :
1056 3963329 : static void receiving_stream_ready(grpc_exec_ctx *exec_ctx, void *bctlp,
1057 : int success) {
1058 3963075 : batch_control *bctl = bctlp;
1059 3963329 : grpc_call *call = bctl->call;
1060 :
1061 3963329 : if (call->receiving_stream == NULL) {
1062 865 : *call->receiving_buffer = NULL;
1063 865 : if (gpr_unref(&bctl->steps_to_complete)) {
1064 722 : post_batch_completion(exec_ctx, bctl);
1065 : }
1066 7924844 : } else if (call->receiving_stream->length >
1067 3962464 : grpc_channel_get_max_message_length(call->channel)) {
1068 27 : cancel_with_status(exec_ctx, call, GRPC_STATUS_INTERNAL,
1069 : "Max message size exceeded");
1070 27 : grpc_byte_stream_destroy(call->receiving_stream);
1071 27 : call->receiving_stream = NULL;
1072 27 : *call->receiving_buffer = NULL;
1073 27 : if (gpr_unref(&bctl->steps_to_complete)) {
1074 2 : post_batch_completion(exec_ctx, bctl);
1075 : }
1076 : } else {
1077 3962353 : call->test_only_last_message_flags = call->receiving_stream->flags;
1078 3962423 : if ((call->receiving_stream->flags & GRPC_WRITE_INTERNAL_COMPRESS) &&
1079 70 : (call->compression_algorithm > GRPC_COMPRESS_NONE)) {
1080 68 : *call->receiving_buffer = grpc_raw_compressed_byte_buffer_create(
1081 : NULL, 0, call->compression_algorithm);
1082 : } else {
1083 3962285 : *call->receiving_buffer = grpc_raw_byte_buffer_create(NULL, 0);
1084 : }
1085 3959943 : grpc_closure_init(&call->receiving_slice_ready, receiving_slice_ready,
1086 : bctl);
1087 3959948 : continue_receiving_slices(exec_ctx, bctl);
1088 : /* early out */
1089 3963464 : return;
1090 : }
1091 : }
1092 :
1093 12862462 : static grpc_call_error call_start_batch(grpc_exec_ctx *exec_ctx,
1094 : grpc_call *call, const grpc_op *ops,
1095 : size_t nops, void *notify_tag,
1096 : int is_notify_tag_closure) {
1097 : grpc_transport_stream_op stream_op;
1098 : size_t i;
1099 : const grpc_op *op;
1100 : batch_control *bctl;
1101 12861627 : int num_completion_callbacks_needed = 1;
1102 12861627 : grpc_call_error error = GRPC_CALL_OK;
1103 :
1104 : GPR_TIMER_BEGIN("grpc_call_start_batch", 0);
1105 :
1106 12862462 : GRPC_CALL_LOG_BATCH(GPR_INFO, call, ops, nops, notify_tag);
1107 :
1108 12862462 : memset(&stream_op, 0, sizeof(stream_op));
1109 :
1110 : /* TODO(ctiller): this feels like it could be made lock-free */
1111 12862462 : gpr_mu_lock(&call->mu);
1112 12896254 : bctl = allocate_batch_control(call);
1113 12853737 : memset(bctl, 0, sizeof(*bctl));
1114 12853737 : bctl->call = call;
1115 12853737 : bctl->notify_tag = notify_tag;
1116 12853737 : bctl->is_notify_tag_closure = (gpr_uint8)(is_notify_tag_closure != 0);
1117 :
1118 12853737 : if (nops == 0) {
1119 524238 : GRPC_CALL_INTERNAL_REF(call, "completion");
1120 524238 : bctl->success = 1;
1121 524238 : if (!is_notify_tag_closure) {
1122 26 : grpc_cq_begin_op(call->cq);
1123 : }
1124 524238 : gpr_mu_unlock(&call->mu);
1125 524238 : post_batch_completion(exec_ctx, bctl);
1126 523634 : return GRPC_CALL_OK;
1127 : }
1128 :
1129 : /* rewrite batch ops into a transport op */
1130 37893621 : for (i = 0; i < nops; i++) {
1131 25559416 : op = &ops[i];
1132 25559416 : if (op->reserved != NULL) {
1133 0 : error = GRPC_CALL_ERROR;
1134 0 : goto done_with_error;
1135 : }
1136 25566118 : switch (op->op) {
1137 : case GRPC_OP_SEND_INITIAL_METADATA:
1138 : /* Flag validation: currently allow no flags */
1139 4441735 : if (op->flags != 0) {
1140 22 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1141 22 : goto done_with_error;
1142 : }
1143 4441713 : if (call->sent_initial_metadata) {
1144 1 : error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1145 1 : goto done_with_error;
1146 : }
1147 4441712 : if (op->data.send_initial_metadata.count > INT_MAX) {
1148 1 : error = GRPC_CALL_ERROR_INVALID_METADATA;
1149 1 : goto done_with_error;
1150 : }
1151 4441711 : bctl->send_initial_metadata = 1;
1152 4441711 : call->sent_initial_metadata = 1;
1153 8883422 : if (!prepare_application_metadata(
1154 4441711 : call, (int)op->data.send_initial_metadata.count,
1155 4441711 : op->data.send_initial_metadata.metadata, 0, call->is_client)) {
1156 0 : error = GRPC_CALL_ERROR_INVALID_METADATA;
1157 0 : goto done_with_error;
1158 : }
1159 : /* TODO(ctiller): just make these the same variable? */
1160 4440106 : call->metadata_batch[0][0].deadline = call->send_deadline;
1161 4440106 : stream_op.send_initial_metadata =
1162 4440106 : &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
1163 4440106 : break;
1164 : case GRPC_OP_SEND_MESSAGE:
1165 3961477 : if (!are_write_flags_valid(op->flags)) {
1166 22 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1167 22 : goto done_with_error;
1168 : }
1169 3960186 : if (op->data.send_message == NULL) {
1170 1 : error = GRPC_CALL_ERROR_INVALID_MESSAGE;
1171 1 : goto done_with_error;
1172 : }
1173 3960185 : if (call->sending_message) {
1174 1 : error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1175 1 : goto done_with_error;
1176 : }
1177 3960184 : bctl->send_message = 1;
1178 3960184 : call->sending_message = 1;
1179 7920368 : grpc_slice_buffer_stream_init(
1180 : &call->sending_stream,
1181 3960184 : &op->data.send_message->data.raw.slice_buffer, op->flags);
1182 3960977 : stream_op.send_message = &call->sending_stream.base;
1183 3960977 : break;
1184 : case GRPC_OP_SEND_CLOSE_FROM_CLIENT:
1185 : /* Flag validation: currently allow no flags */
1186 2272050 : if (op->flags != 0) {
1187 22 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1188 22 : goto done_with_error;
1189 : }
1190 2272028 : if (!call->is_client) {
1191 1 : error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1192 1 : goto done_with_error;
1193 : }
1194 2272027 : if (call->sent_final_op) {
1195 0 : error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1196 0 : goto done_with_error;
1197 : }
1198 2272027 : bctl->send_final_op = 1;
1199 2272027 : call->sent_final_op = 1;
1200 2272027 : stream_op.send_trailing_metadata =
1201 2272027 : &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1202 2272027 : break;
1203 : case GRPC_OP_SEND_STATUS_FROM_SERVER:
1204 : /* Flag validation: currently allow no flags */
1205 2169047 : if (op->flags != 0) {
1206 1 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1207 1 : goto done_with_error;
1208 : }
1209 2169046 : if (call->is_client) {
1210 1 : error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1211 1 : goto done_with_error;
1212 : }
1213 2169045 : if (call->sent_final_op) {
1214 1 : error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1215 1 : goto done_with_error;
1216 : }
1217 2169044 : if (op->data.send_status_from_server.trailing_metadata_count >
1218 : INT_MAX) {
1219 1 : error = GRPC_CALL_ERROR_INVALID_METADATA;
1220 1 : goto done_with_error;
1221 : }
1222 2169043 : bctl->send_final_op = 1;
1223 2169043 : call->sent_final_op = 1;
1224 2169043 : call->send_extra_metadata_count = 1;
1225 2169043 : call->send_extra_metadata[0].md = grpc_channel_get_reffed_status_elem(
1226 2169043 : call->channel, op->data.send_status_from_server.status);
1227 2169850 : if (op->data.send_status_from_server.status_details != NULL) {
1228 523986 : call->send_extra_metadata[1].md = grpc_mdelem_from_metadata_strings(
1229 : GRPC_MDSTR_GRPC_MESSAGE,
1230 : grpc_mdstr_from_string(
1231 : op->data.send_status_from_server.status_details));
1232 523986 : call->send_extra_metadata_count++;
1233 523986 : set_status_details(
1234 : call, STATUS_FROM_API_OVERRIDE,
1235 523908 : GRPC_MDSTR_REF(call->send_extra_metadata[1].md->value));
1236 : }
1237 2169772 : set_status_code(call, STATUS_FROM_API_OVERRIDE,
1238 2169850 : (gpr_uint32)op->data.send_status_from_server.status);
1239 4340028 : if (!prepare_application_metadata(
1240 : call,
1241 2170014 : (int)op->data.send_status_from_server.trailing_metadata_count,
1242 : op->data.send_status_from_server.trailing_metadata, 1, 1)) {
1243 0 : error = GRPC_CALL_ERROR_INVALID_METADATA;
1244 0 : goto done_with_error;
1245 : }
1246 2169886 : stream_op.send_trailing_metadata =
1247 2169886 : &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
1248 2169886 : break;
1249 : case GRPC_OP_RECV_INITIAL_METADATA:
1250 : /* Flag validation: currently allow no flags */
1251 4441825 : if (op->flags != 0) {
1252 22 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1253 22 : goto done_with_error;
1254 : }
1255 4441803 : if (call->received_initial_metadata) {
1256 1 : error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1257 1 : goto done_with_error;
1258 : }
1259 4441802 : call->received_initial_metadata = 1;
1260 4441802 : call->buffered_metadata[0] = op->data.recv_initial_metadata;
1261 4441802 : bctl->recv_initial_metadata = 1;
1262 4441802 : stream_op.recv_initial_metadata =
1263 4441802 : &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
1264 4441802 : break;
1265 : case GRPC_OP_RECV_MESSAGE:
1266 : /* Flag validation: currently allow no flags */
1267 3961478 : if (op->flags != 0) {
1268 1 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1269 1 : goto done_with_error;
1270 : }
1271 3961477 : if (call->receiving_message) {
1272 1 : error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1273 1 : goto done_with_error;
1274 : }
1275 3961476 : call->receiving_message = 1;
1276 3961476 : bctl->recv_message = 1;
1277 3961476 : call->receiving_buffer = op->data.recv_message;
1278 3961476 : stream_op.recv_message = &call->receiving_stream;
1279 3961476 : grpc_closure_init(&call->receiving_stream_ready, receiving_stream_ready,
1280 : bctl);
1281 3961401 : stream_op.recv_message_ready = &call->receiving_stream_ready;
1282 3961401 : num_completion_callbacks_needed++;
1283 3961401 : break;
1284 : case GRPC_OP_RECV_STATUS_ON_CLIENT:
1285 : /* Flag validation: currently allow no flags */
1286 2272504 : if (op->flags != 0) {
1287 22 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1288 22 : goto done_with_error;
1289 : }
1290 2272482 : if (!call->is_client) {
1291 1 : error = GRPC_CALL_ERROR_NOT_ON_SERVER;
1292 1 : goto done_with_error;
1293 : }
1294 2272481 : if (call->received_final_op) {
1295 1 : error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1296 1 : goto done_with_error;
1297 : }
1298 2272480 : call->received_final_op = 1;
1299 2272480 : call->buffered_metadata[1] =
1300 2272480 : op->data.recv_status_on_client.trailing_metadata;
1301 2272480 : call->final_op.client.status = op->data.recv_status_on_client.status;
1302 2272480 : call->final_op.client.status_details =
1303 2272480 : op->data.recv_status_on_client.status_details;
1304 2272480 : call->final_op.client.status_details_capacity =
1305 2272480 : op->data.recv_status_on_client.status_details_capacity;
1306 2272480 : bctl->recv_final_op = 1;
1307 2272480 : stream_op.recv_trailing_metadata =
1308 2272480 : &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1309 2272480 : break;
1310 : case GRPC_OP_RECV_CLOSE_ON_SERVER:
1311 : /* Flag validation: currently allow no flags */
1312 2170703 : if (op->flags != 0) {
1313 1 : error = GRPC_CALL_ERROR_INVALID_FLAGS;
1314 1 : goto done_with_error;
1315 : }
1316 2170702 : if (call->is_client) {
1317 1 : error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
1318 1 : goto done_with_error;
1319 : }
1320 2170701 : if (call->received_final_op) {
1321 1 : error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
1322 1 : goto done_with_error;
1323 : }
1324 2170700 : call->received_final_op = 1;
1325 2170700 : call->final_op.server.cancelled =
1326 2170700 : op->data.recv_close_on_server.cancelled;
1327 2170700 : bctl->recv_final_op = 1;
1328 2170700 : stream_op.recv_trailing_metadata =
1329 2170700 : &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
1330 2170700 : break;
1331 : }
1332 : }
1333 :
1334 12334952 : GRPC_CALL_INTERNAL_REF(call, "completion");
1335 12371871 : if (!is_notify_tag_closure) {
1336 8560236 : grpc_cq_begin_op(call->cq);
1337 : }
1338 12375386 : gpr_ref_init(&bctl->steps_to_complete, num_completion_callbacks_needed);
1339 :
1340 12377238 : stream_op.context = call->context;
1341 12377238 : grpc_closure_init(&bctl->finish_batch, finish_batch, bctl);
1342 12375770 : stream_op.on_complete = &bctl->finish_batch;
1343 12375770 : gpr_mu_unlock(&call->mu);
1344 :
1345 12377480 : execute_op(exec_ctx, call, &stream_op);
1346 :
1347 : done:
1348 : GPR_TIMER_END("grpc_call_start_batch", 0);
1349 12377993 : return error;
1350 :
1351 : done_with_error:
1352 : /* reverse any mutations that occured */
1353 0 : if (bctl->send_initial_metadata) {
1354 90 : call->sent_initial_metadata = 0;
1355 90 : grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
1356 : }
1357 128 : if (bctl->send_message) {
1358 67 : call->sending_message = 0;
1359 67 : grpc_byte_stream_destroy(&call->sending_stream.base);
1360 : }
1361 128 : if (bctl->send_final_op) {
1362 45 : call->sent_final_op = 0;
1363 45 : grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
1364 : }
1365 128 : if (bctl->recv_initial_metadata) {
1366 22 : call->received_initial_metadata = 0;
1367 : }
1368 128 : if (bctl->recv_message) {
1369 1 : call->receiving_message = 0;
1370 : }
1371 128 : if (bctl->recv_final_op) {
1372 1 : call->received_final_op = 0;
1373 : }
1374 128 : gpr_mu_unlock(&call->mu);
1375 128 : goto done;
1376 : }
1377 :
1378 8546790 : grpc_call_error grpc_call_start_batch(grpc_call *call, const grpc_op *ops,
1379 : size_t nops, void *tag, void *reserved) {
1380 8546790 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1381 : grpc_call_error err;
1382 :
1383 8546790 : GRPC_API_TRACE(
1384 : "grpc_call_start_batch(call=%p, ops=%p, nops=%lu, tag=%p, reserved=%p)",
1385 : 5, (call, ops, (unsigned long)nops, tag, reserved));
1386 :
1387 8552311 : if (reserved != NULL) {
1388 1 : err = GRPC_CALL_ERROR;
1389 : } else {
1390 8552310 : err = call_start_batch(&exec_ctx, call, ops, nops, tag, 0);
1391 : }
1392 :
1393 8564167 : grpc_exec_ctx_finish(&exec_ctx);
1394 8553043 : return err;
1395 : }
1396 :
1397 4341703 : grpc_call_error grpc_call_start_batch_and_execute(grpc_exec_ctx *exec_ctx,
1398 : grpc_call *call,
1399 : const grpc_op *ops,
1400 : size_t nops,
1401 : grpc_closure *closure) {
1402 4341703 : return call_start_batch(exec_ctx, call, ops, nops, closure, 1);
1403 : }
1404 :
1405 519 : void grpc_call_context_set(grpc_call *call, grpc_context_index elem,
1406 : void *value, void (*destroy)(void *value)) {
1407 519 : if (call->context[elem].destroy) {
1408 0 : call->context[elem].destroy(call->context[elem].value);
1409 : }
1410 519 : call->context[elem].value = value;
1411 519 : call->context[elem].destroy = destroy;
1412 519 : }
1413 :
1414 1646764 : void *grpc_call_context_get(grpc_call *call, grpc_context_index elem) {
1415 1646764 : return call->context[elem].value;
1416 : }
1417 :
1418 47461 : gpr_uint8 grpc_call_is_client(grpc_call *call) { return call->is_client; }
|