Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc++/server.h>
35 :
36 : #include <utility>
37 :
38 : #include <grpc/grpc.h>
39 : #include <grpc/support/alloc.h>
40 : #include <grpc/support/log.h>
41 : #include <grpc++/completion_queue.h>
42 : #include <grpc++/generic/async_generic_service.h>
43 : #include <grpc++/impl/rpc_service_method.h>
44 : #include <grpc++/impl/service_type.h>
45 : #include <grpc++/server_context.h>
46 : #include <grpc++/security/server_credentials.h>
47 : #include <grpc++/support/time.h>
48 :
49 : #include "src/core/profiling/timers.h"
50 : #include "src/cpp/server/thread_pool_interface.h"
51 :
52 : namespace grpc {
53 :
54 16 : class DefaultGlobalCallbacks GRPC_FINAL : public Server::GlobalCallbacks {
55 : public:
56 209491 : void PreSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
57 209475 : void PostSynchronousRequest(ServerContext* context) GRPC_OVERRIDE {}
58 : };
59 :
60 : static Server::GlobalCallbacks* g_callbacks = nullptr;
61 : static gpr_once g_once_init_callbacks = GPR_ONCE_INIT;
62 :
63 16 : static void InitGlobalCallbacks() {
64 16 : if (g_callbacks == nullptr) {
65 16 : static DefaultGlobalCallbacks default_global_callbacks;
66 16 : g_callbacks = &default_global_callbacks;
67 : }
68 16 : }
69 :
70 38 : class Server::UnimplementedAsyncRequestContext {
71 : protected:
72 38 : UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
73 :
74 : GenericServerContext server_context_;
75 : GenericServerAsyncReaderWriter generic_stream_;
76 : };
77 :
78 76 : class Server::UnimplementedAsyncRequest GRPC_FINAL
79 : : public UnimplementedAsyncRequestContext,
80 : public GenericAsyncRequest {
81 : public:
82 38 : UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
83 : : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
84 : NULL, false),
85 : server_(server),
86 38 : cq_(cq) {}
87 :
88 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
89 :
90 2 : ServerContext* context() { return &server_context_; }
91 2 : GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
92 :
93 : private:
94 : Server* const server_;
95 : ServerCompletionQueue* const cq_;
96 : };
97 :
98 : typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
99 : UnimplementedAsyncResponseOp;
100 : class Server::UnimplementedAsyncResponse GRPC_FINAL
101 : : public UnimplementedAsyncResponseOp {
102 : public:
103 : UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
104 4 : ~UnimplementedAsyncResponse() { delete request_; }
105 :
106 2 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
107 2 : bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
108 2 : delete this;
109 2 : return r;
110 : }
111 :
112 : private:
113 : UnimplementedAsyncRequest* const request_;
114 : };
115 :
116 480 : class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
117 : public:
118 160 : bool FinalizeResult(void** tag, bool* status) {
119 160 : delete this;
120 160 : return false;
121 : }
122 : };
123 :
124 128 : class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
125 : public:
126 1217 : SyncRequest(RpcServiceMethod* method, void* tag)
127 : : method_(method),
128 : tag_(tag),
129 : in_flight_(false),
130 1958 : has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
131 741 : method->method_type() ==
132 : RpcMethod::SERVER_STREAMING),
133 : call_details_(nullptr),
134 2434 : cq_(nullptr) {
135 1217 : grpc_metadata_array_init(&request_metadata_);
136 1217 : }
137 :
138 2690 : ~SyncRequest() {
139 1345 : if (call_details_) {
140 128 : delete call_details_;
141 : }
142 1345 : grpc_metadata_array_destroy(&request_metadata_);
143 1345 : }
144 :
145 209705 : static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
146 209705 : void* tag = nullptr;
147 209705 : *ok = false;
148 209705 : if (!cq->Next(&tag, ok)) {
149 128 : return nullptr;
150 : }
151 209577 : auto* mrd = static_cast<SyncRequest*>(tag);
152 209577 : GPR_ASSERT(mrd->in_flight_);
153 209577 : return mrd;
154 : }
155 :
156 1293 : static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
157 : gpr_timespec deadline) {
158 1293 : void* tag = nullptr;
159 1293 : *ok = false;
160 1293 : switch (cq->AsyncNext(&tag, ok, deadline)) {
161 : case CompletionQueue::TIMEOUT:
162 1 : *req = nullptr;
163 1294 : return true;
164 : case CompletionQueue::SHUTDOWN:
165 160 : *req = nullptr;
166 160 : return false;
167 : case CompletionQueue::GOT_EVENT:
168 1132 : *req = static_cast<SyncRequest*>(tag);
169 1132 : GPR_ASSERT((*req)->in_flight_);
170 1132 : return true;
171 : }
172 : GPR_UNREACHABLE_CODE(return false);
173 0 : }
174 :
175 210709 : void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
176 :
177 0 : void TeardownRequest() {
178 0 : grpc_completion_queue_destroy(cq_);
179 0 : cq_ = nullptr;
180 0 : }
181 :
182 210709 : void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
183 210709 : GPR_ASSERT(cq_ && !in_flight_);
184 210709 : in_flight_ = true;
185 210709 : if (tag_) {
186 210579 : GPR_ASSERT(GRPC_CALL_OK ==
187 : grpc_server_request_registered_call(
188 : server, tag_, &call_, &deadline_, &request_metadata_,
189 : has_request_payload_ ? &request_payload_ : nullptr, cq_,
190 : notify_cq, this));
191 : } else {
192 130 : if (!call_details_) {
193 128 : call_details_ = new grpc_call_details;
194 128 : grpc_call_details_init(call_details_);
195 : }
196 130 : GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
197 : server, &call_, call_details_,
198 : &request_metadata_, cq_, notify_cq, this));
199 : }
200 210709 : }
201 :
202 210709 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
203 210709 : if (!*status) {
204 1216 : grpc_completion_queue_destroy(cq_);
205 : }
206 210709 : if (call_details_) {
207 130 : deadline_ = call_details_->deadline;
208 130 : grpc_call_details_destroy(call_details_);
209 130 : grpc_call_details_init(call_details_);
210 : }
211 210709 : return true;
212 : }
213 :
214 : class CallData GRPC_FINAL {
215 : public:
216 209493 : explicit CallData(Server* server, SyncRequest* mrd)
217 : : cq_(mrd->cq_),
218 : call_(mrd->call_, server, &cq_, server->max_message_size_),
219 : ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
220 : mrd->request_metadata_.count),
221 : has_request_payload_(mrd->has_request_payload_),
222 : request_payload_(mrd->request_payload_),
223 209493 : method_(mrd->method_) {
224 209493 : ctx_.set_call(mrd->call_);
225 209493 : ctx_.cq_ = &cq_;
226 209493 : GPR_ASSERT(mrd->in_flight_);
227 209493 : mrd->in_flight_ = false;
228 209493 : mrd->request_metadata_.count = 0;
229 209493 : }
230 :
231 418560 : ~CallData() {
232 209280 : if (has_request_payload_ && request_payload_) {
233 0 : grpc_byte_buffer_destroy(request_payload_);
234 : }
235 209493 : }
236 :
237 209492 : void Run() {
238 209492 : ctx_.BeginCompletionOp(&call_);
239 209491 : g_callbacks->PreSynchronousRequest(&ctx_);
240 209491 : method_->handler()->RunHandler(MethodHandler::HandlerParameter(
241 209491 : &call_, &ctx_, request_payload_, call_.max_message_size()));
242 209393 : g_callbacks->PostSynchronousRequest(&ctx_);
243 209491 : request_payload_ = nullptr;
244 : void* ignored_tag;
245 : bool ignored_ok;
246 209491 : cq_.Shutdown();
247 209492 : GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
248 209492 : }
249 :
250 : private:
251 : CompletionQueue cq_;
252 : Call call_;
253 : ServerContext ctx_;
254 : const bool has_request_payload_;
255 : grpc_byte_buffer* request_payload_;
256 : RpcServiceMethod* const method_;
257 : };
258 :
259 : private:
260 : RpcServiceMethod* const method_;
261 : void* const tag_;
262 : bool in_flight_;
263 : const bool has_request_payload_;
264 : grpc_call* call_;
265 : grpc_call_details* call_details_;
266 : gpr_timespec deadline_;
267 : grpc_metadata_array request_metadata_;
268 : grpc_byte_buffer* request_payload_;
269 : grpc_completion_queue* cq_;
270 : };
271 :
272 160 : static grpc_server* CreateServer(const ChannelArguments& args) {
273 : grpc_channel_args channel_args;
274 160 : args.SetChannelArgs(&channel_args);
275 160 : return grpc_server_create(&channel_args, nullptr);
276 : }
277 :
278 160 : Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
279 : int max_message_size, const ChannelArguments& args)
280 : : max_message_size_(max_message_size),
281 : started_(false),
282 : shutdown_(false),
283 : num_running_cb_(0),
284 160 : sync_methods_(new std::list<SyncRequest>),
285 : has_generic_service_(false),
286 160 : server_(CreateServer(args)),
287 : thread_pool_(thread_pool),
288 480 : thread_pool_owned_(thread_pool_owned) {
289 160 : gpr_once_init(&g_once_init_callbacks, InitGlobalCallbacks);
290 160 : grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
291 160 : }
292 :
293 480 : Server::~Server() {
294 : {
295 160 : grpc::unique_lock<grpc::mutex> lock(mu_);
296 160 : if (started_ && !shutdown_) {
297 16 : lock.unlock();
298 16 : Shutdown();
299 160 : }
300 : }
301 : void* got_tag;
302 : bool ok;
303 160 : GPR_ASSERT(!cq_.Next(&got_tag, &ok));
304 160 : grpc_server_destroy(server_);
305 160 : if (thread_pool_owned_) {
306 128 : delete thread_pool_;
307 : }
308 160 : delete sync_methods_;
309 320 : }
310 :
311 0 : void Server::SetGlobalCallbacks(GlobalCallbacks* callbacks) {
312 0 : GPR_ASSERT(g_callbacks == nullptr);
313 0 : GPR_ASSERT(callbacks != nullptr);
314 0 : g_callbacks = callbacks;
315 0 : }
316 :
317 293 : bool Server::RegisterService(const grpc::string* host, RpcService* service) {
318 1382 : for (int i = 0; i < service->GetMethodCount(); ++i) {
319 1089 : RpcServiceMethod* method = service->GetMethod(i);
320 : void* tag = grpc_server_register_method(server_, method->name(),
321 1089 : host ? host->c_str() : nullptr);
322 1089 : if (!tag) {
323 : gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
324 0 : method->name());
325 0 : return false;
326 : }
327 1089 : sync_methods_->emplace_back(method, tag);
328 : }
329 293 : return true;
330 : }
331 :
332 29 : bool Server::RegisterAsyncService(const grpc::string* host,
333 : AsynchronousService* service) {
334 29 : GPR_ASSERT(service->server_ == nullptr &&
335 : "Can only register an asynchronous service against one server.");
336 29 : service->server_ = this;
337 29 : service->request_args_ = new void* [service->method_count_];
338 165 : for (size_t i = 0; i < service->method_count_; ++i) {
339 136 : void* tag = grpc_server_register_method(server_, service->method_names_[i],
340 272 : host ? host->c_str() : nullptr);
341 136 : if (!tag) {
342 : gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
343 0 : service->method_names_[i]);
344 0 : return false;
345 : }
346 136 : service->request_args_[i] = tag;
347 : }
348 29 : return true;
349 : }
350 :
351 3 : void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
352 3 : GPR_ASSERT(service->server_ == nullptr &&
353 : "Can only register an async generic service against one server.");
354 3 : service->server_ = this;
355 3 : has_generic_service_ = true;
356 3 : }
357 :
358 160 : int Server::AddListeningPort(const grpc::string& addr,
359 : ServerCredentials* creds) {
360 160 : GPR_ASSERT(!started_);
361 160 : return creds->AddPortToServer(addr, server_);
362 : }
363 :
364 160 : bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
365 160 : GPR_ASSERT(!started_);
366 160 : started_ = true;
367 160 : grpc_server_start(server_);
368 :
369 160 : if (!has_generic_service_) {
370 157 : if (!sync_methods_->empty()) {
371 : unknown_method_.reset(new RpcServiceMethod(
372 128 : "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
373 : // Use of emplace_back with just constructor arguments is not accepted
374 : // here by gcc-4.4 because it can't match the anonymous nullptr with a
375 : // proper constructor implicitly. Construct the object and use push_back.
376 128 : sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
377 : }
378 193 : for (size_t i = 0; i < num_cqs; i++) {
379 36 : new UnimplementedAsyncRequest(this, cqs[i]);
380 : }
381 : }
382 : // Start processing rpcs.
383 160 : if (!sync_methods_->empty()) {
384 1345 : for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
385 1217 : m->SetupRequest();
386 1217 : m->Request(server_, cq_.cq());
387 : }
388 :
389 128 : ScheduleCallback();
390 : }
391 :
392 160 : return true;
393 : }
394 :
395 160 : void Server::ShutdownInternal(gpr_timespec deadline) {
396 160 : grpc::unique_lock<grpc::mutex> lock(mu_);
397 160 : if (started_ && !shutdown_) {
398 160 : shutdown_ = true;
399 160 : grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
400 160 : cq_.Shutdown();
401 160 : lock.unlock();
402 : // Spin, eating requests until the completion queue is completely shutdown.
403 : // If the deadline expires then cancel anything that's pending and keep
404 : // spinning forever until the work is actually drained.
405 : // Since nothing else needs to touch state guarded by mu_, holding it
406 : // through this loop is fine.
407 : SyncRequest* request;
408 : bool ok;
409 1453 : while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
410 1133 : if (request == NULL) { // deadline expired
411 1 : grpc_server_cancel_all_calls(server_);
412 1 : deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
413 1132 : } else if (ok) {
414 1 : SyncRequest::CallData call_data(this, request);
415 : }
416 : }
417 160 : lock.lock();
418 :
419 : // Wait for running callbacks to finish.
420 484 : while (num_running_cb_ != 0) {
421 164 : callback_cv_.wait(lock);
422 : }
423 160 : }
424 160 : }
425 :
426 0 : void Server::Wait() {
427 0 : grpc::unique_lock<grpc::mutex> lock(mu_);
428 0 : while (num_running_cb_ != 0) {
429 0 : callback_cv_.wait(lock);
430 0 : }
431 0 : }
432 :
433 3754083 : void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
434 : static const size_t MAX_OPS = 8;
435 3754083 : size_t nops = 0;
436 : grpc_op cops[MAX_OPS];
437 3754083 : ops->FillOps(cops, &nops);
438 3754684 : auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
439 3754724 : GPR_ASSERT(GRPC_CALL_OK == result);
440 3754724 : }
441 :
442 1508425 : Server::BaseAsyncRequest::BaseAsyncRequest(
443 : Server* server, ServerContext* context,
444 : ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
445 : bool delete_on_finalize)
446 : : server_(server),
447 : context_(context),
448 : stream_(stream),
449 : call_cq_(call_cq),
450 : tag_(tag),
451 : delete_on_finalize_(delete_on_finalize),
452 1508425 : call_(nullptr) {
453 1508619 : memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
454 1508619 : }
455 :
456 1508722 : Server::BaseAsyncRequest::~BaseAsyncRequest() {}
457 :
458 1508421 : bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
459 1508421 : if (*status) {
460 2874237 : for (size_t i = 0; i < initial_metadata_array_.count; i++) {
461 : context_->client_metadata_.insert(
462 : std::pair<grpc::string_ref, grpc::string_ref>(
463 1436998 : initial_metadata_array_.metadata[i].key,
464 : grpc::string_ref(
465 1437227 : initial_metadata_array_.metadata[i].value,
466 4311452 : initial_metadata_array_.metadata[i].value_length)));
467 : }
468 : }
469 1508211 : grpc_metadata_array_destroy(&initial_metadata_array_);
470 1508871 : context_->set_call(call_);
471 1508705 : context_->cq_ = call_cq_;
472 1508705 : Call call(call_, server_, call_cq_, server_->max_message_size_);
473 1508707 : if (*status && call_) {
474 1437098 : context_->BeginCompletionOp(&call);
475 : }
476 : // just the pointers inside call are copied here
477 1508735 : stream_->BindCall(&call);
478 1508772 : *tag = tag_;
479 1508772 : if (delete_on_finalize_) {
480 1508744 : delete this;
481 : }
482 1508889 : return true;
483 : }
484 :
485 1508590 : Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
486 : Server* server, ServerContext* context,
487 : ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
488 1508590 : : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
489 :
490 1508039 : void Server::RegisteredAsyncRequest::IssueRequest(
491 : void* registered_method, grpc_byte_buffer** payload,
492 : ServerCompletionQueue* notification_cq) {
493 : grpc_server_request_registered_call(
494 : server_->server_, registered_method, &call_, &context_->deadline_,
495 : &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
496 1508039 : this);
497 1508876 : }
498 :
499 50 : Server::GenericAsyncRequest::GenericAsyncRequest(
500 : Server* server, GenericServerContext* context,
501 : ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
502 : ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
503 : : BaseAsyncRequest(server, context, stream, call_cq, tag,
504 50 : delete_on_finalize) {
505 50 : grpc_call_details_init(&call_details_);
506 50 : GPR_ASSERT(notification_cq);
507 50 : GPR_ASSERT(call_cq);
508 : grpc_server_request_call(server->server_, &call_, &call_details_,
509 : &initial_metadata_array_, call_cq->cq(),
510 50 : notification_cq->cq(), this);
511 50 : }
512 :
513 50 : bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
514 : // TODO(yangg) remove the copy here.
515 50 : if (*status) {
516 14 : static_cast<GenericServerContext*>(context_)->method_ =
517 14 : call_details_.method;
518 14 : static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
519 : }
520 50 : gpr_free(call_details_.method);
521 50 : gpr_free(call_details_.host);
522 50 : return BaseAsyncRequest::FinalizeResult(tag, status);
523 : }
524 :
525 38 : bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
526 : bool* status) {
527 38 : if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
528 2 : new UnimplementedAsyncRequest(server_, cq_);
529 2 : new UnimplementedAsyncResponse(this);
530 : } else {
531 36 : delete this;
532 : }
533 38 : return false;
534 : }
535 :
536 2 : Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
537 : UnimplementedAsyncRequest* request)
538 2 : : request_(request) {
539 2 : Status status(StatusCode::UNIMPLEMENTED, "");
540 2 : UnknownMethodHandler::FillOps(request_->context(), this);
541 2 : request_->stream()->call_.PerformOps(this);
542 2 : }
543 :
544 209705 : void Server::ScheduleCallback() {
545 : {
546 209705 : grpc::unique_lock<grpc::mutex> lock(mu_);
547 209705 : num_running_cb_++;
548 : }
549 209705 : thread_pool_->Add(std::bind(&Server::RunRpc, this));
550 209705 : }
551 :
552 209705 : void Server::RunRpc() {
553 : // Wait for one more incoming rpc.
554 : bool ok;
555 : GPR_TIMER_SCOPE("Server::RunRpc", 0);
556 209705 : auto* mrd = SyncRequest::Wait(&cq_, &ok);
557 209705 : if (mrd) {
558 209577 : ScheduleCallback();
559 209737 : if (ok) {
560 209492 : SyncRequest::CallData cd(this, mrd);
561 : {
562 209492 : mrd->SetupRequest();
563 209492 : grpc::unique_lock<grpc::mutex> lock(mu_);
564 209492 : if (!shutdown_) {
565 209492 : mrd->Request(server_, cq_.cq());
566 : } else {
567 : // destroy the structure that was created
568 0 : mrd->TeardownRequest();
569 209492 : }
570 : }
571 : GPR_TIMER_SCOPE("cd.Run()", 0);
572 209492 : cd.Run();
573 : }
574 : }
575 :
576 : {
577 209705 : grpc::unique_lock<grpc::mutex> lock(mu_);
578 209705 : num_running_cb_--;
579 209705 : if (shutdown_) {
580 266 : callback_cv_.notify_all();
581 209705 : }
582 : }
583 209705 : }
584 :
585 : } // namespace grpc
|