Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/surface/server.h"
35 :
36 : #include <limits.h>
37 : #include <stdlib.h>
38 : #include <string.h>
39 :
40 : #include <grpc/support/alloc.h>
41 : #include <grpc/support/log.h>
42 : #include <grpc/support/string_util.h>
43 : #include <grpc/support/useful.h>
44 :
45 : #include "src/core/census/grpc_filter.h"
46 : #include "src/core/channel/channel_args.h"
47 : #include "src/core/channel/connected_channel.h"
48 : #include "src/core/iomgr/iomgr.h"
49 : #include "src/core/support/stack_lockfree.h"
50 : #include "src/core/support/string.h"
51 : #include "src/core/surface/api_trace.h"
52 : #include "src/core/surface/call.h"
53 : #include "src/core/surface/channel.h"
54 : #include "src/core/surface/completion_queue.h"
55 : #include "src/core/surface/init.h"
56 : #include "src/core/transport/metadata.h"
57 : #include "src/core/transport/static_metadata.h"
58 :
59 : typedef struct listener {
60 : void *arg;
61 : void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
62 : grpc_pollset **pollsets, size_t pollset_count);
63 : void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
64 : grpc_closure *closure);
65 : struct listener *next;
66 : grpc_closure destroy_done;
67 : } listener;
68 :
69 : typedef struct call_data call_data;
70 : typedef struct channel_data channel_data;
71 : typedef struct registered_method registered_method;
72 :
73 : typedef struct {
74 : call_data *next;
75 : call_data *prev;
76 : } call_link;
77 :
78 : typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
79 :
80 : typedef struct requested_call {
81 : requested_call_type type;
82 : void *tag;
83 : grpc_server *server;
84 : grpc_completion_queue *cq_bound_to_call;
85 : grpc_completion_queue *cq_for_notification;
86 : grpc_call **call;
87 : grpc_cq_completion completion;
88 : grpc_metadata_array *initial_metadata;
89 : union {
90 : struct {
91 : grpc_call_details *details;
92 : } batch;
93 : struct {
94 : registered_method *registered_method;
95 : gpr_timespec *deadline;
96 : grpc_byte_buffer **optional_payload;
97 : } registered;
98 : } data;
99 : grpc_closure publish;
100 : } requested_call;
101 :
102 : typedef struct channel_registered_method {
103 : registered_method *server_registered_method;
104 : grpc_mdstr *method;
105 : grpc_mdstr *host;
106 : } channel_registered_method;
107 :
108 : struct channel_data {
109 : grpc_server *server;
110 : grpc_connectivity_state connectivity_state;
111 : grpc_channel *channel;
112 : /* linked list of all channels on a server */
113 : channel_data *next;
114 : channel_data *prev;
115 : channel_registered_method *registered_methods;
116 : gpr_uint32 registered_method_slots;
117 : gpr_uint32 registered_method_max_probes;
118 : grpc_closure finish_destroy_channel_closure;
119 : grpc_closure channel_connectivity_changed;
120 : };
121 :
122 : typedef struct shutdown_tag {
123 : void *tag;
124 : grpc_completion_queue *cq;
125 : grpc_cq_completion completion;
126 : } shutdown_tag;
127 :
128 : typedef enum {
129 : /* waiting for metadata */
130 : NOT_STARTED,
131 : /* inital metadata read, not flow controlled in yet */
132 : PENDING,
133 : /* flow controlled in, on completion queue */
134 : ACTIVATED,
135 : /* cancelled before being queued */
136 : ZOMBIED
137 : } call_state;
138 :
139 : typedef struct request_matcher request_matcher;
140 :
141 : struct call_data {
142 : grpc_call *call;
143 :
144 : /** protects state */
145 : gpr_mu mu_state;
146 : /** the current state of a call - see call_state */
147 : call_state state;
148 :
149 : grpc_mdstr *path;
150 : grpc_mdstr *host;
151 : gpr_timespec deadline;
152 :
153 : grpc_completion_queue *cq_new;
154 :
155 : grpc_metadata_batch *recv_initial_metadata;
156 : grpc_metadata_array initial_metadata;
157 :
158 : grpc_closure got_initial_metadata;
159 : grpc_closure server_on_recv_initial_metadata;
160 : grpc_closure kill_zombie_closure;
161 : grpc_closure *on_done_recv_initial_metadata;
162 :
163 : call_data *pending_next;
164 : };
165 :
166 : struct request_matcher {
167 : call_data *pending_head;
168 : call_data *pending_tail;
169 : gpr_stack_lockfree *requests;
170 : };
171 :
172 : struct registered_method {
173 : char *method;
174 : char *host;
175 : request_matcher request_matcher;
176 : registered_method *next;
177 : };
178 :
179 : typedef struct {
180 : grpc_channel **channels;
181 : size_t num_channels;
182 : } channel_broadcaster;
183 :
184 : struct grpc_server {
185 : size_t channel_filter_count;
186 : grpc_channel_filter const **channel_filters;
187 : grpc_channel_args *channel_args;
188 :
189 : grpc_completion_queue **cqs;
190 : grpc_pollset **pollsets;
191 : size_t cq_count;
192 :
193 : /* The two following mutexes control access to server-state
194 : mu_global controls access to non-call-related state (e.g., channel state)
195 : mu_call controls access to call-related state (e.g., the call lists)
196 :
197 : If they are ever required to be nested, you must lock mu_global
198 : before mu_call. This is currently used in shutdown processing
199 : (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
200 : gpr_mu mu_global; /* mutex for server and channel state */
201 : gpr_mu mu_call; /* mutex for call-specific state */
202 :
203 : registered_method *registered_methods;
204 : request_matcher unregistered_request_matcher;
205 : /** free list of available requested_calls indices */
206 : gpr_stack_lockfree *request_freelist;
207 : /** requested call backing data */
208 : requested_call *requested_calls;
209 : size_t max_requested_calls;
210 :
211 : gpr_atm shutdown_flag;
212 : gpr_uint8 shutdown_published;
213 : size_t num_shutdown_tags;
214 : shutdown_tag *shutdown_tags;
215 :
216 : channel_data root_channel_data;
217 :
218 : listener *listeners;
219 : int listeners_destroyed;
220 : gpr_refcount internal_refcount;
221 :
222 : /** when did we print the last shutdown progress message */
223 : gpr_timespec last_shutdown_message_time;
224 : };
225 :
226 : #define SERVER_FROM_CALL_ELEM(elem) \
227 : (((channel_data *)(elem)->channel_data)->server)
228 :
229 : static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
230 : call_data *calld, requested_call *rc);
231 : static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
232 : requested_call *rc);
233 : /* Before calling maybe_finish_shutdown, we must hold mu_global and not
234 : hold mu_call */
235 : static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server);
236 :
237 : /*
238 : * channel broadcaster
239 : */
240 :
241 : /* assumes server locked */
242 3600 : static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
243 : channel_data *c;
244 3512 : size_t count = 0;
245 6342 : for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
246 2742 : count++;
247 : }
248 3600 : cb->num_channels = count;
249 3600 : cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
250 3512 : count = 0;
251 6342 : for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
252 2742 : cb->channels[count++] = c->channel;
253 2742 : GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
254 : }
255 3600 : }
256 :
257 : struct shutdown_cleanup_args {
258 : grpc_closure closure;
259 : gpr_slice slice;
260 : };
261 :
262 2742 : static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
263 : int iomgr_status_ignored) {
264 2701 : struct shutdown_cleanup_args *a = arg;
265 2742 : gpr_slice_unref(a->slice);
266 2742 : gpr_free(a);
267 2742 : }
268 :
269 2742 : static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
270 : int send_goaway, int send_disconnect) {
271 : grpc_transport_op op;
272 : struct shutdown_cleanup_args *sc;
273 : grpc_channel_element *elem;
274 :
275 2742 : memset(&op, 0, sizeof(op));
276 2742 : op.send_goaway = send_goaway;
277 2742 : sc = gpr_malloc(sizeof(*sc));
278 2742 : sc->slice = gpr_slice_from_copied_string("Server shutdown");
279 2742 : op.goaway_message = &sc->slice;
280 2742 : op.goaway_status = GRPC_STATUS_OK;
281 2742 : op.disconnect = send_disconnect;
282 2742 : grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
283 2742 : op.on_consumed = &sc->closure;
284 :
285 2742 : elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
286 2742 : elem->filter->start_transport_op(exec_ctx, elem, &op);
287 2742 : }
288 :
289 3600 : static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
290 : channel_broadcaster *cb,
291 : int send_goaway,
292 : int force_disconnect) {
293 : size_t i;
294 :
295 6342 : for (i = 0; i < cb->num_channels; i++) {
296 2742 : send_shutdown(exec_ctx, cb->channels[i], send_goaway, force_disconnect);
297 2742 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast");
298 : }
299 3600 : gpr_free(cb->channels);
300 3600 : }
301 :
302 : /*
303 : * request_matcher
304 : */
305 :
306 4740 : static void request_matcher_init(request_matcher *rm, size_t entries) {
307 4740 : memset(rm, 0, sizeof(*rm));
308 4740 : rm->requests = gpr_stack_lockfree_create(entries);
309 4740 : }
310 :
311 4700 : static void request_matcher_destroy(request_matcher *rm) {
312 4700 : GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1);
313 4700 : gpr_stack_lockfree_destroy(rm->requests);
314 4700 : }
315 :
316 194 : static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) {
317 194 : grpc_call_destroy(grpc_call_from_top_element(elem));
318 194 : }
319 :
320 19775 : static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
321 : request_matcher *rm) {
322 39536 : while (rm->pending_head) {
323 139 : call_data *calld = rm->pending_head;
324 141 : rm->pending_head = calld->pending_next;
325 141 : gpr_mu_lock(&calld->mu_state);
326 141 : calld->state = ZOMBIED;
327 141 : gpr_mu_unlock(&calld->mu_state);
328 141 : grpc_closure_init(
329 : &calld->kill_zombie_closure, kill_zombie,
330 141 : grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
331 141 : grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
332 : }
333 19775 : }
334 :
335 19775 : static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
336 : grpc_server *server,
337 : request_matcher *rm) {
338 : int request_id;
339 100974 : while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
340 61579 : fail_call(exec_ctx, server, &server->requested_calls[request_id]);
341 : }
342 19775 : }
343 :
344 : /*
345 : * server proper
346 : */
347 :
348 4422992 : static void server_ref(grpc_server *server) {
349 4423314 : gpr_ref(&server->internal_refcount);
350 4423821 : }
351 :
352 3475 : static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
353 : registered_method *rm;
354 : size_t i;
355 3475 : grpc_channel_args_destroy(server->channel_args);
356 3475 : gpr_mu_destroy(&server->mu_global);
357 3475 : gpr_mu_destroy(&server->mu_call);
358 3475 : gpr_free((void *)server->channel_filters);
359 8166 : while ((rm = server->registered_methods) != NULL) {
360 1225 : server->registered_methods = rm->next;
361 1225 : request_matcher_destroy(&rm->request_matcher);
362 1225 : gpr_free(rm->method);
363 1225 : gpr_free(rm->host);
364 1225 : gpr_free(rm);
365 : }
366 6990 : for (i = 0; i < server->cq_count; i++) {
367 3524 : GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
368 : }
369 3475 : request_matcher_destroy(&server->unregistered_request_matcher);
370 3475 : gpr_stack_lockfree_destroy(server->request_freelist);
371 3475 : gpr_free(server->cqs);
372 3475 : gpr_free(server->pollsets);
373 3475 : gpr_free(server->shutdown_tags);
374 3475 : gpr_free(server->requested_calls);
375 3475 : gpr_free(server);
376 3475 : }
377 :
378 4425576 : static void server_unref(grpc_exec_ctx *exec_ctx, grpc_server *server) {
379 4425576 : if (gpr_unref(&server->internal_refcount)) {
380 3475 : server_delete(exec_ctx, server);
381 : }
382 4428232 : }
383 :
384 3015 : static int is_channel_orphaned(channel_data *chand) {
385 3056 : return chand->next == chand;
386 : }
387 :
388 3015 : static void orphan_channel(channel_data *chand) {
389 3056 : chand->next->prev = chand->prev;
390 3056 : chand->prev->next = chand->next;
391 3056 : chand->next = chand->prev = chand;
392 3015 : }
393 :
394 3056 : static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
395 : int success) {
396 3015 : channel_data *chand = cd;
397 3056 : grpc_server *server = chand->server;
398 3056 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
399 3056 : server_unref(exec_ctx, server);
400 3056 : }
401 :
402 3056 : static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
403 6112 : if (is_channel_orphaned(chand)) return;
404 3056 : GPR_ASSERT(chand->server != NULL);
405 3015 : orphan_channel(chand);
406 3015 : server_ref(chand->server);
407 3056 : maybe_finish_shutdown(exec_ctx, chand->server);
408 3056 : chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
409 3056 : chand->finish_destroy_channel_closure.cb_arg = chand;
410 3056 : grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, 1);
411 : }
412 :
413 2170882 : static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
414 : grpc_call_element *elem, request_matcher *rm) {
415 2170882 : call_data *calld = elem->call_data;
416 : int request_id;
417 :
418 2170882 : if (gpr_atm_acq_load(&server->shutdown_flag)) {
419 0 : gpr_mu_lock(&calld->mu_state);
420 0 : calld->state = ZOMBIED;
421 0 : gpr_mu_unlock(&calld->mu_state);
422 0 : grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
423 0 : grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
424 2171077 : return;
425 : }
426 :
427 2170882 : request_id = gpr_stack_lockfree_pop(rm->requests);
428 2171093 : if (request_id == -1) {
429 104788 : gpr_mu_lock(&server->mu_call);
430 104788 : gpr_mu_lock(&calld->mu_state);
431 104788 : calld->state = PENDING;
432 104788 : gpr_mu_unlock(&calld->mu_state);
433 104788 : if (rm->pending_head == NULL) {
434 727 : rm->pending_tail = rm->pending_head = calld;
435 : } else {
436 104061 : rm->pending_tail->pending_next = calld;
437 104061 : rm->pending_tail = calld;
438 : }
439 104788 : calld->pending_next = NULL;
440 104788 : gpr_mu_unlock(&server->mu_call);
441 : } else {
442 2066305 : gpr_mu_lock(&calld->mu_state);
443 2066313 : calld->state = ACTIVATED;
444 2066313 : gpr_mu_unlock(&calld->mu_state);
445 2066311 : begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
446 : }
447 : }
448 :
449 2170928 : static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
450 2170928 : channel_data *chand = elem->channel_data;
451 2170928 : call_data *calld = elem->call_data;
452 2170928 : grpc_server *server = chand->server;
453 : gpr_uint32 i;
454 : gpr_uint32 hash;
455 : channel_registered_method *rm;
456 :
457 2170928 : if (chand->registered_methods && calld->path && calld->host) {
458 : /* TODO(ctiller): unify these two searches */
459 : /* check for an exact match with host */
460 1646572 : hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
461 3294645 : for (i = 0; i <= chand->registered_method_max_probes; i++) {
462 3295954 : rm = &chand->registered_methods[(hash + i) %
463 1647977 : chand->registered_method_slots];
464 1647977 : if (!rm) break;
465 1648074 : if (rm->host != calld->host) continue;
466 1 : if (rm->method != calld->path) continue;
467 1 : finish_start_new_rpc(exec_ctx, server, elem,
468 1 : &rm->server_registered_method->request_matcher);
469 1 : return;
470 : }
471 : /* check for a wildcard method definition (no host set) */
472 1646571 : hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
473 1646959 : for (i = 0; i <= chand->registered_method_max_probes; i++) {
474 3293932 : rm = &chand->registered_methods[(hash + i) %
475 1646966 : chand->registered_method_slots];
476 1646966 : if (!rm) break;
477 1646979 : if (rm->host != NULL) continue;
478 1646849 : if (rm->method != calld->path) continue;
479 1646591 : finish_start_new_rpc(exec_ctx, server, elem,
480 1646591 : &rm->server_registered_method->request_matcher);
481 1646757 : return;
482 : }
483 : }
484 524336 : finish_start_new_rpc(exec_ctx, server, elem,
485 : &server->unregistered_request_matcher);
486 : }
487 :
488 11403 : static int num_listeners(grpc_server *server) {
489 : listener *l;
490 11403 : int n = 0;
491 21372 : for (l = server->listeners; l; l = l->next) {
492 9969 : n++;
493 : }
494 11486 : return n;
495 : }
496 :
497 3514 : static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server,
498 : grpc_cq_completion *completion) {
499 3514 : server_unref(exec_ctx, server);
500 3514 : }
501 :
502 0 : static int num_channels(grpc_server *server) {
503 : channel_data *chand;
504 0 : int n = 0;
505 0 : for (chand = server->root_channel_data.next;
506 0 : chand != &server->root_channel_data; chand = chand->next) {
507 0 : n++;
508 : }
509 0 : return n;
510 : }
511 :
512 14377 : static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
513 : grpc_server *server) {
514 : registered_method *rm;
515 14377 : request_matcher_kill_requests(exec_ctx, server,
516 : &server->unregistered_request_matcher);
517 14377 : request_matcher_zombify_all_pending_calls(
518 : exec_ctx, &server->unregistered_request_matcher);
519 19775 : for (rm = server->registered_methods; rm; rm = rm->next) {
520 5398 : request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher);
521 5398 : request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
522 : }
523 14377 : }
524 :
525 12346 : static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
526 : grpc_server *server) {
527 : size_t i;
528 12346 : if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
529 1474 : return;
530 : }
531 :
532 10863 : kill_pending_work_locked(exec_ctx, server);
533 :
534 18868 : if (server->root_channel_data.next != &server->root_channel_data ||
535 8005 : server->listeners_destroyed < num_listeners(server)) {
536 7349 : if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
537 : server->last_shutdown_message_time),
538 : gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
539 0 : server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
540 0 : gpr_log(GPR_DEBUG,
541 : "Waiting for %d channels and %d/%d listeners to be destroyed"
542 : " before shutting down server",
543 : num_channels(server),
544 0 : num_listeners(server) - server->listeners_destroyed,
545 : num_listeners(server));
546 : }
547 7274 : return;
548 : }
549 3514 : server->shutdown_published = 1;
550 7028 : for (i = 0; i < server->num_shutdown_tags; i++) {
551 3474 : server_ref(server);
552 6988 : grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq,
553 3474 : server->shutdown_tags[i].tag, 1, done_shutdown_event, server,
554 3514 : &server->shutdown_tags[i].completion);
555 : }
556 : }
557 :
558 12417741 : static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
559 12417270 : grpc_call_element *elem = user_data;
560 12417741 : call_data *calld = elem->call_data;
561 12417741 : if (md->key == GRPC_MDSTR_PATH) {
562 2170690 : calld->path = GRPC_MDSTR_REF(md->value);
563 2171098 : return NULL;
564 10247051 : } else if (md->key == GRPC_MDSTR_AUTHORITY) {
565 2171063 : calld->host = GRPC_MDSTR_REF(md->value);
566 2171080 : return NULL;
567 : }
568 8075695 : return md;
569 : }
570 :
571 2170979 : static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
572 : int success) {
573 2170890 : grpc_call_element *elem = ptr;
574 2170979 : call_data *calld = elem->call_data;
575 : gpr_timespec op_deadline;
576 :
577 2170979 : grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, elem);
578 2170839 : op_deadline = calld->recv_initial_metadata->deadline;
579 2170839 : if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
580 524117 : calld->deadline = op_deadline;
581 : }
582 2171153 : if (calld->host && calld->path) {
583 : /* do nothing */
584 : } else {
585 48 : success = 0;
586 : }
587 :
588 4342217 : calld->on_done_recv_initial_metadata->cb(
589 2171064 : exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, success);
590 2171153 : }
591 :
592 8098548 : static void server_mutate_op(grpc_call_element *elem,
593 : grpc_transport_stream_op *op) {
594 8098548 : call_data *calld = elem->call_data;
595 :
596 8098997 : if (op->recv_initial_metadata != NULL) {
597 2171122 : calld->recv_initial_metadata = op->recv_initial_metadata;
598 2171122 : calld->on_done_recv_initial_metadata = op->on_complete;
599 2171122 : op->on_complete = &calld->server_on_recv_initial_metadata;
600 : }
601 8098548 : }
602 :
603 8099259 : static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
604 : grpc_call_element *elem,
605 : grpc_transport_stream_op *op) {
606 8099259 : GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
607 8098810 : server_mutate_op(elem, op);
608 8101097 : grpc_call_next_op(exec_ctx, elem, op);
609 8102166 : }
610 :
611 2170979 : static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
612 : int success) {
613 2170890 : grpc_call_element *elem = ptr;
614 2170979 : call_data *calld = elem->call_data;
615 2170979 : if (success) {
616 2170926 : start_new_rpc(exec_ctx, elem);
617 : } else {
618 53 : gpr_mu_lock(&calld->mu_state);
619 53 : if (calld->state == NOT_STARTED) {
620 53 : calld->state = ZOMBIED;
621 53 : gpr_mu_unlock(&calld->mu_state);
622 53 : grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
623 53 : grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
624 0 : } else if (calld->state == PENDING) {
625 0 : calld->state = ZOMBIED;
626 0 : gpr_mu_unlock(&calld->mu_state);
627 : /* zombied call will be destroyed when it's removed from the pending
628 : queue... later */
629 : } else {
630 0 : gpr_mu_unlock(&calld->mu_state);
631 : }
632 : }
633 2171079 : }
634 :
635 2171064 : static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
636 : grpc_transport *transport,
637 : const void *transport_server_data) {
638 2170975 : channel_data *chand = cd;
639 : /* create a call */
640 2171154 : grpc_call *call =
641 2171064 : grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data,
642 : NULL, 0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
643 2171139 : grpc_call_element *elem =
644 2171151 : grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
645 2171140 : call_data *calld = elem->call_data;
646 : grpc_op op;
647 2171140 : memset(&op, 0, sizeof(op));
648 2171140 : op.op = GRPC_OP_RECV_INITIAL_METADATA;
649 2171140 : op.data.recv_initial_metadata = &calld->initial_metadata;
650 2171140 : grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem);
651 2171117 : grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1,
652 : &calld->got_initial_metadata);
653 2171118 : }
654 :
655 6112 : static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
656 : int iomgr_status_ignored) {
657 6030 : channel_data *chand = cd;
658 6112 : grpc_server *server = chand->server;
659 6112 : if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
660 : grpc_transport_op op;
661 3056 : memset(&op, 0, sizeof(op));
662 3056 : op.on_connectivity_state_change = &chand->channel_connectivity_changed,
663 3056 : op.connectivity_state = &chand->connectivity_state;
664 3056 : grpc_channel_next_op(exec_ctx,
665 : grpc_channel_stack_element(
666 : grpc_channel_get_channel_stack(chand->channel), 0),
667 : &op);
668 : } else {
669 3056 : gpr_mu_lock(&server->mu_global);
670 3056 : destroy_channel(exec_ctx, chand);
671 3056 : gpr_mu_unlock(&server->mu_global);
672 3056 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity");
673 : }
674 6112 : }
675 :
676 2170899 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
677 : grpc_call_element_args *args) {
678 2170899 : call_data *calld = elem->call_data;
679 2170899 : channel_data *chand = elem->channel_data;
680 2170899 : memset(calld, 0, sizeof(call_data));
681 2170899 : calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
682 2171116 : calld->call = grpc_call_from_top_element(elem);
683 2171104 : gpr_mu_init(&calld->mu_state);
684 :
685 2171092 : grpc_closure_init(&calld->server_on_recv_initial_metadata,
686 : server_on_recv_initial_metadata, elem);
687 :
688 2171040 : server_ref(chand->server);
689 2171147 : }
690 :
691 2170777 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
692 : grpc_call_element *elem) {
693 2170777 : channel_data *chand = elem->channel_data;
694 2170777 : call_data *calld = elem->call_data;
695 :
696 2170777 : GPR_ASSERT(calld->state != PENDING);
697 :
698 2170777 : if (calld->host) {
699 2170751 : GRPC_MDSTR_UNREF(calld->host);
700 : }
701 2170963 : if (calld->path) {
702 2170915 : GRPC_MDSTR_UNREF(calld->path);
703 : }
704 2170555 : grpc_metadata_array_destroy(&calld->initial_metadata);
705 :
706 2170542 : gpr_mu_destroy(&calld->mu_state);
707 :
708 2170498 : server_unref(exec_ctx, chand->server);
709 2171016 : }
710 :
711 3056 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
712 : grpc_channel_element *elem,
713 : grpc_channel_element_args *args) {
714 3056 : channel_data *chand = elem->channel_data;
715 3056 : GPR_ASSERT(args->is_first);
716 3056 : GPR_ASSERT(!args->is_last);
717 3056 : chand->server = NULL;
718 3056 : chand->channel = NULL;
719 3056 : chand->next = chand->prev = chand;
720 3056 : chand->registered_methods = NULL;
721 3056 : chand->connectivity_state = GRPC_CHANNEL_IDLE;
722 3056 : grpc_closure_init(&chand->channel_connectivity_changed,
723 : channel_connectivity_changed, chand);
724 3056 : }
725 :
726 3024 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
727 : grpc_channel_element *elem) {
728 : size_t i;
729 3024 : channel_data *chand = elem->channel_data;
730 3024 : if (chand->registered_methods) {
731 2452 : for (i = 0; i < chand->registered_method_slots; i++) {
732 2296 : if (chand->registered_methods[i].method) {
733 1148 : GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
734 : }
735 2296 : if (chand->registered_methods[i].host) {
736 365 : GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
737 : }
738 : }
739 156 : gpr_free(chand->registered_methods);
740 : }
741 3024 : if (chand->server) {
742 3024 : gpr_mu_lock(&chand->server->mu_global);
743 3024 : chand->next->prev = chand->prev;
744 3024 : chand->prev->next = chand->next;
745 3024 : chand->next = chand->prev = chand;
746 3024 : maybe_finish_shutdown(exec_ctx, chand->server);
747 3024 : gpr_mu_unlock(&chand->server->mu_global);
748 3024 : server_unref(exec_ctx, chand->server);
749 : }
750 3024 : }
751 :
752 : static const grpc_channel_filter server_surface_filter = {
753 : server_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
754 : init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
755 : sizeof(channel_data), init_channel_elem, destroy_channel_elem,
756 : grpc_call_next_get_peer, "server",
757 : };
758 :
759 3596 : void grpc_server_register_completion_queue(grpc_server *server,
760 : grpc_completion_queue *cq,
761 : void *reserved) {
762 : size_t i, n;
763 3596 : GRPC_API_TRACE(
764 : "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
765 : (server, cq, reserved));
766 3596 : GPR_ASSERT(!reserved);
767 3623 : for (i = 0; i < server->cq_count; i++) {
768 3705 : if (server->cqs[i] == cq) return;
769 : }
770 3596 : GRPC_CQ_INTERNAL_REF(cq, "server");
771 3596 : grpc_cq_mark_server_cq(cq);
772 3596 : n = server->cq_count++;
773 3596 : server->cqs = gpr_realloc(server->cqs,
774 3514 : server->cq_count * sizeof(grpc_completion_queue *));
775 3596 : server->cqs[n] = cq;
776 : }
777 :
778 3515 : grpc_server *grpc_server_create_from_filters(
779 : const grpc_channel_filter **filters, size_t filter_count,
780 : const grpc_channel_args *args) {
781 : size_t i;
782 : /* TODO(census): restore this once we finalize census filter etc.
783 : int census_enabled = grpc_channel_args_is_census_enabled(args); */
784 3474 : int census_enabled = 0;
785 :
786 3515 : grpc_server *server = gpr_malloc(sizeof(grpc_server));
787 :
788 3515 : GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
789 :
790 3515 : memset(server, 0, sizeof(grpc_server));
791 :
792 3515 : gpr_mu_init(&server->mu_global);
793 3515 : gpr_mu_init(&server->mu_call);
794 :
795 : /* decremented by grpc_server_destroy */
796 3515 : gpr_ref_init(&server->internal_refcount, 1);
797 3515 : server->root_channel_data.next = server->root_channel_data.prev =
798 3515 : &server->root_channel_data;
799 :
800 : /* TODO(ctiller): expose a channel_arg for this */
801 3515 : server->max_requested_calls = 32768;
802 3515 : server->request_freelist =
803 3515 : gpr_stack_lockfree_create(server->max_requested_calls);
804 115183035 : for (i = 0; i < (size_t)server->max_requested_calls; i++) {
805 115179520 : gpr_stack_lockfree_push(server->request_freelist, (int)i);
806 : }
807 3515 : request_matcher_init(&server->unregistered_request_matcher,
808 : server->max_requested_calls);
809 3515 : server->requested_calls = gpr_malloc(server->max_requested_calls *
810 : sizeof(*server->requested_calls));
811 :
812 : /* Server filter stack is:
813 :
814 : server_surface_filter - for making surface API calls
815 : grpc_server_census_filter (optional) - for stats collection and tracing
816 : {passed in filter stack}
817 : grpc_connected_channel_filter - for interfacing with transports */
818 3515 : server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u);
819 3515 : server->channel_filters =
820 3515 : gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
821 3515 : server->channel_filters[0] = &server_surface_filter;
822 3474 : if (census_enabled) {
823 0 : server->channel_filters[1] = &grpc_server_census_filter;
824 : }
825 6321 : for (i = 0; i < filter_count; i++) {
826 2765 : server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i];
827 : }
828 :
829 3515 : server->channel_args = grpc_channel_args_copy(args);
830 :
831 3515 : return server;
832 : }
833 :
834 5518 : static int streq(const char *a, const char *b) {
835 5518 : if (a == NULL && b == NULL) return 1;
836 5518 : if (a == NULL) return 0;
837 5108 : if (b == NULL) return 0;
838 5108 : return 0 == strcmp(a, b);
839 : }
840 :
841 1225 : void *grpc_server_register_method(grpc_server *server, const char *method,
842 : const char *host) {
843 : registered_method *m;
844 1225 : GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)",
845 : 3, (server, method, host));
846 1225 : if (!method) {
847 0 : gpr_log(GPR_ERROR,
848 : "grpc_server_register_method method string cannot be NULL");
849 0 : return NULL;
850 : }
851 6333 : for (m = server->registered_methods; m; m = m->next) {
852 5108 : if (streq(m->method, method) && streq(m->host, host)) {
853 0 : gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
854 : host ? host : "*");
855 0 : return NULL;
856 : }
857 : }
858 1225 : m = gpr_malloc(sizeof(registered_method));
859 1225 : memset(m, 0, sizeof(*m));
860 1225 : request_matcher_init(&m->request_matcher, server->max_requested_calls);
861 1225 : m->method = gpr_strdup(method);
862 1225 : m->host = gpr_strdup(host);
863 1225 : m->next = server->registered_methods;
864 1225 : server->registered_methods = m;
865 1225 : return m;
866 : }
867 :
868 3505 : void grpc_server_start(grpc_server *server) {
869 : listener *l;
870 : size_t i;
871 3505 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
872 :
873 3505 : GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
874 :
875 3505 : server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
876 7082 : for (i = 0; i < server->cq_count; i++) {
877 3577 : server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
878 : }
879 :
880 6256 : for (l = server->listeners; l; l = l->next) {
881 2751 : l->start(&exec_ctx, server, l->arg, server->pollsets, server->cq_count);
882 : }
883 :
884 3505 : grpc_exec_ctx_finish(&exec_ctx);
885 3505 : }
886 :
887 3056 : void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
888 : grpc_transport *transport,
889 : grpc_channel_filter const **extra_filters,
890 : size_t num_extra_filters,
891 : const grpc_channel_args *args) {
892 3056 : size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
893 3056 : grpc_channel_filter const **filters =
894 3056 : gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
895 : size_t i;
896 : size_t num_registered_methods;
897 : size_t alloc;
898 : registered_method *rm;
899 : channel_registered_method *crm;
900 : grpc_channel *channel;
901 : channel_data *chand;
902 : grpc_mdstr *host;
903 : grpc_mdstr *method;
904 : gpr_uint32 hash;
905 : size_t slots;
906 : gpr_uint32 probes;
907 3015 : gpr_uint32 max_probes = 0;
908 : grpc_transport_op op;
909 :
910 8418 : for (i = 0; i < s->channel_filter_count; i++) {
911 5362 : filters[i] = s->channel_filters[i];
912 : }
913 6661 : for (; i < s->channel_filter_count + num_extra_filters; i++) {
914 3646 : filters[i] = extra_filters[i - s->channel_filter_count];
915 : }
916 3056 : filters[i] = &grpc_connected_channel_filter;
917 :
918 6248 : for (i = 0; i < s->cq_count; i++) {
919 3192 : memset(&op, 0, sizeof(op));
920 3192 : op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
921 3192 : grpc_transport_perform_op(exec_ctx, transport, &op);
922 : }
923 :
924 3056 : channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters,
925 : num_filters, args, 0);
926 3056 : chand = (channel_data *)grpc_channel_stack_element(
927 : grpc_channel_get_channel_stack(channel), 0)->channel_data;
928 3056 : chand->server = s;
929 3015 : server_ref(s);
930 3056 : chand->channel = channel;
931 :
932 3015 : num_registered_methods = 0;
933 4204 : for (rm = s->registered_methods; rm; rm = rm->next) {
934 1148 : num_registered_methods++;
935 : }
936 : /* build a lookup table phrased in terms of mdstr's in this channels context
937 : to quickly find registered methods */
938 3056 : if (num_registered_methods > 0) {
939 156 : slots = 2 * num_registered_methods;
940 156 : alloc = sizeof(channel_registered_method) * slots;
941 156 : chand->registered_methods = gpr_malloc(alloc);
942 156 : memset(chand->registered_methods, 0, alloc);
943 1304 : for (rm = s->registered_methods; rm; rm = rm->next) {
944 1148 : host = rm->host ? grpc_mdstr_from_string(rm->host) : NULL;
945 1148 : method = grpc_mdstr_from_string(rm->method);
946 1148 : hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
947 4188 : for (probes = 0; chand->registered_methods[(hash + probes) % slots]
948 1520 : .server_registered_method != NULL;
949 372 : probes++)
950 : ;
951 1148 : if (probes > max_probes) max_probes = probes;
952 1148 : crm = &chand->registered_methods[(hash + probes) % slots];
953 1148 : crm->server_registered_method = rm;
954 1148 : crm->host = host;
955 1148 : crm->method = method;
956 : }
957 156 : GPR_ASSERT(slots <= GPR_UINT32_MAX);
958 156 : chand->registered_method_slots = (gpr_uint32)slots;
959 156 : chand->registered_method_max_probes = max_probes;
960 : }
961 :
962 3056 : grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
963 : transport);
964 :
965 3056 : gpr_mu_lock(&s->mu_global);
966 3056 : chand->next = &s->root_channel_data;
967 3056 : chand->prev = chand->next->prev;
968 3056 : chand->next->prev = chand->prev->next = chand;
969 3056 : gpr_mu_unlock(&s->mu_global);
970 :
971 3056 : gpr_free((void *)filters);
972 :
973 3056 : GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
974 3056 : memset(&op, 0, sizeof(op));
975 3056 : op.set_accept_stream = accept_stream;
976 3056 : op.set_accept_stream_user_data = chand;
977 3056 : op.on_connectivity_state_change = &chand->channel_connectivity_changed;
978 3056 : op.connectivity_state = &chand->connectivity_state;
979 3056 : op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
980 3056 : grpc_transport_perform_op(exec_ctx, transport, &op);
981 3056 : }
982 :
983 13 : void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
984 : grpc_cq_completion *storage) {
985 : (void)done_arg;
986 13 : gpr_free(storage);
987 13 : }
988 :
989 2752 : static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s,
990 : int success) {
991 2724 : grpc_server *server = s;
992 2752 : gpr_mu_lock(&server->mu_global);
993 2752 : server->listeners_destroyed++;
994 2752 : maybe_finish_shutdown(exec_ctx, server);
995 2752 : gpr_mu_unlock(&server->mu_global);
996 2752 : }
997 :
998 3527 : void grpc_server_shutdown_and_notify(grpc_server *server,
999 : grpc_completion_queue *cq, void *tag) {
1000 : listener *l;
1001 : shutdown_tag *sdt;
1002 : channel_broadcaster broadcaster;
1003 3527 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1004 :
1005 3527 : GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
1006 : (server, cq, tag));
1007 :
1008 : /* lock, and gather up some stuff to do */
1009 3527 : gpr_mu_lock(&server->mu_global);
1010 3527 : grpc_cq_begin_op(cq);
1011 3527 : if (server->shutdown_published) {
1012 13 : grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL,
1013 13 : gpr_malloc(sizeof(grpc_cq_completion)));
1014 13 : gpr_mu_unlock(&server->mu_global);
1015 13 : goto done;
1016 : }
1017 3514 : server->shutdown_tags =
1018 3514 : gpr_realloc(server->shutdown_tags,
1019 3514 : sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
1020 3514 : sdt = &server->shutdown_tags[server->num_shutdown_tags++];
1021 3514 : sdt->tag = tag;
1022 3514 : sdt->cq = cq;
1023 3514 : if (gpr_atm_acq_load(&server->shutdown_flag)) {
1024 0 : gpr_mu_unlock(&server->mu_global);
1025 0 : goto done;
1026 : }
1027 :
1028 3514 : server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
1029 :
1030 3514 : channel_broadcaster_init(server, &broadcaster);
1031 :
1032 3514 : gpr_atm_rel_store(&server->shutdown_flag, 1);
1033 :
1034 : /* collect all unregistered then registered calls */
1035 3514 : gpr_mu_lock(&server->mu_call);
1036 3514 : kill_pending_work_locked(&exec_ctx, server);
1037 3514 : gpr_mu_unlock(&server->mu_call);
1038 :
1039 3514 : maybe_finish_shutdown(&exec_ctx, server);
1040 3514 : gpr_mu_unlock(&server->mu_global);
1041 :
1042 : /* Shutdown listeners */
1043 6266 : for (l = server->listeners; l; l = l->next) {
1044 2752 : grpc_closure_init(&l->destroy_done, listener_destroy_done, server);
1045 2752 : l->destroy(&exec_ctx, server, l->arg, &l->destroy_done);
1046 : }
1047 :
1048 3514 : channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 1, 0);
1049 :
1050 : done:
1051 3527 : grpc_exec_ctx_finish(&exec_ctx);
1052 3527 : }
1053 :
1054 86 : void grpc_server_cancel_all_calls(grpc_server *server) {
1055 : channel_broadcaster broadcaster;
1056 86 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1057 :
1058 86 : GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
1059 :
1060 86 : gpr_mu_lock(&server->mu_global);
1061 86 : channel_broadcaster_init(server, &broadcaster);
1062 86 : gpr_mu_unlock(&server->mu_global);
1063 :
1064 86 : channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, 1);
1065 86 : grpc_exec_ctx_finish(&exec_ctx);
1066 86 : }
1067 :
1068 3481 : void grpc_server_destroy(grpc_server *server) {
1069 : listener *l;
1070 3481 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1071 :
1072 3481 : GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
1073 :
1074 3481 : gpr_mu_lock(&server->mu_global);
1075 3481 : GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
1076 3490 : GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
1077 :
1078 9679 : while (server->listeners) {
1079 2722 : l = server->listeners;
1080 2726 : server->listeners = l->next;
1081 2726 : gpr_free(l);
1082 : }
1083 :
1084 3481 : gpr_mu_unlock(&server->mu_global);
1085 :
1086 3481 : server_unref(&exec_ctx, server);
1087 3481 : grpc_exec_ctx_finish(&exec_ctx);
1088 3481 : }
1089 :
1090 2754 : void grpc_server_add_listener(
1091 : grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1092 : void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1093 : grpc_pollset **pollsets, size_t pollset_count),
1094 : void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
1095 : grpc_closure *on_done)) {
1096 2754 : listener *l = gpr_malloc(sizeof(listener));
1097 2754 : l->arg = arg;
1098 2754 : l->start = start;
1099 2754 : l->destroy = destroy;
1100 2754 : l->next = server->listeners;
1101 2754 : server->listeners = l;
1102 2754 : }
1103 :
1104 2244070 : static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
1105 : grpc_server *server,
1106 : requested_call *rc) {
1107 2243959 : call_data *calld = NULL;
1108 2243959 : request_matcher *rm = NULL;
1109 : int request_id;
1110 2244070 : if (gpr_atm_acq_load(&server->shutdown_flag)) {
1111 11593 : fail_call(exec_ctx, server, rc);
1112 11599 : return GRPC_CALL_OK;
1113 : }
1114 2232477 : request_id = gpr_stack_lockfree_pop(server->request_freelist);
1115 2232536 : if (request_id == -1) {
1116 : /* out of request ids: just fail this one */
1117 0 : fail_call(exec_ctx, server, rc);
1118 0 : return GRPC_CALL_OK;
1119 : }
1120 2232536 : switch (rc->type) {
1121 : case BATCH_CALL:
1122 524670 : rm = &server->unregistered_request_matcher;
1123 524670 : break;
1124 : case REGISTERED_CALL:
1125 1707866 : rm = &rc->data.registered.registered_method->request_matcher;
1126 1707866 : break;
1127 : }
1128 2232536 : server->requested_calls[request_id] = *rc;
1129 2232536 : gpr_free(rc);
1130 2232529 : if (gpr_stack_lockfree_push(rm->requests, request_id)) {
1131 : /* this was the first queued request: we need to lock and start
1132 : matching calls */
1133 740163 : gpr_mu_lock(&server->mu_call);
1134 1584973 : while ((calld = rm->pending_head) != NULL) {
1135 208704 : request_id = gpr_stack_lockfree_pop(rm->requests);
1136 208704 : if (request_id == -1) break;
1137 104647 : rm->pending_head = calld->pending_next;
1138 104647 : gpr_mu_unlock(&server->mu_call);
1139 104647 : gpr_mu_lock(&calld->mu_state);
1140 104647 : if (calld->state == ZOMBIED) {
1141 0 : gpr_mu_unlock(&calld->mu_state);
1142 0 : grpc_closure_init(
1143 : &calld->kill_zombie_closure, kill_zombie,
1144 0 : grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
1145 0 : grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
1146 : } else {
1147 104647 : GPR_ASSERT(calld->state == PENDING);
1148 104647 : calld->state = ACTIVATED;
1149 104647 : gpr_mu_unlock(&calld->mu_state);
1150 209294 : begin_call(exec_ctx, server, calld,
1151 209294 : &server->requested_calls[request_id]);
1152 : }
1153 104647 : gpr_mu_lock(&server->mu_call);
1154 : }
1155 740163 : gpr_mu_unlock(&server->mu_call);
1156 : }
1157 2232425 : return GRPC_CALL_OK;
1158 : }
1159 :
1160 524670 : grpc_call_error grpc_server_request_call(
1161 : grpc_server *server, grpc_call **call, grpc_call_details *details,
1162 : grpc_metadata_array *initial_metadata,
1163 : grpc_completion_queue *cq_bound_to_call,
1164 : grpc_completion_queue *cq_for_notification, void *tag) {
1165 : grpc_call_error error;
1166 524670 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1167 524670 : requested_call *rc = gpr_malloc(sizeof(*rc));
1168 524670 : GRPC_API_TRACE(
1169 : "grpc_server_request_call("
1170 : "server=%p, call=%p, details=%p, initial_metadata=%p, "
1171 : "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
1172 : 7, (server, call, details, initial_metadata, cq_bound_to_call,
1173 : cq_for_notification, tag));
1174 524670 : if (!grpc_cq_is_server_cq(cq_for_notification)) {
1175 0 : gpr_free(rc);
1176 0 : error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1177 0 : goto done;
1178 : }
1179 524670 : grpc_cq_begin_op(cq_for_notification);
1180 524670 : details->reserved = NULL;
1181 524670 : rc->type = BATCH_CALL;
1182 524670 : rc->server = server;
1183 524670 : rc->tag = tag;
1184 524670 : rc->cq_bound_to_call = cq_bound_to_call;
1185 524670 : rc->cq_for_notification = cq_for_notification;
1186 524670 : rc->call = call;
1187 524670 : rc->data.batch.details = details;
1188 524670 : rc->initial_metadata = initial_metadata;
1189 524670 : error = queue_call_request(&exec_ctx, server, rc);
1190 : done:
1191 524670 : grpc_exec_ctx_finish(&exec_ctx);
1192 524670 : return error;
1193 : }
1194 :
1195 1718749 : grpc_call_error grpc_server_request_registered_call(
1196 : grpc_server *server, void *rmp, grpc_call **call, gpr_timespec *deadline,
1197 : grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
1198 : grpc_completion_queue *cq_bound_to_call,
1199 : grpc_completion_queue *cq_for_notification, void *tag) {
1200 : grpc_call_error error;
1201 1718749 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
1202 1718749 : requested_call *rc = gpr_malloc(sizeof(*rc));
1203 1719278 : registered_method *rm = rmp;
1204 1719278 : GRPC_API_TRACE(
1205 : "grpc_server_request_registered_call("
1206 : "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
1207 : "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
1208 : "tag=%p)",
1209 : 9, (server, rmp, call, deadline, initial_metadata, optional_payload,
1210 : cq_bound_to_call, cq_for_notification, tag));
1211 1719278 : if (!grpc_cq_is_server_cq(cq_for_notification)) {
1212 0 : gpr_free(rc);
1213 0 : error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
1214 0 : goto done;
1215 : }
1216 1719250 : grpc_cq_begin_op(cq_for_notification);
1217 1719404 : rc->type = REGISTERED_CALL;
1218 1719404 : rc->server = server;
1219 1719404 : rc->tag = tag;
1220 1719404 : rc->cq_bound_to_call = cq_bound_to_call;
1221 1719404 : rc->cq_for_notification = cq_for_notification;
1222 1719404 : rc->call = call;
1223 1719404 : rc->data.registered.registered_method = rm;
1224 1719404 : rc->data.registered.deadline = deadline;
1225 1719404 : rc->initial_metadata = initial_metadata;
1226 1719404 : rc->data.registered.optional_payload = optional_payload;
1227 1719404 : error = queue_call_request(&exec_ctx, server, rc);
1228 : done:
1229 1719465 : grpc_exec_ctx_finish(&exec_ctx);
1230 1719457 : return error;
1231 : }
1232 :
1233 : static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
1234 : void *user_data, int success);
1235 :
1236 1048354 : static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
1237 1048354 : gpr_slice slice = value->slice;
1238 1048354 : size_t len = GPR_SLICE_LENGTH(slice);
1239 :
1240 1048354 : if (len + 1 > *capacity) {
1241 1048055 : *capacity = GPR_MAX(len + 1, *capacity * 2);
1242 1048055 : *dest = gpr_realloc(*dest, *capacity);
1243 : }
1244 1048354 : memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
1245 1048354 : }
1246 :
1247 2170957 : static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
1248 : call_data *calld, requested_call *rc) {
1249 : grpc_op ops[1];
1250 2170870 : grpc_op *op = ops;
1251 :
1252 2170957 : memset(ops, 0, sizeof(ops));
1253 :
1254 : /* called once initial metadata has been read by the call, but BEFORE
1255 : the ioreq to fetch it out of the call has been executed.
1256 : This means metadata related fields can be relied on in calld, but to
1257 : fill in the metadata array passed by the client, we need to perform
1258 : an ioreq op, that should complete immediately. */
1259 :
1260 2170957 : grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
1261 2170926 : grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
1262 2170868 : *rc->call = calld->call;
1263 2170868 : calld->cq_new = rc->cq_for_notification;
1264 2170868 : GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
1265 2170868 : switch (rc->type) {
1266 : case BATCH_CALL:
1267 524177 : GPR_ASSERT(calld->host != NULL);
1268 524177 : GPR_ASSERT(calld->path != NULL);
1269 1048267 : cpstr(&rc->data.batch.details->host,
1270 524177 : &rc->data.batch.details->host_capacity, calld->host);
1271 1048354 : cpstr(&rc->data.batch.details->method,
1272 524177 : &rc->data.batch.details->method_capacity, calld->path);
1273 524177 : rc->data.batch.details->deadline = calld->deadline;
1274 524090 : break;
1275 : case REGISTERED_CALL:
1276 1646591 : *rc->data.registered.deadline = calld->deadline;
1277 1646591 : if (rc->data.registered.optional_payload) {
1278 1646555 : op->op = GRPC_OP_RECV_MESSAGE;
1279 1646555 : op->data.recv_message = rc->data.registered.optional_payload;
1280 1646555 : op++;
1281 : }
1282 1646591 : break;
1283 : default:
1284 0 : GPR_UNREACHABLE_CODE(return );
1285 : }
1286 :
1287 2170868 : GRPC_CALL_INTERNAL_REF(calld->call, "server");
1288 4341757 : grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
1289 2170922 : (size_t)(op - ops), &rc->publish);
1290 2170930 : }
1291 :
1292 2244053 : static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
1293 : grpc_cq_completion *c) {
1294 2243942 : requested_call *rc = req;
1295 2244053 : grpc_server *server = rc->server;
1296 :
1297 4476510 : if (rc >= server->requested_calls &&
1298 2232457 : rc < server->requested_calls + server->max_requested_calls) {
1299 2232461 : GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
1300 2232461 : gpr_stack_lockfree_push(server->request_freelist,
1301 2232461 : (int)(rc - server->requested_calls));
1302 : } else {
1303 11592 : gpr_free(req);
1304 : }
1305 :
1306 2244136 : server_unref(exec_ctx, server);
1307 2244139 : }
1308 :
1309 73104 : static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
1310 : requested_call *rc) {
1311 73104 : *rc->call = NULL;
1312 73104 : rc->initial_metadata->count = 0;
1313 :
1314 73080 : server_ref(server);
1315 73110 : grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0,
1316 : done_request_event, rc, &rc->completion);
1317 73159 : }
1318 :
1319 2170925 : static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
1320 : int success) {
1321 2170838 : requested_call *rc = prc;
1322 2170925 : grpc_call *call = *rc->call;
1323 2170858 : grpc_call_element *elem =
1324 2170925 : grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
1325 2170861 : call_data *calld = elem->call_data;
1326 2170861 : channel_data *chand = elem->channel_data;
1327 2170861 : server_ref(chand->server);
1328 2170875 : grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
1329 : rc, &rc->completion);
1330 2170937 : GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
1331 2170953 : }
1332 :
1333 6067 : const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
1334 6067 : return server->channel_args;
1335 : }
1336 :
1337 334 : int grpc_server_has_open_connections(grpc_server *server) {
1338 : int r;
1339 334 : gpr_mu_lock(&server->mu_global);
1340 334 : r = server->root_channel_data.next != &server->root_channel_data;
1341 334 : gpr_mu_unlock(&server->mu_global);
1342 334 : return r;
1343 : }
|