LCOV - code coverage report
Current view: top level - core/surface - server.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 644 682 94.4 %
Date: 2015-12-10 22:15:08 Functions: 61 62 98.4 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/surface/server.h"
      35             : 
      36             : #include <limits.h>
      37             : #include <stdlib.h>
      38             : #include <string.h>
      39             : 
      40             : #include <grpc/support/alloc.h>
      41             : #include <grpc/support/log.h>
      42             : #include <grpc/support/string_util.h>
      43             : #include <grpc/support/useful.h>
      44             : 
      45             : #include "src/core/census/grpc_filter.h"
      46             : #include "src/core/channel/channel_args.h"
      47             : #include "src/core/channel/connected_channel.h"
      48             : #include "src/core/iomgr/iomgr.h"
      49             : #include "src/core/support/stack_lockfree.h"
      50             : #include "src/core/support/string.h"
      51             : #include "src/core/surface/api_trace.h"
      52             : #include "src/core/surface/call.h"
      53             : #include "src/core/surface/channel.h"
      54             : #include "src/core/surface/completion_queue.h"
      55             : #include "src/core/surface/init.h"
      56             : #include "src/core/transport/metadata.h"
      57             : #include "src/core/transport/static_metadata.h"
      58             : 
      59             : typedef struct listener {
      60             :   void *arg;
      61             :   void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
      62             :                 grpc_pollset **pollsets, size_t pollset_count);
      63             :   void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
      64             :                   grpc_closure *closure);
      65             :   struct listener *next;
      66             :   grpc_closure destroy_done;
      67             : } listener;
      68             : 
      69             : typedef struct call_data call_data;
      70             : typedef struct channel_data channel_data;
      71             : typedef struct registered_method registered_method;
      72             : 
      73             : typedef struct {
      74             :   call_data *next;
      75             :   call_data *prev;
      76             : } call_link;
      77             : 
      78             : typedef enum { BATCH_CALL, REGISTERED_CALL } requested_call_type;
      79             : 
      80             : typedef struct requested_call {
      81             :   requested_call_type type;
      82             :   void *tag;
      83             :   grpc_server *server;
      84             :   grpc_completion_queue *cq_bound_to_call;
      85             :   grpc_completion_queue *cq_for_notification;
      86             :   grpc_call **call;
      87             :   grpc_cq_completion completion;
      88             :   grpc_metadata_array *initial_metadata;
      89             :   union {
      90             :     struct {
      91             :       grpc_call_details *details;
      92             :     } batch;
      93             :     struct {
      94             :       registered_method *registered_method;
      95             :       gpr_timespec *deadline;
      96             :       grpc_byte_buffer **optional_payload;
      97             :     } registered;
      98             :   } data;
      99             :   grpc_closure publish;
     100             : } requested_call;
     101             : 
     102             : typedef struct channel_registered_method {
     103             :   registered_method *server_registered_method;
     104             :   grpc_mdstr *method;
     105             :   grpc_mdstr *host;
     106             : } channel_registered_method;
     107             : 
     108             : struct channel_data {
     109             :   grpc_server *server;
     110             :   grpc_connectivity_state connectivity_state;
     111             :   grpc_channel *channel;
     112             :   /* linked list of all channels on a server */
     113             :   channel_data *next;
     114             :   channel_data *prev;
     115             :   channel_registered_method *registered_methods;
     116             :   gpr_uint32 registered_method_slots;
     117             :   gpr_uint32 registered_method_max_probes;
     118             :   grpc_closure finish_destroy_channel_closure;
     119             :   grpc_closure channel_connectivity_changed;
     120             : };
     121             : 
     122             : typedef struct shutdown_tag {
     123             :   void *tag;
     124             :   grpc_completion_queue *cq;
     125             :   grpc_cq_completion completion;
     126             : } shutdown_tag;
     127             : 
     128             : typedef enum {
     129             :   /* waiting for metadata */
     130             :   NOT_STARTED,
      131             :   /* initial metadata read, not flow controlled in yet */
     132             :   PENDING,
     133             :   /* flow controlled in, on completion queue */
     134             :   ACTIVATED,
     135             :   /* cancelled before being queued */
     136             :   ZOMBIED
     137             : } call_state;
     138             : 
     139             : typedef struct request_matcher request_matcher;
     140             : 
     141             : struct call_data {
     142             :   grpc_call *call;
     143             : 
     144             :   /** protects state */
     145             :   gpr_mu mu_state;
     146             :   /** the current state of a call - see call_state */
     147             :   call_state state;
     148             : 
     149             :   grpc_mdstr *path;
     150             :   grpc_mdstr *host;
     151             :   gpr_timespec deadline;
     152             : 
     153             :   grpc_completion_queue *cq_new;
     154             : 
     155             :   grpc_metadata_batch *recv_initial_metadata;
     156             :   grpc_metadata_array initial_metadata;
     157             : 
     158             :   grpc_closure got_initial_metadata;
     159             :   grpc_closure server_on_recv_initial_metadata;
     160             :   grpc_closure kill_zombie_closure;
     161             :   grpc_closure *on_done_recv_initial_metadata;
     162             : 
     163             :   call_data *pending_next;
     164             : };
     165             : 
     166             : struct request_matcher {
     167             :   call_data *pending_head;
     168             :   call_data *pending_tail;
     169             :   gpr_stack_lockfree *requests;
     170             : };
     171             : 
     172             : struct registered_method {
     173             :   char *method;
     174             :   char *host;
     175             :   request_matcher request_matcher;
     176             :   registered_method *next;
     177             : };
     178             : 
     179             : typedef struct {
     180             :   grpc_channel **channels;
     181             :   size_t num_channels;
     182             : } channel_broadcaster;
     183             : 
     184             : struct grpc_server {
     185             :   size_t channel_filter_count;
     186             :   grpc_channel_filter const **channel_filters;
     187             :   grpc_channel_args *channel_args;
     188             : 
     189             :   grpc_completion_queue **cqs;
     190             :   grpc_pollset **pollsets;
     191             :   size_t cq_count;
     192             : 
      193             :   /* The two following mutexes control access to server state:
     194             :      mu_global controls access to non-call-related state (e.g., channel state)
     195             :      mu_call controls access to call-related state (e.g., the call lists)
     196             : 
     197             :      If they are ever required to be nested, you must lock mu_global
     198             :      before mu_call. This is currently used in shutdown processing
     199             :      (grpc_server_shutdown_and_notify and maybe_finish_shutdown) */
     200             :   gpr_mu mu_global; /* mutex for server and channel state */
     201             :   gpr_mu mu_call;   /* mutex for call-specific state */
     202             : 
     203             :   registered_method *registered_methods;
     204             :   request_matcher unregistered_request_matcher;
     205             :   /** free list of available requested_calls indices */
     206             :   gpr_stack_lockfree *request_freelist;
     207             :   /** requested call backing data */
     208             :   requested_call *requested_calls;
     209             :   size_t max_requested_calls;
     210             : 
     211             :   gpr_atm shutdown_flag;
     212             :   gpr_uint8 shutdown_published;
     213             :   size_t num_shutdown_tags;
     214             :   shutdown_tag *shutdown_tags;
     215             : 
     216             :   channel_data root_channel_data;
     217             : 
     218             :   listener *listeners;
     219             :   int listeners_destroyed;
     220             :   gpr_refcount internal_refcount;
     221             : 
     222             :   /** when did we print the last shutdown progress message */
     223             :   gpr_timespec last_shutdown_message_time;
     224             : };
     225             : 
     226             : #define SERVER_FROM_CALL_ELEM(elem) \
     227             :   (((channel_data *)(elem)->channel_data)->server)
     228             : 
     229             : static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
     230             :                        call_data *calld, requested_call *rc);
     231             : static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
     232             :                       requested_call *rc);
     233             : /* Before calling maybe_finish_shutdown, we must hold mu_global and not
     234             :    hold mu_call */
     235             : static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_server *server);
     236             : 
     237             : /*
     238             :  * channel broadcaster
     239             :  */
     240             : 
     241             : /* assumes server locked */
     242        3600 : static void channel_broadcaster_init(grpc_server *s, channel_broadcaster *cb) {
     243             :   channel_data *c;
     244        3512 :   size_t count = 0;
     245        6342 :   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
     246        2742 :     count++;
     247             :   }
     248        3600 :   cb->num_channels = count;
     249        3600 :   cb->channels = gpr_malloc(sizeof(*cb->channels) * cb->num_channels);
     250        3512 :   count = 0;
     251        6342 :   for (c = s->root_channel_data.next; c != &s->root_channel_data; c = c->next) {
     252        2742 :     cb->channels[count++] = c->channel;
     253        2742 :     GRPC_CHANNEL_INTERNAL_REF(c->channel, "broadcast");
     254             :   }
     255        3600 : }
     256             : 
     257             : struct shutdown_cleanup_args {
     258             :   grpc_closure closure;
     259             :   gpr_slice slice;
     260             : };
     261             : 
     262        2742 : static void shutdown_cleanup(grpc_exec_ctx *exec_ctx, void *arg,
     263             :                              int iomgr_status_ignored) {
     264        2701 :   struct shutdown_cleanup_args *a = arg;
     265        2742 :   gpr_slice_unref(a->slice);
     266        2742 :   gpr_free(a);
     267        2742 : }
     268             : 
     269        2742 : static void send_shutdown(grpc_exec_ctx *exec_ctx, grpc_channel *channel,
     270             :                           int send_goaway, int send_disconnect) {
     271             :   grpc_transport_op op;
     272             :   struct shutdown_cleanup_args *sc;
     273             :   grpc_channel_element *elem;
     274             : 
     275        2742 :   memset(&op, 0, sizeof(op));
     276        2742 :   op.send_goaway = send_goaway;
     277        2742 :   sc = gpr_malloc(sizeof(*sc));
     278        2742 :   sc->slice = gpr_slice_from_copied_string("Server shutdown");
     279        2742 :   op.goaway_message = &sc->slice;
     280        2742 :   op.goaway_status = GRPC_STATUS_OK;
     281        2742 :   op.disconnect = send_disconnect;
     282        2742 :   grpc_closure_init(&sc->closure, shutdown_cleanup, sc);
     283        2742 :   op.on_consumed = &sc->closure;
     284             : 
     285        2742 :   elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
     286        2742 :   elem->filter->start_transport_op(exec_ctx, elem, &op);
     287        2742 : }
     288             : 
     289        3600 : static void channel_broadcaster_shutdown(grpc_exec_ctx *exec_ctx,
     290             :                                          channel_broadcaster *cb,
     291             :                                          int send_goaway,
     292             :                                          int force_disconnect) {
     293             :   size_t i;
     294             : 
     295        6342 :   for (i = 0; i < cb->num_channels; i++) {
     296        2742 :     send_shutdown(exec_ctx, cb->channels[i], send_goaway, force_disconnect);
     297        2742 :     GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, cb->channels[i], "broadcast");
     298             :   }
     299        3600 :   gpr_free(cb->channels);
     300        3600 : }
     301             : 
     302             : /*
     303             :  * request_matcher
     304             :  */
     305             : 
     306        4740 : static void request_matcher_init(request_matcher *rm, size_t entries) {
     307        4740 :   memset(rm, 0, sizeof(*rm));
     308        4740 :   rm->requests = gpr_stack_lockfree_create(entries);
     309        4740 : }
     310             : 
     311        4700 : static void request_matcher_destroy(request_matcher *rm) {
     312        4700 :   GPR_ASSERT(gpr_stack_lockfree_pop(rm->requests) == -1);
     313        4700 :   gpr_stack_lockfree_destroy(rm->requests);
     314        4700 : }
     315             : 
     316         194 : static void kill_zombie(grpc_exec_ctx *exec_ctx, void *elem, int success) {
     317         194 :   grpc_call_destroy(grpc_call_from_top_element(elem));
     318         194 : }
     319             : 
     320       19775 : static void request_matcher_zombify_all_pending_calls(grpc_exec_ctx *exec_ctx,
     321             :                                                       request_matcher *rm) {
     322       39536 :   while (rm->pending_head) {
     323         139 :     call_data *calld = rm->pending_head;
     324         141 :     rm->pending_head = calld->pending_next;
     325         141 :     gpr_mu_lock(&calld->mu_state);
     326         141 :     calld->state = ZOMBIED;
     327         141 :     gpr_mu_unlock(&calld->mu_state);
     328         141 :     grpc_closure_init(
     329             :         &calld->kill_zombie_closure, kill_zombie,
     330         141 :         grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
     331         141 :     grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
     332             :   }
     333       19775 : }
     334             : 
     335       19775 : static void request_matcher_kill_requests(grpc_exec_ctx *exec_ctx,
     336             :                                           grpc_server *server,
     337             :                                           request_matcher *rm) {
     338             :   int request_id;
     339      100974 :   while ((request_id = gpr_stack_lockfree_pop(rm->requests)) != -1) {
     340       61579 :     fail_call(exec_ctx, server, &server->requested_calls[request_id]);
     341             :   }
     342       19775 : }
     343             : 
     344             : /*
     345             :  * server proper
     346             :  */
     347             : 
     348     4422992 : static void server_ref(grpc_server *server) {
     349     4423314 :   gpr_ref(&server->internal_refcount);
     350     4423821 : }
     351             : 
     352        3475 : static void server_delete(grpc_exec_ctx *exec_ctx, grpc_server *server) {
     353             :   registered_method *rm;
     354             :   size_t i;
     355        3475 :   grpc_channel_args_destroy(server->channel_args);
     356        3475 :   gpr_mu_destroy(&server->mu_global);
     357        3475 :   gpr_mu_destroy(&server->mu_call);
     358        3475 :   gpr_free((void *)server->channel_filters);
     359        8166 :   while ((rm = server->registered_methods) != NULL) {
     360        1225 :     server->registered_methods = rm->next;
     361        1225 :     request_matcher_destroy(&rm->request_matcher);
     362        1225 :     gpr_free(rm->method);
     363        1225 :     gpr_free(rm->host);
     364        1225 :     gpr_free(rm);
     365             :   }
     366        6990 :   for (i = 0; i < server->cq_count; i++) {
     367        3524 :     GRPC_CQ_INTERNAL_UNREF(server->cqs[i], "server");
     368             :   }
     369        3475 :   request_matcher_destroy(&server->unregistered_request_matcher);
     370        3475 :   gpr_stack_lockfree_destroy(server->request_freelist);
     371        3475 :   gpr_free(server->cqs);
     372        3475 :   gpr_free(server->pollsets);
     373        3475 :   gpr_free(server->shutdown_tags);
     374        3475 :   gpr_free(server->requested_calls);
     375        3475 :   gpr_free(server);
     376        3475 : }
     377             : 
     378     4425576 : static void server_unref(grpc_exec_ctx *exec_ctx, grpc_server *server) {
     379     4425576 :   if (gpr_unref(&server->internal_refcount)) {
     380        3475 :     server_delete(exec_ctx, server);
     381             :   }
     382     4428232 : }
     383             : 
     384        3015 : static int is_channel_orphaned(channel_data *chand) {
     385        3056 :   return chand->next == chand;
     386             : }
     387             : 
     388        3015 : static void orphan_channel(channel_data *chand) {
     389        3056 :   chand->next->prev = chand->prev;
     390        3056 :   chand->prev->next = chand->next;
     391        3056 :   chand->next = chand->prev = chand;
     392        3015 : }
     393             : 
     394        3056 : static void finish_destroy_channel(grpc_exec_ctx *exec_ctx, void *cd,
     395             :                                    int success) {
     396        3015 :   channel_data *chand = cd;
     397        3056 :   grpc_server *server = chand->server;
     398        3056 :   GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "server");
     399        3056 :   server_unref(exec_ctx, server);
     400        3056 : }
     401             : 
     402        3056 : static void destroy_channel(grpc_exec_ctx *exec_ctx, channel_data *chand) {
     403        6112 :   if (is_channel_orphaned(chand)) return;
     404        3056 :   GPR_ASSERT(chand->server != NULL);
     405        3015 :   orphan_channel(chand);
     406        3015 :   server_ref(chand->server);
     407        3056 :   maybe_finish_shutdown(exec_ctx, chand->server);
     408        3056 :   chand->finish_destroy_channel_closure.cb = finish_destroy_channel;
     409        3056 :   chand->finish_destroy_channel_closure.cb_arg = chand;
     410        3056 :   grpc_exec_ctx_enqueue(exec_ctx, &chand->finish_destroy_channel_closure, 1);
     411             : }
     412             : 
     413     2170882 : static void finish_start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_server *server,
     414             :                                  grpc_call_element *elem, request_matcher *rm) {
     415     2170882 :   call_data *calld = elem->call_data;
     416             :   int request_id;
     417             : 
     418     2170882 :   if (gpr_atm_acq_load(&server->shutdown_flag)) {
     419           0 :     gpr_mu_lock(&calld->mu_state);
     420           0 :     calld->state = ZOMBIED;
     421           0 :     gpr_mu_unlock(&calld->mu_state);
     422           0 :     grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
     423           0 :     grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
     424     2171077 :     return;
     425             :   }
     426             : 
     427     2170882 :   request_id = gpr_stack_lockfree_pop(rm->requests);
     428     2171093 :   if (request_id == -1) {
     429      104788 :     gpr_mu_lock(&server->mu_call);
     430      104788 :     gpr_mu_lock(&calld->mu_state);
     431      104788 :     calld->state = PENDING;
     432      104788 :     gpr_mu_unlock(&calld->mu_state);
     433      104788 :     if (rm->pending_head == NULL) {
     434         727 :       rm->pending_tail = rm->pending_head = calld;
     435             :     } else {
     436      104061 :       rm->pending_tail->pending_next = calld;
     437      104061 :       rm->pending_tail = calld;
     438             :     }
     439      104788 :     calld->pending_next = NULL;
     440      104788 :     gpr_mu_unlock(&server->mu_call);
     441             :   } else {
     442     2066305 :     gpr_mu_lock(&calld->mu_state);
     443     2066313 :     calld->state = ACTIVATED;
     444     2066313 :     gpr_mu_unlock(&calld->mu_state);
     445     2066311 :     begin_call(exec_ctx, server, calld, &server->requested_calls[request_id]);
     446             :   }
     447             : }
     448             : 
     449     2170928 : static void start_new_rpc(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
     450     2170928 :   channel_data *chand = elem->channel_data;
     451     2170928 :   call_data *calld = elem->call_data;
     452     2170928 :   grpc_server *server = chand->server;
     453             :   gpr_uint32 i;
     454             :   gpr_uint32 hash;
     455             :   channel_registered_method *rm;
     456             : 
     457     2170928 :   if (chand->registered_methods && calld->path && calld->host) {
     458             :     /* TODO(ctiller): unify these two searches */
     459             :     /* check for an exact match with host */
     460     1646572 :     hash = GRPC_MDSTR_KV_HASH(calld->host->hash, calld->path->hash);
     461     3294645 :     for (i = 0; i <= chand->registered_method_max_probes; i++) {
     462     3295954 :       rm = &chand->registered_methods[(hash + i) %
     463     1647977 :                                       chand->registered_method_slots];
     464     1647977 :       if (!rm) break;
     465     1648074 :       if (rm->host != calld->host) continue;
     466           1 :       if (rm->method != calld->path) continue;
     467           1 :       finish_start_new_rpc(exec_ctx, server, elem,
     468           1 :                            &rm->server_registered_method->request_matcher);
     469           1 :       return;
     470             :     }
     471             :     /* check for a wildcard method definition (no host set) */
     472     1646571 :     hash = GRPC_MDSTR_KV_HASH(0, calld->path->hash);
     473     1646959 :     for (i = 0; i <= chand->registered_method_max_probes; i++) {
     474     3293932 :       rm = &chand->registered_methods[(hash + i) %
     475     1646966 :                                       chand->registered_method_slots];
     476     1646966 :       if (!rm) break;
     477     1646979 :       if (rm->host != NULL) continue;
     478     1646849 :       if (rm->method != calld->path) continue;
     479     1646591 :       finish_start_new_rpc(exec_ctx, server, elem,
     480     1646591 :                            &rm->server_registered_method->request_matcher);
     481     1646757 :       return;
     482             :     }
     483             :   }
     484      524336 :   finish_start_new_rpc(exec_ctx, server, elem,
     485             :                        &server->unregistered_request_matcher);
     486             : }
     487             : 
     488       11403 : static int num_listeners(grpc_server *server) {
     489             :   listener *l;
     490       11403 :   int n = 0;
     491       21372 :   for (l = server->listeners; l; l = l->next) {
     492        9969 :     n++;
     493             :   }
     494       11486 :   return n;
     495             : }
     496             : 
     497        3514 : static void done_shutdown_event(grpc_exec_ctx *exec_ctx, void *server,
     498             :                                 grpc_cq_completion *completion) {
     499        3514 :   server_unref(exec_ctx, server);
     500        3514 : }
     501             : 
     502           0 : static int num_channels(grpc_server *server) {
     503             :   channel_data *chand;
     504           0 :   int n = 0;
     505           0 :   for (chand = server->root_channel_data.next;
     506           0 :        chand != &server->root_channel_data; chand = chand->next) {
     507           0 :     n++;
     508             :   }
     509           0 :   return n;
     510             : }
     511             : 
     512       14377 : static void kill_pending_work_locked(grpc_exec_ctx *exec_ctx,
     513             :                                      grpc_server *server) {
     514             :   registered_method *rm;
     515       14377 :   request_matcher_kill_requests(exec_ctx, server,
     516             :                                 &server->unregistered_request_matcher);
     517       14377 :   request_matcher_zombify_all_pending_calls(
     518             :       exec_ctx, &server->unregistered_request_matcher);
     519       19775 :   for (rm = server->registered_methods; rm; rm = rm->next) {
     520        5398 :     request_matcher_kill_requests(exec_ctx, server, &rm->request_matcher);
     521        5398 :     request_matcher_zombify_all_pending_calls(exec_ctx, &rm->request_matcher);
     522             :   }
     523       14377 : }
     524             : 
     525       12346 : static void maybe_finish_shutdown(grpc_exec_ctx *exec_ctx,
     526             :                                   grpc_server *server) {
     527             :   size_t i;
     528       12346 :   if (!gpr_atm_acq_load(&server->shutdown_flag) || server->shutdown_published) {
     529        1474 :     return;
     530             :   }
     531             : 
     532       10863 :   kill_pending_work_locked(exec_ctx, server);
     533             : 
     534       18868 :   if (server->root_channel_data.next != &server->root_channel_data ||
     535        8005 :       server->listeners_destroyed < num_listeners(server)) {
     536        7349 :     if (gpr_time_cmp(gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME),
     537             :                                   server->last_shutdown_message_time),
     538             :                      gpr_time_from_seconds(1, GPR_TIMESPAN)) >= 0) {
     539           0 :       server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
     540           0 :       gpr_log(GPR_DEBUG,
     541             :               "Waiting for %d channels and %d/%d listeners to be destroyed"
     542             :               " before shutting down server",
     543             :               num_channels(server),
     544           0 :               num_listeners(server) - server->listeners_destroyed,
     545             :               num_listeners(server));
     546             :     }
     547        7274 :     return;
     548             :   }
     549        3514 :   server->shutdown_published = 1;
     550        7028 :   for (i = 0; i < server->num_shutdown_tags; i++) {
     551        3474 :     server_ref(server);
     552        6988 :     grpc_cq_end_op(exec_ctx, server->shutdown_tags[i].cq,
     553        3474 :                    server->shutdown_tags[i].tag, 1, done_shutdown_event, server,
     554        3514 :                    &server->shutdown_tags[i].completion);
     555             :   }
     556             : }
     557             : 
     558    12417741 : static grpc_mdelem *server_filter(void *user_data, grpc_mdelem *md) {
     559    12417270 :   grpc_call_element *elem = user_data;
     560    12417741 :   call_data *calld = elem->call_data;
     561    12417741 :   if (md->key == GRPC_MDSTR_PATH) {
     562     2170690 :     calld->path = GRPC_MDSTR_REF(md->value);
     563     2171098 :     return NULL;
     564    10247051 :   } else if (md->key == GRPC_MDSTR_AUTHORITY) {
     565     2171063 :     calld->host = GRPC_MDSTR_REF(md->value);
     566     2171080 :     return NULL;
     567             :   }
     568     8075695 :   return md;
     569             : }
     570             : 
     571     2170979 : static void server_on_recv_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
     572             :                                             int success) {
     573     2170890 :   grpc_call_element *elem = ptr;
     574     2170979 :   call_data *calld = elem->call_data;
     575             :   gpr_timespec op_deadline;
     576             : 
     577     2170979 :   grpc_metadata_batch_filter(calld->recv_initial_metadata, server_filter, elem);
     578     2170839 :   op_deadline = calld->recv_initial_metadata->deadline;
     579     2170839 :   if (0 != gpr_time_cmp(op_deadline, gpr_inf_future(op_deadline.clock_type))) {
     580      524117 :     calld->deadline = op_deadline;
     581             :   }
     582     2171153 :   if (calld->host && calld->path) {
     583             :     /* do nothing */
     584             :   } else {
     585          48 :     success = 0;
     586             :   }
     587             : 
     588     4342217 :   calld->on_done_recv_initial_metadata->cb(
     589     2171064 :       exec_ctx, calld->on_done_recv_initial_metadata->cb_arg, success);
     590     2171153 : }
     591             : 
     592     8098548 : static void server_mutate_op(grpc_call_element *elem,
     593             :                              grpc_transport_stream_op *op) {
     594     8098548 :   call_data *calld = elem->call_data;
     595             : 
     596     8098997 :   if (op->recv_initial_metadata != NULL) {
     597     2171122 :     calld->recv_initial_metadata = op->recv_initial_metadata;
     598     2171122 :     calld->on_done_recv_initial_metadata = op->on_complete;
     599     2171122 :     op->on_complete = &calld->server_on_recv_initial_metadata;
     600             :   }
     601     8098548 : }
     602             : 
     603     8099259 : static void server_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
     604             :                                              grpc_call_element *elem,
     605             :                                              grpc_transport_stream_op *op) {
     606     8099259 :   GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
     607     8098810 :   server_mutate_op(elem, op);
     608     8101097 :   grpc_call_next_op(exec_ctx, elem, op);
     609     8102166 : }
     610             : 
     611     2170979 : static void got_initial_metadata(grpc_exec_ctx *exec_ctx, void *ptr,
     612             :                                  int success) {
     613     2170890 :   grpc_call_element *elem = ptr;
     614     2170979 :   call_data *calld = elem->call_data;
     615     2170979 :   if (success) {
     616     2170926 :     start_new_rpc(exec_ctx, elem);
     617             :   } else {
     618          53 :     gpr_mu_lock(&calld->mu_state);
     619          53 :     if (calld->state == NOT_STARTED) {
     620          53 :       calld->state = ZOMBIED;
     621          53 :       gpr_mu_unlock(&calld->mu_state);
     622          53 :       grpc_closure_init(&calld->kill_zombie_closure, kill_zombie, elem);
     623          53 :       grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
     624           0 :     } else if (calld->state == PENDING) {
     625           0 :       calld->state = ZOMBIED;
     626           0 :       gpr_mu_unlock(&calld->mu_state);
     627             :       /* zombied call will be destroyed when it's removed from the pending
     628             :          queue... later */
     629             :     } else {
     630           0 :       gpr_mu_unlock(&calld->mu_state);
     631             :     }
     632             :   }
     633     2171079 : }
     634             : 
     635     2171064 : static void accept_stream(grpc_exec_ctx *exec_ctx, void *cd,
     636             :                           grpc_transport *transport,
     637             :                           const void *transport_server_data) {
     638     2170975 :   channel_data *chand = cd;
     639             :   /* create a call */
     640     2171154 :   grpc_call *call =
     641     2171064 :       grpc_call_create(chand->channel, NULL, 0, NULL, transport_server_data,
     642             :                        NULL, 0, gpr_inf_future(GPR_CLOCK_MONOTONIC));
     643     2171139 :   grpc_call_element *elem =
     644     2171151 :       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
     645     2171140 :   call_data *calld = elem->call_data;
     646             :   grpc_op op;
     647     2171140 :   memset(&op, 0, sizeof(op));
     648     2171140 :   op.op = GRPC_OP_RECV_INITIAL_METADATA;
     649     2171140 :   op.data.recv_initial_metadata = &calld->initial_metadata;
     650     2171140 :   grpc_closure_init(&calld->got_initial_metadata, got_initial_metadata, elem);
     651     2171117 :   grpc_call_start_batch_and_execute(exec_ctx, call, &op, 1,
     652             :                                     &calld->got_initial_metadata);
     653     2171118 : }
     654             : 
     655        6112 : static void channel_connectivity_changed(grpc_exec_ctx *exec_ctx, void *cd,
     656             :                                          int iomgr_status_ignored) {
     657        6030 :   channel_data *chand = cd;
     658        6112 :   grpc_server *server = chand->server;
     659        6112 :   if (chand->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE) {
     660             :     grpc_transport_op op;
     661        3056 :     memset(&op, 0, sizeof(op));
     662        3056 :     op.on_connectivity_state_change = &chand->channel_connectivity_changed,
     663        3056 :     op.connectivity_state = &chand->connectivity_state;
     664        3056 :     grpc_channel_next_op(exec_ctx,
     665             :                          grpc_channel_stack_element(
     666             :                              grpc_channel_get_channel_stack(chand->channel), 0),
     667             :                          &op);
     668             :   } else {
     669        3056 :     gpr_mu_lock(&server->mu_global);
     670        3056 :     destroy_channel(exec_ctx, chand);
     671        3056 :     gpr_mu_unlock(&server->mu_global);
     672        3056 :     GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, chand->channel, "connectivity");
     673             :   }
     674        6112 : }
     675             : 
     676     2170899 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
     677             :                            grpc_call_element_args *args) {
     678     2170899 :   call_data *calld = elem->call_data;
     679     2170899 :   channel_data *chand = elem->channel_data;
     680     2170899 :   memset(calld, 0, sizeof(call_data));
     681     2170899 :   calld->deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
     682     2171116 :   calld->call = grpc_call_from_top_element(elem);
     683     2171104 :   gpr_mu_init(&calld->mu_state);
     684             : 
     685     2171092 :   grpc_closure_init(&calld->server_on_recv_initial_metadata,
     686             :                     server_on_recv_initial_metadata, elem);
     687             : 
     688     2171040 :   server_ref(chand->server);
     689     2171147 : }
     690             : 
     691     2170777 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
     692             :                               grpc_call_element *elem) {
     693     2170777 :   channel_data *chand = elem->channel_data;
     694     2170777 :   call_data *calld = elem->call_data;
     695             : 
     696     2170777 :   GPR_ASSERT(calld->state != PENDING);
     697             : 
     698     2170777 :   if (calld->host) {
     699     2170751 :     GRPC_MDSTR_UNREF(calld->host);
     700             :   }
     701     2170963 :   if (calld->path) {
     702     2170915 :     GRPC_MDSTR_UNREF(calld->path);
     703             :   }
     704     2170555 :   grpc_metadata_array_destroy(&calld->initial_metadata);
     705             : 
     706     2170542 :   gpr_mu_destroy(&calld->mu_state);
     707             : 
     708     2170498 :   server_unref(exec_ctx, chand->server);
     709     2171016 : }
     710             : 
     711        3056 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
     712             :                               grpc_channel_element *elem,
     713             :                               grpc_channel_element_args *args) {
     714        3056 :   channel_data *chand = elem->channel_data;
     715        3056 :   GPR_ASSERT(args->is_first);
     716        3056 :   GPR_ASSERT(!args->is_last);
     717        3056 :   chand->server = NULL;
     718        3056 :   chand->channel = NULL;
     719        3056 :   chand->next = chand->prev = chand;
     720        3056 :   chand->registered_methods = NULL;
     721        3056 :   chand->connectivity_state = GRPC_CHANNEL_IDLE;
     722        3056 :   grpc_closure_init(&chand->channel_connectivity_changed,
     723             :                     channel_connectivity_changed, chand);
     724        3056 : }
     725             : 
     726        3024 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
     727             :                                  grpc_channel_element *elem) {
     728             :   size_t i;
     729        3024 :   channel_data *chand = elem->channel_data;
     730        3024 :   if (chand->registered_methods) {
     731        2452 :     for (i = 0; i < chand->registered_method_slots; i++) {
     732        2296 :       if (chand->registered_methods[i].method) {
     733        1148 :         GRPC_MDSTR_UNREF(chand->registered_methods[i].method);
     734             :       }
     735        2296 :       if (chand->registered_methods[i].host) {
     736         365 :         GRPC_MDSTR_UNREF(chand->registered_methods[i].host);
     737             :       }
     738             :     }
     739         156 :     gpr_free(chand->registered_methods);
     740             :   }
     741        3024 :   if (chand->server) {
     742        3024 :     gpr_mu_lock(&chand->server->mu_global);
     743        3024 :     chand->next->prev = chand->prev;
     744        3024 :     chand->prev->next = chand->next;
     745        3024 :     chand->next = chand->prev = chand;
     746        3024 :     maybe_finish_shutdown(exec_ctx, chand->server);
     747        3024 :     gpr_mu_unlock(&chand->server->mu_global);
     748        3024 :     server_unref(exec_ctx, chand->server);
     749             :   }
     750        3024 : }
     751             : 
     752             : static const grpc_channel_filter server_surface_filter = {
     753             :     server_start_transport_stream_op, grpc_channel_next_op, sizeof(call_data),
     754             :     init_call_elem, grpc_call_stack_ignore_set_pollset, destroy_call_elem,
     755             :     sizeof(channel_data), init_channel_elem, destroy_channel_elem,
     756             :     grpc_call_next_get_peer, "server",
     757             : };
     758             : 
     759        3596 : void grpc_server_register_completion_queue(grpc_server *server,
     760             :                                            grpc_completion_queue *cq,
     761             :                                            void *reserved) {
     762             :   size_t i, n;
     763        3596 :   GRPC_API_TRACE(
     764             :       "grpc_server_register_completion_queue(server=%p, cq=%p, reserved=%p)", 3,
     765             :       (server, cq, reserved));
     766        3596 :   GPR_ASSERT(!reserved);
     767        3623 :   for (i = 0; i < server->cq_count; i++) {
     768        3705 :     if (server->cqs[i] == cq) return;
     769             :   }
     770        3596 :   GRPC_CQ_INTERNAL_REF(cq, "server");
     771        3596 :   grpc_cq_mark_server_cq(cq);
     772        3596 :   n = server->cq_count++;
     773        3596 :   server->cqs = gpr_realloc(server->cqs,
     774        3514 :                             server->cq_count * sizeof(grpc_completion_queue *));
     775        3596 :   server->cqs[n] = cq;
     776             : }
     777             : 
     778        3515 : grpc_server *grpc_server_create_from_filters(
     779             :     const grpc_channel_filter **filters, size_t filter_count,
     780             :     const grpc_channel_args *args) {
     781             :   size_t i;
     782             :   /* TODO(census): restore this once we finalize census filter etc.
     783             :      int census_enabled = grpc_channel_args_is_census_enabled(args); */
     784        3474 :   int census_enabled = 0;
     785             : 
     786        3515 :   grpc_server *server = gpr_malloc(sizeof(grpc_server));
     787             : 
     788        3515 :   GPR_ASSERT(grpc_is_initialized() && "call grpc_init()");
     789             : 
     790        3515 :   memset(server, 0, sizeof(grpc_server));
     791             : 
     792        3515 :   gpr_mu_init(&server->mu_global);
     793        3515 :   gpr_mu_init(&server->mu_call);
     794             : 
     795             :   /* decremented by grpc_server_destroy */
     796        3515 :   gpr_ref_init(&server->internal_refcount, 1);
     797        3515 :   server->root_channel_data.next = server->root_channel_data.prev =
     798        3515 :       &server->root_channel_data;
     799             : 
     800             :   /* TODO(ctiller): expose a channel_arg for this */
     801        3515 :   server->max_requested_calls = 32768;
     802        3515 :   server->request_freelist =
     803        3515 :       gpr_stack_lockfree_create(server->max_requested_calls);
     804   115183035 :   for (i = 0; i < (size_t)server->max_requested_calls; i++) {
     805   115179520 :     gpr_stack_lockfree_push(server->request_freelist, (int)i);
     806             :   }
     807        3515 :   request_matcher_init(&server->unregistered_request_matcher,
     808             :                        server->max_requested_calls);
     809        3515 :   server->requested_calls = gpr_malloc(server->max_requested_calls *
     810             :                                        sizeof(*server->requested_calls));
     811             : 
     812             :   /* Server filter stack is:
     813             : 
     814             :      server_surface_filter - for making surface API calls
     815             :      grpc_server_census_filter (optional) - for stats collection and tracing
     816             :      {passed in filter stack}
     817             :      grpc_connected_channel_filter - for interfacing with transports */
     818        3515 :   server->channel_filter_count = filter_count + 1u + (census_enabled ? 1u : 0u);
     819        3515 :   server->channel_filters =
     820        3515 :       gpr_malloc(server->channel_filter_count * sizeof(grpc_channel_filter *));
     821        3515 :   server->channel_filters[0] = &server_surface_filter;
     822        3474 :   if (census_enabled) {
     823           0 :     server->channel_filters[1] = &grpc_server_census_filter;
     824             :   }
     825        6321 :   for (i = 0; i < filter_count; i++) {
     826        2765 :     server->channel_filters[i + 1u + (census_enabled ? 1u : 0u)] = filters[i];
     827             :   }
     828             : 
     829        3515 :   server->channel_args = grpc_channel_args_copy(args);
     830             : 
     831        3515 :   return server;
     832             : }
     833             : 
     /* NULL-tolerant string equality.  Returns 1 when both arguments are NULL
        or when both are non-NULL and compare equal with strcmp; returns 0
        otherwise (including when exactly one of them is NULL). */
     static int streq(const char *a, const char *b) {
       if (a == NULL || b == NULL) {
         /* equal only if both are missing */
         return a == b;
       }
       return strcmp(a, b) == 0;
     }
     840             : 
     841        1225 : void *grpc_server_register_method(grpc_server *server, const char *method,
     842             :                                   const char *host) {
     843             :   registered_method *m;
     844        1225 :   GRPC_API_TRACE("grpc_server_register_method(server=%p, method=%s, host=%s)",
     845             :                  3, (server, method, host));
     846        1225 :   if (!method) {
     847           0 :     gpr_log(GPR_ERROR,
     848             :             "grpc_server_register_method method string cannot be NULL");
     849           0 :     return NULL;
     850             :   }
     851        6333 :   for (m = server->registered_methods; m; m = m->next) {
     852        5108 :     if (streq(m->method, method) && streq(m->host, host)) {
     853           0 :       gpr_log(GPR_ERROR, "duplicate registration for %s@%s", method,
     854             :               host ? host : "*");
     855           0 :       return NULL;
     856             :     }
     857             :   }
     858        1225 :   m = gpr_malloc(sizeof(registered_method));
     859        1225 :   memset(m, 0, sizeof(*m));
     860        1225 :   request_matcher_init(&m->request_matcher, server->max_requested_calls);
     861        1225 :   m->method = gpr_strdup(method);
     862        1225 :   m->host = gpr_strdup(host);
     863        1225 :   m->next = server->registered_methods;
     864        1225 :   server->registered_methods = m;
     865        1225 :   return m;
     866             : }
     867             : 
     868        3505 : void grpc_server_start(grpc_server *server) {
     869             :   listener *l;
     870             :   size_t i;
     871        3505 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     872             : 
     873        3505 :   GRPC_API_TRACE("grpc_server_start(server=%p)", 1, (server));
     874             : 
     875        3505 :   server->pollsets = gpr_malloc(sizeof(grpc_pollset *) * server->cq_count);
     876        7082 :   for (i = 0; i < server->cq_count; i++) {
     877        3577 :     server->pollsets[i] = grpc_cq_pollset(server->cqs[i]);
     878             :   }
     879             : 
     880        6256 :   for (l = server->listeners; l; l = l->next) {
     881        2751 :     l->start(&exec_ctx, server, l->arg, server->pollsets, server->cq_count);
     882             :   }
     883             : 
     884        3505 :   grpc_exec_ctx_finish(&exec_ctx);
     885        3505 : }
     886             : 
     887        3056 : void grpc_server_setup_transport(grpc_exec_ctx *exec_ctx, grpc_server *s,
     888             :                                  grpc_transport *transport,
     889             :                                  grpc_channel_filter const **extra_filters,
     890             :                                  size_t num_extra_filters,
     891             :                                  const grpc_channel_args *args) {
     892        3056 :   size_t num_filters = s->channel_filter_count + num_extra_filters + 1;
     893        3056 :   grpc_channel_filter const **filters =
     894        3056 :       gpr_malloc(sizeof(grpc_channel_filter *) * num_filters);
     895             :   size_t i;
     896             :   size_t num_registered_methods;
     897             :   size_t alloc;
     898             :   registered_method *rm;
     899             :   channel_registered_method *crm;
     900             :   grpc_channel *channel;
     901             :   channel_data *chand;
     902             :   grpc_mdstr *host;
     903             :   grpc_mdstr *method;
     904             :   gpr_uint32 hash;
     905             :   size_t slots;
     906             :   gpr_uint32 probes;
     907        3015 :   gpr_uint32 max_probes = 0;
     908             :   grpc_transport_op op;
     909             : 
     910        8418 :   for (i = 0; i < s->channel_filter_count; i++) {
     911        5362 :     filters[i] = s->channel_filters[i];
     912             :   }
     913        6661 :   for (; i < s->channel_filter_count + num_extra_filters; i++) {
     914        3646 :     filters[i] = extra_filters[i - s->channel_filter_count];
     915             :   }
     916        3056 :   filters[i] = &grpc_connected_channel_filter;
     917             : 
     918        6248 :   for (i = 0; i < s->cq_count; i++) {
     919        3192 :     memset(&op, 0, sizeof(op));
     920        3192 :     op.bind_pollset = grpc_cq_pollset(s->cqs[i]);
     921        3192 :     grpc_transport_perform_op(exec_ctx, transport, &op);
     922             :   }
     923             : 
     924        3056 :   channel = grpc_channel_create_from_filters(exec_ctx, NULL, filters,
     925             :                                              num_filters, args, 0);
     926        3056 :   chand = (channel_data *)grpc_channel_stack_element(
     927             :               grpc_channel_get_channel_stack(channel), 0)->channel_data;
     928        3056 :   chand->server = s;
     929        3015 :   server_ref(s);
     930        3056 :   chand->channel = channel;
     931             : 
     932        3015 :   num_registered_methods = 0;
     933        4204 :   for (rm = s->registered_methods; rm; rm = rm->next) {
     934        1148 :     num_registered_methods++;
     935             :   }
     936             :   /* build a lookup table phrased in terms of mdstr's in this channels context
     937             :      to quickly find registered methods */
     938        3056 :   if (num_registered_methods > 0) {
     939         156 :     slots = 2 * num_registered_methods;
     940         156 :     alloc = sizeof(channel_registered_method) * slots;
     941         156 :     chand->registered_methods = gpr_malloc(alloc);
     942         156 :     memset(chand->registered_methods, 0, alloc);
     943        1304 :     for (rm = s->registered_methods; rm; rm = rm->next) {
     944        1148 :       host = rm->host ? grpc_mdstr_from_string(rm->host) : NULL;
     945        1148 :       method = grpc_mdstr_from_string(rm->method);
     946        1148 :       hash = GRPC_MDSTR_KV_HASH(host ? host->hash : 0, method->hash);
     947        4188 :       for (probes = 0; chand->registered_methods[(hash + probes) % slots]
     948        1520 :                                .server_registered_method != NULL;
     949         372 :            probes++)
     950             :         ;
     951        1148 :       if (probes > max_probes) max_probes = probes;
     952        1148 :       crm = &chand->registered_methods[(hash + probes) % slots];
     953        1148 :       crm->server_registered_method = rm;
     954        1148 :       crm->host = host;
     955        1148 :       crm->method = method;
     956             :     }
     957         156 :     GPR_ASSERT(slots <= GPR_UINT32_MAX);
     958         156 :     chand->registered_method_slots = (gpr_uint32)slots;
     959         156 :     chand->registered_method_max_probes = max_probes;
     960             :   }
     961             : 
     962        3056 :   grpc_connected_channel_bind_transport(grpc_channel_get_channel_stack(channel),
     963             :                                         transport);
     964             : 
     965        3056 :   gpr_mu_lock(&s->mu_global);
     966        3056 :   chand->next = &s->root_channel_data;
     967        3056 :   chand->prev = chand->next->prev;
     968        3056 :   chand->next->prev = chand->prev->next = chand;
     969        3056 :   gpr_mu_unlock(&s->mu_global);
     970             : 
     971        3056 :   gpr_free((void *)filters);
     972             : 
     973        3056 :   GRPC_CHANNEL_INTERNAL_REF(channel, "connectivity");
     974        3056 :   memset(&op, 0, sizeof(op));
     975        3056 :   op.set_accept_stream = accept_stream;
     976        3056 :   op.set_accept_stream_user_data = chand;
     977        3056 :   op.on_connectivity_state_change = &chand->channel_connectivity_changed;
     978        3056 :   op.connectivity_state = &chand->connectivity_state;
     979        3056 :   op.disconnect = gpr_atm_acq_load(&s->shutdown_flag) != 0;
     980        3056 :   grpc_transport_perform_op(exec_ctx, transport, &op);
     981        3056 : }
     982             : 
     983          13 : void done_published_shutdown(grpc_exec_ctx *exec_ctx, void *done_arg,
     984             :                              grpc_cq_completion *storage) {
     985             :   (void)done_arg;
     986          13 :   gpr_free(storage);
     987          13 : }
     988             : 
     989        2752 : static void listener_destroy_done(grpc_exec_ctx *exec_ctx, void *s,
     990             :                                   int success) {
     991        2724 :   grpc_server *server = s;
     992        2752 :   gpr_mu_lock(&server->mu_global);
     993        2752 :   server->listeners_destroyed++;
     994        2752 :   maybe_finish_shutdown(exec_ctx, server);
     995        2752 :   gpr_mu_unlock(&server->mu_global);
     996        2752 : }
     997             : 
     998        3527 : void grpc_server_shutdown_and_notify(grpc_server *server,
     999             :                                      grpc_completion_queue *cq, void *tag) {
    1000             :   listener *l;
    1001             :   shutdown_tag *sdt;
    1002             :   channel_broadcaster broadcaster;
    1003        3527 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1004             : 
    1005        3527 :   GRPC_API_TRACE("grpc_server_shutdown_and_notify(server=%p, cq=%p, tag=%p)", 3,
    1006             :                  (server, cq, tag));
    1007             : 
    1008             :   /* lock, and gather up some stuff to do */
    1009        3527 :   gpr_mu_lock(&server->mu_global);
    1010        3527 :   grpc_cq_begin_op(cq);
    1011        3527 :   if (server->shutdown_published) {
    1012          13 :     grpc_cq_end_op(&exec_ctx, cq, tag, 1, done_published_shutdown, NULL,
    1013          13 :                    gpr_malloc(sizeof(grpc_cq_completion)));
    1014          13 :     gpr_mu_unlock(&server->mu_global);
    1015          13 :     goto done;
    1016             :   }
    1017        3514 :   server->shutdown_tags =
    1018        3514 :       gpr_realloc(server->shutdown_tags,
    1019        3514 :                   sizeof(shutdown_tag) * (server->num_shutdown_tags + 1));
    1020        3514 :   sdt = &server->shutdown_tags[server->num_shutdown_tags++];
    1021        3514 :   sdt->tag = tag;
    1022        3514 :   sdt->cq = cq;
    1023        3514 :   if (gpr_atm_acq_load(&server->shutdown_flag)) {
    1024           0 :     gpr_mu_unlock(&server->mu_global);
    1025           0 :     goto done;
    1026             :   }
    1027             : 
    1028        3514 :   server->last_shutdown_message_time = gpr_now(GPR_CLOCK_REALTIME);
    1029             : 
    1030        3514 :   channel_broadcaster_init(server, &broadcaster);
    1031             : 
    1032        3514 :   gpr_atm_rel_store(&server->shutdown_flag, 1);
    1033             : 
    1034             :   /* collect all unregistered then registered calls */
    1035        3514 :   gpr_mu_lock(&server->mu_call);
    1036        3514 :   kill_pending_work_locked(&exec_ctx, server);
    1037        3514 :   gpr_mu_unlock(&server->mu_call);
    1038             : 
    1039        3514 :   maybe_finish_shutdown(&exec_ctx, server);
    1040        3514 :   gpr_mu_unlock(&server->mu_global);
    1041             : 
    1042             :   /* Shutdown listeners */
    1043        6266 :   for (l = server->listeners; l; l = l->next) {
    1044        2752 :     grpc_closure_init(&l->destroy_done, listener_destroy_done, server);
    1045        2752 :     l->destroy(&exec_ctx, server, l->arg, &l->destroy_done);
    1046             :   }
    1047             : 
    1048        3514 :   channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 1, 0);
    1049             : 
    1050             : done:
    1051        3527 :   grpc_exec_ctx_finish(&exec_ctx);
    1052        3527 : }
    1053             : 
    1054          86 : void grpc_server_cancel_all_calls(grpc_server *server) {
    1055             :   channel_broadcaster broadcaster;
    1056          86 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1057             : 
    1058          86 :   GRPC_API_TRACE("grpc_server_cancel_all_calls(server=%p)", 1, (server));
    1059             : 
    1060          86 :   gpr_mu_lock(&server->mu_global);
    1061          86 :   channel_broadcaster_init(server, &broadcaster);
    1062          86 :   gpr_mu_unlock(&server->mu_global);
    1063             : 
    1064          86 :   channel_broadcaster_shutdown(&exec_ctx, &broadcaster, 0, 1);
    1065          86 :   grpc_exec_ctx_finish(&exec_ctx);
    1066          86 : }
    1067             : 
    1068        3481 : void grpc_server_destroy(grpc_server *server) {
    1069             :   listener *l;
    1070        3481 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1071             : 
    1072        3481 :   GRPC_API_TRACE("grpc_server_destroy(server=%p)", 1, (server));
    1073             : 
    1074        3481 :   gpr_mu_lock(&server->mu_global);
    1075        3481 :   GPR_ASSERT(gpr_atm_acq_load(&server->shutdown_flag) || !server->listeners);
    1076        3490 :   GPR_ASSERT(server->listeners_destroyed == num_listeners(server));
    1077             : 
    1078        9679 :   while (server->listeners) {
    1079        2722 :     l = server->listeners;
    1080        2726 :     server->listeners = l->next;
    1081        2726 :     gpr_free(l);
    1082             :   }
    1083             : 
    1084        3481 :   gpr_mu_unlock(&server->mu_global);
    1085             : 
    1086        3481 :   server_unref(&exec_ctx, server);
    1087        3481 :   grpc_exec_ctx_finish(&exec_ctx);
    1088        3481 : }
    1089             : 
    1090        2754 : void grpc_server_add_listener(
    1091             :     grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
    1092             :     void (*start)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
    1093             :                   grpc_pollset **pollsets, size_t pollset_count),
    1094             :     void (*destroy)(grpc_exec_ctx *exec_ctx, grpc_server *server, void *arg,
    1095             :                     grpc_closure *on_done)) {
    1096        2754 :   listener *l = gpr_malloc(sizeof(listener));
    1097        2754 :   l->arg = arg;
    1098        2754 :   l->start = start;
    1099        2754 :   l->destroy = destroy;
    1100        2754 :   l->next = server->listeners;
    1101        2754 :   server->listeners = l;
    1102        2754 : }
    1103             : 
    /* Queues an application "request call" (`rc`) so it can be matched with an
     * incoming call.
     *
     * Steps, as visible in this body:
     *   1. If server->shutdown_flag is set, the request is failed via fail_call.
     *   2. A slot index is popped from server->request_freelist; if none is
     *      available (-1) the request is failed via fail_call.
     *   3. The request matcher is chosen from rc->type.
     *   4. *rc is copied into server->requested_calls[request_id] and the
     *      caller-allocated rc is released with gpr_free.
     *   5. The slot index is pushed onto rm->requests; when that push reports
     *      "first queued request", pending calls are drained under mu_call.
     *
     * Every path returns GRPC_CALL_OK; failures are reported through fail_call
     * rather than through the return value.
     *
     * The out-of-ids branch and the ZOMBIED branch show 0 hits in this report.
     *
     * NOTE(review): the switch on rc->type has no default, so `rm` stays NULL for
     * any other value and rm->requests below would dereference NULL; begin_call
     * guards the same switch with GPR_UNREACHABLE_CODE -- confirm whether this
     * one should too. */
    1104     2244070 : static grpc_call_error queue_call_request(grpc_exec_ctx *exec_ctx,
    1105             :                                           grpc_server *server,
    1106             :                                           requested_call *rc) {
    1107     2243959 :   call_data *calld = NULL;
    1108     2243959 :   request_matcher *rm = NULL;
    1109             :   int request_id;
    1110     2244070 :   if (gpr_atm_acq_load(&server->shutdown_flag)) {
    1111       11593 :     fail_call(exec_ctx, server, rc);
    1112       11599 :     return GRPC_CALL_OK;
    1113             :   }
    1114     2232477 :   request_id = gpr_stack_lockfree_pop(server->request_freelist);
    1115     2232536 :   if (request_id == -1) {
    1116             :     /* out of request ids: just fail this one */
    1117           0 :     fail_call(exec_ctx, server, rc);
    1118           0 :     return GRPC_CALL_OK;
    1119             :   }
    1120     2232536 :   switch (rc->type) {
    1121             :     case BATCH_CALL:
    1122      524670 :       rm = &server->unregistered_request_matcher;
    1123      524670 :       break;
    1124             :     case REGISTERED_CALL:
    1125     1707866 :       rm = &rc->data.registered.registered_method->request_matcher;
    1126     1707866 :       break;
    1127             :   }
                         /* from here on the request lives in the server-owned slot; the
                            heap copy passed in by the caller is no longer needed */
    1128     2232536 :   server->requested_calls[request_id] = *rc;
    1129     2232536 :   gpr_free(rc);
    1130     2232529 :   if (gpr_stack_lockfree_push(rm->requests, request_id)) {
    1131             :     /* this was the first queued request: we need to lock and start
    1132             :        matching calls */
    1133      740163 :     gpr_mu_lock(&server->mu_call);
    1134     1584973 :     while ((calld = rm->pending_head) != NULL) {
    1135      208704 :       request_id = gpr_stack_lockfree_pop(rm->requests);
    1136      208704 :       if (request_id == -1) break;
    1137      104647 :       rm->pending_head = calld->pending_next;
                             /* mu_call is released while the call's own state lock is held,
                                and re-acquired at the bottom of the loop */
    1138      104647 :       gpr_mu_unlock(&server->mu_call);
    1139      104647 :       gpr_mu_lock(&calld->mu_state);
    1140      104647 :       if (calld->state == ZOMBIED) {
                               /* NOTE(review): the request_id popped above is not visibly
                                  pushed back to rm->requests or request_freelist on this
                                  path -- confirm the slot is not lost */
    1141           0 :         gpr_mu_unlock(&calld->mu_state);
    1142           0 :         grpc_closure_init(
    1143             :             &calld->kill_zombie_closure, kill_zombie,
    1144           0 :             grpc_call_stack_element(grpc_call_get_call_stack(calld->call), 0));
    1145           0 :         grpc_exec_ctx_enqueue(exec_ctx, &calld->kill_zombie_closure, 1);
    1146             :       } else {
    1147      104647 :         GPR_ASSERT(calld->state == PENDING);
    1148      104647 :         calld->state = ACTIVATED;
    1149      104647 :         gpr_mu_unlock(&calld->mu_state);
    1150      209294 :         begin_call(exec_ctx, server, calld,
    1151      209294 :                    &server->requested_calls[request_id]);
    1152             :       }
    1153      104647 :       gpr_mu_lock(&server->mu_call);
    1154             :     }
    1155      740163 :     gpr_mu_unlock(&server->mu_call);
    1156             :   }
    1157     2232425 :   return GRPC_CALL_OK;
    1158             : }
    1159             : 
    /* Requests notification of a new incoming call (the BATCH_CALL variant).
     *
     * Builds a heap-allocated requested_call recording where the results are to
     * be written (`call`, `details`, `initial_metadata`), which completion queues
     * to use and the caller's `tag`, then hands it to queue_call_request.
     *
     * Returns GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE (after freeing the
     * allocation) when grpc_cq_is_server_cq rejects `cq_for_notification`;
     * otherwise returns whatever queue_call_request returns. The local exec_ctx
     * is finished on both paths via the `done` label.
     *
     * The rejection branch shows 0 hits in this report. */
    1160      524670 : grpc_call_error grpc_server_request_call(
    1161             :     grpc_server *server, grpc_call **call, grpc_call_details *details,
    1162             :     grpc_metadata_array *initial_metadata,
    1163             :     grpc_completion_queue *cq_bound_to_call,
    1164             :     grpc_completion_queue *cq_for_notification, void *tag) {
    1165             :   grpc_call_error error;
    1166      524670 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1167      524670 :   requested_call *rc = gpr_malloc(sizeof(*rc));
    1168      524670 :   GRPC_API_TRACE(
    1169             :       "grpc_server_request_call("
    1170             :       "server=%p, call=%p, details=%p, initial_metadata=%p, "
    1171             :       "cq_bound_to_call=%p, cq_for_notification=%p, tag=%p)",
    1172             :       7, (server, call, details, initial_metadata, cq_bound_to_call,
    1173             :           cq_for_notification, tag));
    1174      524670 :   if (!grpc_cq_is_server_cq(cq_for_notification)) {
                           /* rc was allocated before validation, so release it here */
    1175           0 :     gpr_free(rc);
    1176           0 :     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
    1177           0 :     goto done;
    1178             :   }
    1179      524670 :   grpc_cq_begin_op(cq_for_notification);
    1180      524670 :   details->reserved = NULL;
    1181      524670 :   rc->type = BATCH_CALL;
    1182      524670 :   rc->server = server;
    1183      524670 :   rc->tag = tag;
    1184      524670 :   rc->cq_bound_to_call = cq_bound_to_call;
    1185      524670 :   rc->cq_for_notification = cq_for_notification;
    1186      524670 :   rc->call = call;
    1187      524670 :   rc->data.batch.details = details;
    1188      524670 :   rc->initial_metadata = initial_metadata;
                         /* rc is not touched after this call; queue_call_request either
                            frees it or passes it on to fail_call */
    1189      524670 :   error = queue_call_request(&exec_ctx, server, rc);
    1190             : done:
    1191      524670 :   grpc_exec_ctx_finish(&exec_ctx);
    1192      524670 :   return error;
    1193             : }
    1194             : 
    /* Requests notification of a new incoming call on a registered method (the
     * REGISTERED_CALL variant).
     *
     * `rmp` is used as a `registered_method *`. The heap-allocated
     * requested_call records the output locations (`call`, `deadline`,
     * `initial_metadata`, `optional_payload`), the completion queues and `tag`,
     * and is then handed to queue_call_request.
     *
     * Returns GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE (after freeing the
     * allocation) when grpc_cq_is_server_cq rejects `cq_for_notification`;
     * otherwise returns whatever queue_call_request returns. The local exec_ctx
     * is finished on both paths via the `done` label.
     *
     * NOTE(review): `rmp` is not validated here; presumably it must be a handle
     * previously obtained when the method was registered -- confirm. */
    1195     1718749 : grpc_call_error grpc_server_request_registered_call(
    1196             :     grpc_server *server, void *rmp, grpc_call **call, gpr_timespec *deadline,
    1197             :     grpc_metadata_array *initial_metadata, grpc_byte_buffer **optional_payload,
    1198             :     grpc_completion_queue *cq_bound_to_call,
    1199             :     grpc_completion_queue *cq_for_notification, void *tag) {
    1200             :   grpc_call_error error;
    1201     1718749 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
    1202     1718749 :   requested_call *rc = gpr_malloc(sizeof(*rc));
    1203     1719278 :   registered_method *rm = rmp;
    1204     1719278 :   GRPC_API_TRACE(
    1205             :       "grpc_server_request_registered_call("
    1206             :       "server=%p, rmp=%p, call=%p, deadline=%p, initial_metadata=%p, "
    1207             :       "optional_payload=%p, cq_bound_to_call=%p, cq_for_notification=%p, "
    1208             :       "tag=%p)",
    1209             :       9, (server, rmp, call, deadline, initial_metadata, optional_payload,
    1210             :           cq_bound_to_call, cq_for_notification, tag));
    1211     1719278 :   if (!grpc_cq_is_server_cq(cq_for_notification)) {
                           /* rc was allocated before validation, so release it here */
    1212           0 :     gpr_free(rc);
    1213           0 :     error = GRPC_CALL_ERROR_NOT_SERVER_COMPLETION_QUEUE;
    1214           0 :     goto done;
    1215             :   }
    1216     1719250 :   grpc_cq_begin_op(cq_for_notification);
    1217     1719404 :   rc->type = REGISTERED_CALL;
    1218     1719404 :   rc->server = server;
    1219     1719404 :   rc->tag = tag;
    1220     1719404 :   rc->cq_bound_to_call = cq_bound_to_call;
    1221     1719404 :   rc->cq_for_notification = cq_for_notification;
    1222     1719404 :   rc->call = call;
    1223     1719404 :   rc->data.registered.registered_method = rm;
    1224     1719404 :   rc->data.registered.deadline = deadline;
    1225     1719404 :   rc->initial_metadata = initial_metadata;
    1226     1719404 :   rc->data.registered.optional_payload = optional_payload;
                         /* rc is not touched after this call; queue_call_request either
                            frees it or passes it on to fail_call */
    1227     1719404 :   error = queue_call_request(&exec_ctx, server, rc);
    1228             : done:
    1229     1719465 :   grpc_exec_ctx_finish(&exec_ctx);
    1230     1719457 :   return error;
    1231             : }
    1232             : 
    /* Forward declaration: begin_call (defined next) installs this function as
     * the rc->publish closure callback before the definition appears. */
    1233             : static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx,
    1234             :                                         void *user_data, int success);
    1235             : 
    /* Copies the string held by `value` into the caller's growable buffer.
     *
     * `*dest` / `*capacity` describe the buffer. When it cannot hold len + 1
     * bytes it is regrown with gpr_realloc to the larger of len + 1 and twice
     * the old capacity, and both out-parameters are updated. len + 1 bytes are
     * then copied from grpc_mdstr_as_c_string(value).
     *
     * NOTE(review): copying len + 1 bytes assumes grpc_mdstr_as_c_string yields
     * a buffer with a terminating NUL right after the slice contents -- confirm. */
    1236     1048354 : static void cpstr(char **dest, size_t *capacity, grpc_mdstr *value) {
    1237     1048354 :   gpr_slice slice = value->slice;
    1238     1048354 :   size_t len = GPR_SLICE_LENGTH(slice);
    1239             : 
    1240     1048354 :   if (len + 1 > *capacity) {
    1241     1048055 :     *capacity = GPR_MAX(len + 1, *capacity * 2);
    1242     1048055 :     *dest = gpr_realloc(*dest, *capacity);
    1243             :   }
    1244     1048354 :   memcpy(*dest, grpc_mdstr_as_c_string(value), len + 1);
    1245     1048354 : }
    1246             : 
    /* Binds an incoming call (`calld`) to a matched application request (`rc`)
     * and starts the batch whose completion publishes the result.
     *
     * Writes the call handle to *rc->call, records rc->cq_for_notification in
     * calld->cq_new, swaps the metadata arrays so the caller's array receives
     * the call's initial metadata, and fills in the type-specific outputs:
     *   - BATCH_CALL: host, method and deadline are copied into the details.
     *   - REGISTERED_CALL: the deadline is stored, and a GRPC_OP_RECV_MESSAGE op
     *     is added when optional_payload is non-NULL.
     *
     * An internal "server" ref is taken on the call before the batch starts;
     * publish_registered_or_batch drops a ref with the same tag. `server` is not
     * used in this body. */
    1247     2170957 : static void begin_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
    1248             :                        call_data *calld, requested_call *rc) {
    1249             :   grpc_op ops[1];
    1250     2170870 :   grpc_op *op = ops;
    1251             : 
    1252     2170957 :   memset(ops, 0, sizeof(ops));
    1253             : 
    1254             :   /* called once initial metadata has been read by the call, but BEFORE
    1255             :      the ioreq to fetch it out of the call has been executed.
    1256             :      This means metadata related fields can be relied on in calld, but to
    1257             :      fill in the metadata array passed by the client, we need to perform
    1258             :      an ioreq op, that should complete immediately. */
    1259             : 
    1260     2170957 :   grpc_call_set_completion_queue(exec_ctx, calld->call, rc->cq_bound_to_call);
    1261     2170926 :   grpc_closure_init(&rc->publish, publish_registered_or_batch, rc);
    1262     2170868 :   *rc->call = calld->call;
    1263     2170868 :   calld->cq_new = rc->cq_for_notification;
    1264     2170868 :   GPR_SWAP(grpc_metadata_array, *rc->initial_metadata, calld->initial_metadata);
    1265     2170868 :   switch (rc->type) {
    1266             :     case BATCH_CALL:
    1267      524177 :       GPR_ASSERT(calld->host != NULL);
    1268      524177 :       GPR_ASSERT(calld->path != NULL);
    1269     1048267 :       cpstr(&rc->data.batch.details->host,
    1270      524177 :             &rc->data.batch.details->host_capacity, calld->host);
    1271     1048354 :       cpstr(&rc->data.batch.details->method,
    1272      524177 :             &rc->data.batch.details->method_capacity, calld->path);
    1273      524177 :       rc->data.batch.details->deadline = calld->deadline;
    1274      524090 :       break;
    1275             :     case REGISTERED_CALL:
    1276     1646591 :       *rc->data.registered.deadline = calld->deadline;
    1277     1646591 :       if (rc->data.registered.optional_payload) {
    1278     1646555 :         op->op = GRPC_OP_RECV_MESSAGE;
    1279     1646555 :         op->data.recv_message = rc->data.registered.optional_payload;
    1280     1646555 :         op++;
    1281             :       }
    1282     1646591 :       break;
    1283             :     default:
    1284           0 :       GPR_UNREACHABLE_CODE(return );
    1285             :   }
    1286             : 
    1287     2170868 :   GRPC_CALL_INTERNAL_REF(calld->call, "server");
                         /* (op - ops) is the number of ops filled in above: 0 or 1 */
    1288     4341757 :   grpc_call_start_batch_and_execute(exec_ctx, calld->call, ops,
    1289     2170922 :                                     (size_t)(op - ops), &rc->publish);
    1290     2170930 : }
    1291             : 
    /* Completion callback passed to grpc_cq_end_op by fail_call and
     * publish_registered_or_batch; releases the requested_call and drops one
     * server ref.
     *
     * `req` is the requested_call. If it points inside the
     * server->requested_calls array, its index is pushed back onto
     * server->request_freelist for reuse. Otherwise it is released with
     * gpr_free; queue_call_request, for example, passes its heap-allocated rc
     * to fail_call before copying it into a slot. `c` is not used in this body. */
    1292     2244053 : static void done_request_event(grpc_exec_ctx *exec_ctx, void *req,
    1293             :                                grpc_cq_completion *c) {
    1294     2243942 :   requested_call *rc = req;
    1295     2244053 :   grpc_server *server = rc->server;
    1296             : 
                         /* pointer-range test: is rc one of the server-owned slots? */
    1297     4476510 :   if (rc >= server->requested_calls &&
    1298     2232457 :       rc < server->requested_calls + server->max_requested_calls) {
    1299     2232461 :     GPR_ASSERT(rc - server->requested_calls <= INT_MAX);
    1300     2232461 :     gpr_stack_lockfree_push(server->request_freelist,
    1301     2232461 :                             (int)(rc - server->requested_calls));
    1302             :   } else {
    1303       11592 :     gpr_free(req);
    1304             :   }
    1305             : 
    1306     2244136 :   server_unref(exec_ctx, server);
    1307     2244139 : }
    1308             : 
    /* Completes a requested call as failed.
     *
     * Clears the caller's outputs (*rc->call = NULL, initial_metadata->count = 0)
     * and posts rc->tag on rc->cq_for_notification with success = 0. A server
     * ref is taken first; done_request_event, passed as the completion's done
     * callback, calls server_unref and releases `rc`. */
    1309       73104 : static void fail_call(grpc_exec_ctx *exec_ctx, grpc_server *server,
    1310             :                       requested_call *rc) {
    1311       73104 :   *rc->call = NULL;
    1312       73104 :   rc->initial_metadata->count = 0;
    1313             : 
    1314       73080 :   server_ref(server);
    1315       73110 :   grpc_cq_end_op(exec_ctx, rc->cq_for_notification, rc->tag, 0,
    1316             :                  done_request_event, rc, &rc->completion);
    1317       73159 : }
    1318             : 
    /* Closure callback installed by begin_call as rc->publish; posts the
     * requested call's tag to the completion queue.
     *
     * `prc` is the requested_call. The call is read back from *rc->call, and its
     * call_data / channel_data are taken from element 0 of the call stack. A ref
     * is taken on chand->server, rc->tag is posted on calld->cq_new with the
     * given `success` value (done_request_event is the done callback), and the
     * internal "server" ref on the call is dropped. */
    1319     2170925 : static void publish_registered_or_batch(grpc_exec_ctx *exec_ctx, void *prc,
    1320             :                                         int success) {
    1321     2170838 :   requested_call *rc = prc;
    1322     2170925 :   grpc_call *call = *rc->call;
    1323     2170858 :   grpc_call_element *elem =
    1324     2170925 :       grpc_call_stack_element(grpc_call_get_call_stack(call), 0);
    1325     2170861 :   call_data *calld = elem->call_data;
    1326     2170861 :   channel_data *chand = elem->channel_data;
    1327     2170861 :   server_ref(chand->server);
    1328     2170875 :   grpc_cq_end_op(exec_ctx, calld->cq_new, rc->tag, success, done_request_event,
    1329             :                  rc, &rc->completion);
    1330     2170937 :   GRPC_CALL_INTERNAL_UNREF(exec_ctx, call, "server");
    1331     2170953 : }
    1332             : 
    /* Returns the server's channel_args pointer as stored on the server; no copy
     * is made. */
    1333        6067 : const grpc_channel_args *grpc_server_get_channel_args(grpc_server *server) {
    1334        6067 :   return server->channel_args;
    1335             : }
    1336             : 
    /* Returns non-zero when server->root_channel_data.next does not point back
     * at root_channel_data itself; the check is made while holding
     * server->mu_global.
     *
     * NOTE(review): this reads as root_channel_data being the sentinel of a
     * circular channel list that links to itself when empty -- confirm against
     * the list maintenance code. */
    1337         334 : int grpc_server_has_open_connections(grpc_server *server) {
    1338             :   int r;
    1339         334 :   gpr_mu_lock(&server->mu_global);
    1340         334 :   r = server->root_channel_data.next != &server->root_channel_data;
    1341         334 :   gpr_mu_unlock(&server->mu_global);
    1342         334 :   return r;
    1343             : }

Generated by: LCOV version 1.11