LCOV - code coverage report
Current view: top level - src/cpp/server - server.cc (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 260 273 95.2 %
Date: 2015-10-10 Functions: 48 51 94.1 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <grpc++/server.h>
      35             : 
      36             : #include <utility>
      37             : 
      38             : #include <grpc/grpc.h>
      39             : #include <grpc/support/alloc.h>
      40             : #include <grpc/support/log.h>
      41             : #include <grpc++/completion_queue.h>
      42             : #include <grpc++/generic/async_generic_service.h>
      43             : #include <grpc++/impl/rpc_service_method.h>
      44             : #include <grpc++/impl/service_type.h>
      45             : #include <grpc++/server_context.h>
      46             : #include <grpc++/security/server_credentials.h>
      47             : #include <grpc++/support/time.h>
      48             : 
      49             : #include "src/core/profiling/timers.h"
      50             : #include "src/cpp/server/thread_pool_interface.h"
      51             : 
      52             : namespace grpc {
      53             : 
      54          42 : class Server::UnimplementedAsyncRequestContext {
      55             :  protected:
      56          42 :   UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
      57             : 
      58             :   GenericServerContext server_context_;
      59             :   GenericServerAsyncReaderWriter generic_stream_;
      60             : };
      61             : 
      62          84 : class Server::UnimplementedAsyncRequest GRPC_FINAL
      63             :     : public UnimplementedAsyncRequestContext,
      64             :       public GenericAsyncRequest {
      65             :  public:
      66          42 :   UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
      67             :       : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
      68             :                             NULL, false),
      69             :         server_(server),
      70          42 :         cq_(cq) {}
      71             : 
      72             :   bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
      73             : 
      74           2 :   ServerContext* context() { return &server_context_; }
      75           2 :   GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
      76             : 
      77             :  private:
      78             :   Server* const server_;
      79             :   ServerCompletionQueue* const cq_;
      80             : };
      81             : 
      82             : typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
      83             :     UnimplementedAsyncResponseOp;
      84             : class Server::UnimplementedAsyncResponse GRPC_FINAL
      85             :     : public UnimplementedAsyncResponseOp {
      86             :  public:
      87             :   UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
      88           4 :   ~UnimplementedAsyncResponse() { delete request_; }
      89             : 
      90           2 :   bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
      91           2 :     bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
      92           2 :     delete this;
      93           2 :     return r;
      94             :   }
      95             : 
      96             :  private:
      97             :   UnimplementedAsyncRequest* const request_;
      98             : };
      99             : 
     100         477 : class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
     101             :  public:
     102         159 :   bool FinalizeResult(void** tag, bool* status) {
     103         159 :     delete this;
     104         159 :     return false;
     105             :   }
     106             : };
     107             : 
     108         126 : class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
     109             :  public:
     110        1202 :   SyncRequest(RpcServiceMethod* method, void* tag)
     111             :       : method_(method),
     112             :         tag_(tag),
     113             :         in_flight_(false),
     114        1934 :         has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
     115         732 :                              method->method_type() ==
     116             :                                  RpcMethod::SERVER_STREAMING),
     117             :         call_details_(nullptr),
     118        2404 :         cq_(nullptr) {
     119        1202 :     grpc_metadata_array_init(&request_metadata_);
     120        1202 :   }
     121             : 
     122        2656 :   ~SyncRequest() {
     123        1328 :     if (call_details_) {
     124         126 :       delete call_details_;
     125             :     }
     126        1328 :     grpc_metadata_array_destroy(&request_metadata_);
     127        1328 :   }
     128             : 
     129      159751 :   static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
     130      159751 :     void* tag = nullptr;
     131      159751 :     *ok = false;
     132      159751 :     if (!cq->Next(&tag, ok)) {
     133         126 :       return nullptr;
     134             :     }
     135      159625 :     auto* mrd = static_cast<SyncRequest*>(tag);
     136      159625 :     GPR_ASSERT(mrd->in_flight_);
     137      159625 :     return mrd;
     138             :   }
     139             : 
     140        1247 :   static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
     141             :                         gpr_timespec deadline) {
     142        1247 :     void* tag = nullptr;
     143        1247 :     *ok = false;
     144        1247 :     switch (cq->AsyncNext(&tag, ok, deadline)) {
     145             :       case CompletionQueue::TIMEOUT:
     146           1 :         *req = nullptr;
     147           1 :         return true;
     148             :       case CompletionQueue::SHUTDOWN:
     149         159 :         *req = nullptr;
     150         159 :         return false;
     151             :       case CompletionQueue::GOT_EVENT:
     152        1087 :         *req = static_cast<SyncRequest*>(tag);
     153        1087 :         GPR_ASSERT((*req)->in_flight_);
     154        1087 :         return true;
     155             :     }
     156           0 :     GPR_UNREACHABLE_CODE(return false);
     157             :   }
     158             : 
     159      160714 :   void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
     160             : 
     161           2 :   void TeardownRequest() {
     162           2 :     grpc_completion_queue_destroy(cq_);
     163           2 :     cq_ = nullptr;
     164           2 :   }
     165             : 
     166      160712 :   void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
     167      160712 :     GPR_ASSERT(cq_ && !in_flight_);
     168      160712 :     in_flight_ = true;
     169      160712 :     if (tag_) {
     170      160584 :       GPR_ASSERT(GRPC_CALL_OK ==
     171             :                  grpc_server_request_registered_call(
     172             :                      server, tag_, &call_, &deadline_, &request_metadata_,
     173             :                      has_request_payload_ ? &request_payload_ : nullptr, cq_,
     174             :                      notify_cq, this));
     175             :     } else {
     176         128 :       if (!call_details_) {
     177         126 :         call_details_ = new grpc_call_details;
     178         126 :         grpc_call_details_init(call_details_);
     179             :       }
     180         128 :       GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
     181             :                                      server, &call_, call_details_,
     182             :                                      &request_metadata_, cq_, notify_cq, this));
     183             :     }
     184      160712 :   }
     185             : 
     186      160712 :   bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
     187      160712 :     if (!*status) {
     188        1200 :       grpc_completion_queue_destroy(cq_);
     189             :     }
     190      160712 :     if (call_details_) {
     191         128 :       deadline_ = call_details_->deadline;
     192         128 :       grpc_call_details_destroy(call_details_);
     193         128 :       grpc_call_details_init(call_details_);
     194             :     }
     195      160712 :     return true;
     196             :   }
     197             : 
     198             :   class CallData GRPC_FINAL {
     199             :    public:
     200      159512 :     explicit CallData(Server* server, SyncRequest* mrd)
     201             :         : cq_(mrd->cq_),
     202             :           call_(mrd->call_, server, &cq_, server->max_message_size_),
     203             :           ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
     204             :                mrd->request_metadata_.count),
     205             :           has_request_payload_(mrd->has_request_payload_),
     206             :           request_payload_(mrd->request_payload_),
     207      159512 :           method_(mrd->method_) {
     208      159512 :       ctx_.set_call(mrd->call_);
     209      159512 :       ctx_.cq_ = &cq_;
     210      159512 :       GPR_ASSERT(mrd->in_flight_);
     211      159512 :       mrd->in_flight_ = false;
     212      159512 :       mrd->request_metadata_.count = 0;
     213      159512 :     }
     214             : 
     215      319022 :     ~CallData() {
     216      159511 :       if (has_request_payload_ && request_payload_) {
     217           0 :         grpc_byte_buffer_destroy(request_payload_);
     218             :       }
     219      159512 :     }
     220             : 
     221      159494 :     void Run() {
     222      159494 :       ctx_.BeginCompletionOp(&call_);
     223      159512 :       method_->handler()->RunHandler(MethodHandler::HandlerParameter(
     224      159512 :           &call_, &ctx_, request_payload_, call_.max_message_size()));
     225      159511 :       request_payload_ = nullptr;
     226             :       void* ignored_tag;
     227             :       bool ignored_ok;
     228      159511 :       cq_.Shutdown();
     229      159512 :       GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
     230      159417 :     }
     231             : 
     232             :    private:
     233             :     CompletionQueue cq_;
     234             :     Call call_;
     235             :     ServerContext ctx_;
     236             :     const bool has_request_payload_;
     237             :     grpc_byte_buffer* request_payload_;
     238             :     RpcServiceMethod* const method_;
     239             :   };
     240             : 
     241             :  private:
     242             :   RpcServiceMethod* const method_;
     243             :   void* const tag_;
     244             :   bool in_flight_;
     245             :   const bool has_request_payload_;
     246             :   grpc_call* call_;
     247             :   grpc_call_details* call_details_;
     248             :   gpr_timespec deadline_;
     249             :   grpc_metadata_array request_metadata_;
     250             :   grpc_byte_buffer* request_payload_;
     251             :   grpc_completion_queue* cq_;
     252             : };
     253             : 
     254         159 : static grpc_server* CreateServer(
     255             :     int max_message_size, const grpc_compression_options& compression_options) {
     256             :   grpc_arg args[2];
     257         159 :   size_t args_idx = 0;
     258         159 :   if (max_message_size > 0) {
     259          82 :     args[args_idx].type = GRPC_ARG_INTEGER;
     260          82 :     args[args_idx].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
     261          82 :     args[args_idx].value.integer = max_message_size;
     262          82 :     args_idx++;
     263             :   }
     264             : 
     265         159 :   args[args_idx].type = GRPC_ARG_INTEGER;
     266         159 :   args[args_idx].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG);
     267         159 :   args[args_idx].value.integer = compression_options.enabled_algorithms_bitset;
     268         159 :   args_idx++;
     269             : 
     270         159 :   grpc_channel_args channel_args = {args_idx, args};
     271         159 :   return grpc_server_create(&channel_args, nullptr);
     272             : }
     273             : 
     274         159 : Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
     275             :                int max_message_size,
     276             :                grpc_compression_options compression_options)
     277             :     : max_message_size_(max_message_size),
     278             :       started_(false),
     279             :       shutdown_(false),
     280             :       num_running_cb_(0),
     281         159 :       sync_methods_(new std::list<SyncRequest>),
     282             :       has_generic_service_(false),
     283         159 :       server_(CreateServer(max_message_size, compression_options)),
     284             :       thread_pool_(thread_pool),
     285         477 :       thread_pool_owned_(thread_pool_owned) {
     286         159 :   grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
     287         159 : }
     288             : 
     289         477 : Server::~Server() {
     290             :   {
     291         159 :     grpc::unique_lock<grpc::mutex> lock(mu_);
     292         159 :     if (started_ && !shutdown_) {
     293          15 :       lock.unlock();
     294          15 :       Shutdown();
     295         159 :     }
     296             :   }
     297             :   void* got_tag;
     298             :   bool ok;
     299         159 :   GPR_ASSERT(!cq_.Next(&got_tag, &ok));
     300         159 :   grpc_server_destroy(server_);
     301         159 :   if (thread_pool_owned_) {
     302         126 :     delete thread_pool_;
     303             :   }
     304         159 :   delete sync_methods_;
     305         318 : }
     306             : 
     307         289 : bool Server::RegisterService(const grpc::string* host, RpcService* service) {
     308        1365 :   for (int i = 0; i < service->GetMethodCount(); ++i) {
     309        1076 :     RpcServiceMethod* method = service->GetMethod(i);
     310             :     void* tag = grpc_server_register_method(server_, method->name(),
     311        1076 :                                             host ? host->c_str() : nullptr);
     312        1076 :     if (!tag) {
     313             :       gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
     314           0 :               method->name());
     315           0 :       return false;
     316             :     }
     317        1076 :     sync_methods_->emplace_back(method, tag);
     318             :   }
     319         289 :   return true;
     320             : }
     321             : 
     322          30 : bool Server::RegisterAsyncService(const grpc::string* host,
     323             :                                   AsynchronousService* service) {
     324          30 :   GPR_ASSERT(service->server_ == nullptr &&
     325             :              "Can only register an asynchronous service against one server.");
     326          30 :   service->server_ = this;
     327          30 :   service->request_args_ = new void* [service->method_count_];
     328         168 :   for (size_t i = 0; i < service->method_count_; ++i) {
     329         138 :     void* tag = grpc_server_register_method(server_, service->method_names_[i],
     330         276 :                                             host ? host->c_str() : nullptr);
     331         138 :     if (!tag) {
     332             :       gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
     333           0 :               service->method_names_[i]);
     334           0 :       return false;
     335             :     }
     336         138 :     service->request_args_[i] = tag;
     337             :   }
     338          30 :   return true;
     339             : }
     340             : 
     341           3 : void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
     342           3 :   GPR_ASSERT(service->server_ == nullptr &&
     343             :              "Can only register an async generic service against one server.");
     344           3 :   service->server_ = this;
     345           3 :   has_generic_service_ = true;
     346           3 : }
     347             : 
     348         159 : int Server::AddListeningPort(const grpc::string& addr,
     349             :                              ServerCredentials* creds) {
     350         159 :   GPR_ASSERT(!started_);
     351         159 :   return creds->AddPortToServer(addr, server_);
     352             : }
     353             : 
     354         159 : bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
     355         159 :   GPR_ASSERT(!started_);
     356         159 :   started_ = true;
     357         159 :   grpc_server_start(server_);
     358             : 
     359         159 :   if (!has_generic_service_) {
     360         156 :     if (!sync_methods_->empty()) {
     361             :       unknown_method_.reset(new RpcServiceMethod(
     362         126 :           "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
     363             :       // Use of emplace_back with just constructor arguments is not accepted
     364             :       // here by gcc-4.4 because it can't match the anonymous nullptr with a
     365             :       // proper constructor implicitly. Construct the object and use push_back.
     366         126 :       sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
     367             :     }
     368         196 :     for (size_t i = 0; i < num_cqs; i++) {
     369          40 :       new UnimplementedAsyncRequest(this, cqs[i]);
     370             :     }
     371             :   }
     372             :   // Start processing rpcs.
     373         159 :   if (!sync_methods_->empty()) {
     374        1328 :     for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
     375        1202 :       m->SetupRequest();
     376        1202 :       m->Request(server_, cq_.cq());
     377             :     }
     378             : 
     379         126 :     ScheduleCallback();
     380             :   }
     381             : 
     382         159 :   return true;
     383             : }
     384             : 
     385         159 : void Server::ShutdownInternal(gpr_timespec deadline) {
     386         159 :   grpc::unique_lock<grpc::mutex> lock(mu_);
     387         159 :   if (started_ && !shutdown_) {
     388         159 :     shutdown_ = true;
     389         159 :     grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
     390         159 :     cq_.Shutdown();
     391             :     // Spin, eating requests until the completion queue is completely shutdown.
     392             :     // If the deadline expires then cancel anything that's pending and keep
     393             :     // spinning forever until the work is actually drained.
     394             :     // Since nothing else needs to touch state guarded by mu_, holding it
     395             :     // through this loop is fine.
     396             :     SyncRequest* request;
     397             :     bool ok;
     398        1406 :     while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
     399        1088 :       if (request == NULL) {  // deadline expired
     400           1 :         grpc_server_cancel_all_calls(server_);
     401           1 :         deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
     402        1087 :       } else if (ok) {
     403           0 :         SyncRequest::CallData call_data(this, request);
     404             :       }
     405             :     }
     406             : 
     407             :     // Wait for running callbacks to finish.
     408         505 :     while (num_running_cb_ != 0) {
     409         187 :       callback_cv_.wait(lock);
     410             :     }
     411         159 :   }
     412         159 : }
     413             : 
     414           0 : void Server::Wait() {
     415           0 :   grpc::unique_lock<grpc::mutex> lock(mu_);
     416           0 :   while (num_running_cb_ != 0) {
     417           0 :     callback_cv_.wait(lock);
     418           0 :   }
     419           0 : }
     420             : 
     421     2793600 : void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
     422             :   static const size_t MAX_OPS = 8;
     423     2793600 :   size_t nops = 0;
     424             :   grpc_op cops[MAX_OPS];
     425     2793600 :   ops->FillOps(cops, &nops);
     426     2794836 :   auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
     427     2795618 :   GPR_ASSERT(GRPC_CALL_OK == result);
     428     2795618 : }
     429             : 
     430     1245763 : Server::BaseAsyncRequest::BaseAsyncRequest(
     431             :     Server* server, ServerContext* context,
     432             :     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
     433             :     bool delete_on_finalize)
     434             :     : server_(server),
     435             :       context_(context),
     436             :       stream_(stream),
     437             :       call_cq_(call_cq),
     438             :       tag_(tag),
     439             :       delete_on_finalize_(delete_on_finalize),
     440     1245763 :       call_(nullptr) {
     441     1245813 :   memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
     442     1245813 : }
     443             : 
     444     1245906 : Server::BaseAsyncRequest::~BaseAsyncRequest() {}
     445             : 
     446     1245671 : bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
     447     1245671 :   if (*status) {
     448     2278979 :     for (size_t i = 0; i < initial_metadata_array_.count; i++) {
     449             :       context_->client_metadata_.insert(
     450             :           std::pair<grpc::string_ref, grpc::string_ref>(
     451     1139577 :               initial_metadata_array_.metadata[i].key,
     452             :               grpc::string_ref(
     453     1139616 :                   initial_metadata_array_.metadata[i].value,
     454     3418809 :                   initial_metadata_array_.metadata[i].value_length)));
     455             :     }
     456             :   }
     457     1245427 :   grpc_metadata_array_destroy(&initial_metadata_array_);
     458     1245964 :   context_->set_call(call_);
     459     1245905 :   context_->cq_ = call_cq_;
     460     1245905 :   Call call(call_, server_, call_cq_, server_->max_message_size_);
     461     1245900 :   if (*status && call_) {
     462     1139620 :     context_->BeginCompletionOp(&call);
     463             :   }
     464             :   // just the pointers inside call are copied here
     465     1245944 :   stream_->BindCall(&call);
     466     1245880 :   *tag = tag_;
     467     1245880 :   if (delete_on_finalize_) {
     468     1245870 :     delete this;
     469             :   }
     470     1246001 :   return true;
     471             : }
     472             : 
     473     1245790 : Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
     474             :     Server* server, ServerContext* context,
     475             :     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
     476     1245790 :     : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
     477             : 
     478     1245577 : void Server::RegisteredAsyncRequest::IssueRequest(
     479             :     void* registered_method, grpc_byte_buffer** payload,
     480             :     ServerCompletionQueue* notification_cq) {
     481             :   grpc_server_request_registered_call(
     482             :       server_->server_, registered_method, &call_, &context_->deadline_,
     483             :       &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
     484     1245577 :       this);
     485     1245982 : }
     486             : 
     487          54 : Server::GenericAsyncRequest::GenericAsyncRequest(
     488             :     Server* server, GenericServerContext* context,
     489             :     ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
     490             :     ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
     491             :     : BaseAsyncRequest(server, context, stream, call_cq, tag,
     492          54 :                        delete_on_finalize) {
     493          54 :   grpc_call_details_init(&call_details_);
     494          54 :   GPR_ASSERT(notification_cq);
     495          54 :   GPR_ASSERT(call_cq);
     496             :   grpc_server_request_call(server->server_, &call_, &call_details_,
     497             :                            &initial_metadata_array_, call_cq->cq(),
     498          54 :                            notification_cq->cq(), this);
     499          54 : }
     500             : 
     501          54 : bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
     502             :   // TODO(yangg) remove the copy here.
     503          54 :   if (*status) {
     504          14 :     static_cast<GenericServerContext*>(context_)->method_ =
     505          14 :         call_details_.method;
     506          14 :     static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
     507             :   }
     508          54 :   gpr_free(call_details_.method);
     509          54 :   gpr_free(call_details_.host);
     510          54 :   return BaseAsyncRequest::FinalizeResult(tag, status);
     511             : }
     512             : 
     513          42 : bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
     514             :                                                        bool* status) {
     515          42 :   if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
     516           2 :     new UnimplementedAsyncRequest(server_, cq_);
     517           2 :     new UnimplementedAsyncResponse(this);
     518             :   } else {
     519          40 :     delete this;
     520             :   }
     521          42 :   return false;
     522             : }
     523             : 
     524           2 : Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
     525             :     UnimplementedAsyncRequest* request)
     526           2 :     : request_(request) {
     527           2 :   Status status(StatusCode::UNIMPLEMENTED, "");
     528           2 :   UnknownMethodHandler::FillOps(request_->context(), this);
     529           2 :   request_->stream()->call_.PerformOps(this);
     530           2 : }
     531             : 
     532      159751 : void Server::ScheduleCallback() {
     533             :   {
     534      159751 :     grpc::unique_lock<grpc::mutex> lock(mu_);
     535      159751 :     num_running_cb_++;
     536             :   }
     537      159751 :   thread_pool_->Add(std::bind(&Server::RunRpc, this));
     538      159751 : }
     539             : 
     540      159751 : void Server::RunRpc() {
     541             :   // Wait for one more incoming rpc.
     542             :   bool ok;
     543      159751 :   auto* mrd = SyncRequest::Wait(&cq_, &ok);
     544      159751 :   if (mrd) {
     545      159625 :     ScheduleCallback();
     546      159626 :     if (ok) {
     547      159512 :       SyncRequest::CallData cd(this, mrd);
     548             :       {
     549      159512 :         mrd->SetupRequest();
     550      159512 :         grpc::unique_lock<grpc::mutex> lock(mu_);
     551      159512 :         if (!shutdown_) {
     552      159510 :           mrd->Request(server_, cq_.cq());
     553             :         } else {
     554             :           // destroy the structure that was created
     555           2 :           mrd->TeardownRequest();
     556      159512 :         }
     557             :       }
     558      159512 :       cd.Run();
     559             :     }
     560             :   }
     561             : 
     562             :   {
     563      159751 :     grpc::unique_lock<grpc::mutex> lock(mu_);
     564      159751 :     num_running_cb_--;
     565      159751 :     if (shutdown_) {
     566         252 :       callback_cv_.notify_all();
     567      159751 :     }
     568             :   }
     569      159751 : }
     570             : 
     571             : }  // namespace grpc

Generated by: LCOV version 1.10