Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc++/server.h>
35 :
36 : #include <utility>
37 :
38 : #include <grpc/grpc.h>
39 : #include <grpc/support/alloc.h>
40 : #include <grpc/support/log.h>
41 : #include <grpc++/completion_queue.h>
42 : #include <grpc++/generic/async_generic_service.h>
43 : #include <grpc++/impl/rpc_service_method.h>
44 : #include <grpc++/impl/service_type.h>
45 : #include <grpc++/server_context.h>
46 : #include <grpc++/security/server_credentials.h>
47 : #include <grpc++/support/time.h>
48 :
49 : #include "src/core/profiling/timers.h"
50 : #include "src/cpp/server/thread_pool_interface.h"
51 :
52 : namespace grpc {
53 :
54 42 : class Server::UnimplementedAsyncRequestContext {
55 : protected:
56 42 : UnimplementedAsyncRequestContext() : generic_stream_(&server_context_) {}
57 :
58 : GenericServerContext server_context_;
59 : GenericServerAsyncReaderWriter generic_stream_;
60 : };
61 :
62 84 : class Server::UnimplementedAsyncRequest GRPC_FINAL
63 : : public UnimplementedAsyncRequestContext,
64 : public GenericAsyncRequest {
65 : public:
66 42 : UnimplementedAsyncRequest(Server* server, ServerCompletionQueue* cq)
67 : : GenericAsyncRequest(server, &server_context_, &generic_stream_, cq, cq,
68 : NULL, false),
69 : server_(server),
70 42 : cq_(cq) {}
71 :
72 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
73 :
74 2 : ServerContext* context() { return &server_context_; }
75 2 : GenericServerAsyncReaderWriter* stream() { return &generic_stream_; }
76 :
77 : private:
78 : Server* const server_;
79 : ServerCompletionQueue* const cq_;
80 : };
81 :
82 : typedef SneakyCallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus>
83 : UnimplementedAsyncResponseOp;
84 : class Server::UnimplementedAsyncResponse GRPC_FINAL
85 : : public UnimplementedAsyncResponseOp {
86 : public:
87 : UnimplementedAsyncResponse(UnimplementedAsyncRequest* request);
88 4 : ~UnimplementedAsyncResponse() { delete request_; }
89 :
90 2 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
91 2 : bool r = UnimplementedAsyncResponseOp::FinalizeResult(tag, status);
92 2 : delete this;
93 2 : return r;
94 : }
95 :
96 : private:
97 : UnimplementedAsyncRequest* const request_;
98 : };
99 :
100 477 : class Server::ShutdownRequest GRPC_FINAL : public CompletionQueueTag {
101 : public:
102 159 : bool FinalizeResult(void** tag, bool* status) {
103 159 : delete this;
104 159 : return false;
105 : }
106 : };
107 :
108 126 : class Server::SyncRequest GRPC_FINAL : public CompletionQueueTag {
109 : public:
110 1202 : SyncRequest(RpcServiceMethod* method, void* tag)
111 : : method_(method),
112 : tag_(tag),
113 : in_flight_(false),
114 1934 : has_request_payload_(method->method_type() == RpcMethod::NORMAL_RPC ||
115 732 : method->method_type() ==
116 : RpcMethod::SERVER_STREAMING),
117 : call_details_(nullptr),
118 2404 : cq_(nullptr) {
119 1202 : grpc_metadata_array_init(&request_metadata_);
120 1202 : }
121 :
122 2656 : ~SyncRequest() {
123 1328 : if (call_details_) {
124 126 : delete call_details_;
125 : }
126 1328 : grpc_metadata_array_destroy(&request_metadata_);
127 1328 : }
128 :
129 159751 : static SyncRequest* Wait(CompletionQueue* cq, bool* ok) {
130 159751 : void* tag = nullptr;
131 159751 : *ok = false;
132 159751 : if (!cq->Next(&tag, ok)) {
133 126 : return nullptr;
134 : }
135 159625 : auto* mrd = static_cast<SyncRequest*>(tag);
136 159625 : GPR_ASSERT(mrd->in_flight_);
137 159625 : return mrd;
138 : }
139 :
140 1247 : static bool AsyncWait(CompletionQueue* cq, SyncRequest** req, bool* ok,
141 : gpr_timespec deadline) {
142 1247 : void* tag = nullptr;
143 1247 : *ok = false;
144 1247 : switch (cq->AsyncNext(&tag, ok, deadline)) {
145 : case CompletionQueue::TIMEOUT:
146 1 : *req = nullptr;
147 1 : return true;
148 : case CompletionQueue::SHUTDOWN:
149 159 : *req = nullptr;
150 159 : return false;
151 : case CompletionQueue::GOT_EVENT:
152 1087 : *req = static_cast<SyncRequest*>(tag);
153 1087 : GPR_ASSERT((*req)->in_flight_);
154 1087 : return true;
155 : }
156 0 : GPR_UNREACHABLE_CODE(return false);
157 : }
158 :
159 160714 : void SetupRequest() { cq_ = grpc_completion_queue_create(nullptr); }
160 :
161 2 : void TeardownRequest() {
162 2 : grpc_completion_queue_destroy(cq_);
163 2 : cq_ = nullptr;
164 2 : }
165 :
166 160712 : void Request(grpc_server* server, grpc_completion_queue* notify_cq) {
167 160712 : GPR_ASSERT(cq_ && !in_flight_);
168 160712 : in_flight_ = true;
169 160712 : if (tag_) {
170 160584 : GPR_ASSERT(GRPC_CALL_OK ==
171 : grpc_server_request_registered_call(
172 : server, tag_, &call_, &deadline_, &request_metadata_,
173 : has_request_payload_ ? &request_payload_ : nullptr, cq_,
174 : notify_cq, this));
175 : } else {
176 128 : if (!call_details_) {
177 126 : call_details_ = new grpc_call_details;
178 126 : grpc_call_details_init(call_details_);
179 : }
180 128 : GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
181 : server, &call_, call_details_,
182 : &request_metadata_, cq_, notify_cq, this));
183 : }
184 160712 : }
185 :
186 160712 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
187 160712 : if (!*status) {
188 1200 : grpc_completion_queue_destroy(cq_);
189 : }
190 160712 : if (call_details_) {
191 128 : deadline_ = call_details_->deadline;
192 128 : grpc_call_details_destroy(call_details_);
193 128 : grpc_call_details_init(call_details_);
194 : }
195 160712 : return true;
196 : }
197 :
198 : class CallData GRPC_FINAL {
199 : public:
200 159512 : explicit CallData(Server* server, SyncRequest* mrd)
201 : : cq_(mrd->cq_),
202 : call_(mrd->call_, server, &cq_, server->max_message_size_),
203 : ctx_(mrd->deadline_, mrd->request_metadata_.metadata,
204 : mrd->request_metadata_.count),
205 : has_request_payload_(mrd->has_request_payload_),
206 : request_payload_(mrd->request_payload_),
207 159512 : method_(mrd->method_) {
208 159512 : ctx_.set_call(mrd->call_);
209 159512 : ctx_.cq_ = &cq_;
210 159512 : GPR_ASSERT(mrd->in_flight_);
211 159512 : mrd->in_flight_ = false;
212 159512 : mrd->request_metadata_.count = 0;
213 159512 : }
214 :
215 319022 : ~CallData() {
216 159511 : if (has_request_payload_ && request_payload_) {
217 0 : grpc_byte_buffer_destroy(request_payload_);
218 : }
219 159512 : }
220 :
221 159494 : void Run() {
222 159494 : ctx_.BeginCompletionOp(&call_);
223 159512 : method_->handler()->RunHandler(MethodHandler::HandlerParameter(
224 159512 : &call_, &ctx_, request_payload_, call_.max_message_size()));
225 159511 : request_payload_ = nullptr;
226 : void* ignored_tag;
227 : bool ignored_ok;
228 159511 : cq_.Shutdown();
229 159512 : GPR_ASSERT(cq_.Next(&ignored_tag, &ignored_ok) == false);
230 159417 : }
231 :
232 : private:
233 : CompletionQueue cq_;
234 : Call call_;
235 : ServerContext ctx_;
236 : const bool has_request_payload_;
237 : grpc_byte_buffer* request_payload_;
238 : RpcServiceMethod* const method_;
239 : };
240 :
241 : private:
242 : RpcServiceMethod* const method_;
243 : void* const tag_;
244 : bool in_flight_;
245 : const bool has_request_payload_;
246 : grpc_call* call_;
247 : grpc_call_details* call_details_;
248 : gpr_timespec deadline_;
249 : grpc_metadata_array request_metadata_;
250 : grpc_byte_buffer* request_payload_;
251 : grpc_completion_queue* cq_;
252 : };
253 :
254 159 : static grpc_server* CreateServer(
255 : int max_message_size, const grpc_compression_options& compression_options) {
256 : grpc_arg args[2];
257 159 : size_t args_idx = 0;
258 159 : if (max_message_size > 0) {
259 82 : args[args_idx].type = GRPC_ARG_INTEGER;
260 82 : args[args_idx].key = const_cast<char*>(GRPC_ARG_MAX_MESSAGE_LENGTH);
261 82 : args[args_idx].value.integer = max_message_size;
262 82 : args_idx++;
263 : }
264 :
265 159 : args[args_idx].type = GRPC_ARG_INTEGER;
266 159 : args[args_idx].key = const_cast<char*>(GRPC_COMPRESSION_ALGORITHM_STATE_ARG);
267 159 : args[args_idx].value.integer = compression_options.enabled_algorithms_bitset;
268 159 : args_idx++;
269 :
270 159 : grpc_channel_args channel_args = {args_idx, args};
271 159 : return grpc_server_create(&channel_args, nullptr);
272 : }
273 :
274 159 : Server::Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
275 : int max_message_size,
276 : grpc_compression_options compression_options)
277 : : max_message_size_(max_message_size),
278 : started_(false),
279 : shutdown_(false),
280 : num_running_cb_(0),
281 159 : sync_methods_(new std::list<SyncRequest>),
282 : has_generic_service_(false),
283 159 : server_(CreateServer(max_message_size, compression_options)),
284 : thread_pool_(thread_pool),
285 477 : thread_pool_owned_(thread_pool_owned) {
286 159 : grpc_server_register_completion_queue(server_, cq_.cq(), nullptr);
287 159 : }
288 :
289 477 : Server::~Server() {
290 : {
291 159 : grpc::unique_lock<grpc::mutex> lock(mu_);
292 159 : if (started_ && !shutdown_) {
293 15 : lock.unlock();
294 15 : Shutdown();
295 159 : }
296 : }
297 : void* got_tag;
298 : bool ok;
299 159 : GPR_ASSERT(!cq_.Next(&got_tag, &ok));
300 159 : grpc_server_destroy(server_);
301 159 : if (thread_pool_owned_) {
302 126 : delete thread_pool_;
303 : }
304 159 : delete sync_methods_;
305 318 : }
306 :
307 289 : bool Server::RegisterService(const grpc::string* host, RpcService* service) {
308 1365 : for (int i = 0; i < service->GetMethodCount(); ++i) {
309 1076 : RpcServiceMethod* method = service->GetMethod(i);
310 : void* tag = grpc_server_register_method(server_, method->name(),
311 1076 : host ? host->c_str() : nullptr);
312 1076 : if (!tag) {
313 : gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
314 0 : method->name());
315 0 : return false;
316 : }
317 1076 : sync_methods_->emplace_back(method, tag);
318 : }
319 289 : return true;
320 : }
321 :
322 30 : bool Server::RegisterAsyncService(const grpc::string* host,
323 : AsynchronousService* service) {
324 30 : GPR_ASSERT(service->server_ == nullptr &&
325 : "Can only register an asynchronous service against one server.");
326 30 : service->server_ = this;
327 30 : service->request_args_ = new void* [service->method_count_];
328 168 : for (size_t i = 0; i < service->method_count_; ++i) {
329 138 : void* tag = grpc_server_register_method(server_, service->method_names_[i],
330 276 : host ? host->c_str() : nullptr);
331 138 : if (!tag) {
332 : gpr_log(GPR_DEBUG, "Attempt to register %s multiple times",
333 0 : service->method_names_[i]);
334 0 : return false;
335 : }
336 138 : service->request_args_[i] = tag;
337 : }
338 30 : return true;
339 : }
340 :
341 3 : void Server::RegisterAsyncGenericService(AsyncGenericService* service) {
342 3 : GPR_ASSERT(service->server_ == nullptr &&
343 : "Can only register an async generic service against one server.");
344 3 : service->server_ = this;
345 3 : has_generic_service_ = true;
346 3 : }
347 :
348 159 : int Server::AddListeningPort(const grpc::string& addr,
349 : ServerCredentials* creds) {
350 159 : GPR_ASSERT(!started_);
351 159 : return creds->AddPortToServer(addr, server_);
352 : }
353 :
354 159 : bool Server::Start(ServerCompletionQueue** cqs, size_t num_cqs) {
355 159 : GPR_ASSERT(!started_);
356 159 : started_ = true;
357 159 : grpc_server_start(server_);
358 :
359 159 : if (!has_generic_service_) {
360 156 : if (!sync_methods_->empty()) {
361 : unknown_method_.reset(new RpcServiceMethod(
362 126 : "unknown", RpcMethod::BIDI_STREAMING, new UnknownMethodHandler));
363 : // Use of emplace_back with just constructor arguments is not accepted
364 : // here by gcc-4.4 because it can't match the anonymous nullptr with a
365 : // proper constructor implicitly. Construct the object and use push_back.
366 126 : sync_methods_->push_back(SyncRequest(unknown_method_.get(), nullptr));
367 : }
368 196 : for (size_t i = 0; i < num_cqs; i++) {
369 40 : new UnimplementedAsyncRequest(this, cqs[i]);
370 : }
371 : }
372 : // Start processing rpcs.
373 159 : if (!sync_methods_->empty()) {
374 1328 : for (auto m = sync_methods_->begin(); m != sync_methods_->end(); m++) {
375 1202 : m->SetupRequest();
376 1202 : m->Request(server_, cq_.cq());
377 : }
378 :
379 126 : ScheduleCallback();
380 : }
381 :
382 159 : return true;
383 : }
384 :
385 159 : void Server::ShutdownInternal(gpr_timespec deadline) {
386 159 : grpc::unique_lock<grpc::mutex> lock(mu_);
387 159 : if (started_ && !shutdown_) {
388 159 : shutdown_ = true;
389 159 : grpc_server_shutdown_and_notify(server_, cq_.cq(), new ShutdownRequest());
390 159 : cq_.Shutdown();
391 : // Spin, eating requests until the completion queue is completely shutdown.
392 : // If the deadline expires then cancel anything that's pending and keep
393 : // spinning forever until the work is actually drained.
394 : // Since nothing else needs to touch state guarded by mu_, holding it
395 : // through this loop is fine.
396 : SyncRequest* request;
397 : bool ok;
398 1406 : while (SyncRequest::AsyncWait(&cq_, &request, &ok, deadline)) {
399 1088 : if (request == NULL) { // deadline expired
400 1 : grpc_server_cancel_all_calls(server_);
401 1 : deadline = gpr_inf_future(GPR_CLOCK_MONOTONIC);
402 1087 : } else if (ok) {
403 0 : SyncRequest::CallData call_data(this, request);
404 : }
405 : }
406 :
407 : // Wait for running callbacks to finish.
408 505 : while (num_running_cb_ != 0) {
409 187 : callback_cv_.wait(lock);
410 : }
411 159 : }
412 159 : }
413 :
414 0 : void Server::Wait() {
415 0 : grpc::unique_lock<grpc::mutex> lock(mu_);
416 0 : while (num_running_cb_ != 0) {
417 0 : callback_cv_.wait(lock);
418 0 : }
419 0 : }
420 :
421 2793600 : void Server::PerformOpsOnCall(CallOpSetInterface* ops, Call* call) {
422 : static const size_t MAX_OPS = 8;
423 2793600 : size_t nops = 0;
424 : grpc_op cops[MAX_OPS];
425 2793600 : ops->FillOps(cops, &nops);
426 2794836 : auto result = grpc_call_start_batch(call->call(), cops, nops, ops, nullptr);
427 2795618 : GPR_ASSERT(GRPC_CALL_OK == result);
428 2795618 : }
429 :
430 1245763 : Server::BaseAsyncRequest::BaseAsyncRequest(
431 : Server* server, ServerContext* context,
432 : ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag,
433 : bool delete_on_finalize)
434 : : server_(server),
435 : context_(context),
436 : stream_(stream),
437 : call_cq_(call_cq),
438 : tag_(tag),
439 : delete_on_finalize_(delete_on_finalize),
440 1245763 : call_(nullptr) {
441 1245813 : memset(&initial_metadata_array_, 0, sizeof(initial_metadata_array_));
442 1245813 : }
443 :
444 1245906 : Server::BaseAsyncRequest::~BaseAsyncRequest() {}
445 :
446 1245671 : bool Server::BaseAsyncRequest::FinalizeResult(void** tag, bool* status) {
447 1245671 : if (*status) {
448 2278979 : for (size_t i = 0; i < initial_metadata_array_.count; i++) {
449 : context_->client_metadata_.insert(
450 : std::pair<grpc::string_ref, grpc::string_ref>(
451 1139577 : initial_metadata_array_.metadata[i].key,
452 : grpc::string_ref(
453 1139616 : initial_metadata_array_.metadata[i].value,
454 3418809 : initial_metadata_array_.metadata[i].value_length)));
455 : }
456 : }
457 1245427 : grpc_metadata_array_destroy(&initial_metadata_array_);
458 1245964 : context_->set_call(call_);
459 1245905 : context_->cq_ = call_cq_;
460 1245905 : Call call(call_, server_, call_cq_, server_->max_message_size_);
461 1245900 : if (*status && call_) {
462 1139620 : context_->BeginCompletionOp(&call);
463 : }
464 : // just the pointers inside call are copied here
465 1245944 : stream_->BindCall(&call);
466 1245880 : *tag = tag_;
467 1245880 : if (delete_on_finalize_) {
468 1245870 : delete this;
469 : }
470 1246001 : return true;
471 : }
472 :
473 1245790 : Server::RegisteredAsyncRequest::RegisteredAsyncRequest(
474 : Server* server, ServerContext* context,
475 : ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq, void* tag)
476 1245790 : : BaseAsyncRequest(server, context, stream, call_cq, tag, true) {}
477 :
478 1245577 : void Server::RegisteredAsyncRequest::IssueRequest(
479 : void* registered_method, grpc_byte_buffer** payload,
480 : ServerCompletionQueue* notification_cq) {
481 : grpc_server_request_registered_call(
482 : server_->server_, registered_method, &call_, &context_->deadline_,
483 : &initial_metadata_array_, payload, call_cq_->cq(), notification_cq->cq(),
484 1245577 : this);
485 1245982 : }
486 :
487 54 : Server::GenericAsyncRequest::GenericAsyncRequest(
488 : Server* server, GenericServerContext* context,
489 : ServerAsyncStreamingInterface* stream, CompletionQueue* call_cq,
490 : ServerCompletionQueue* notification_cq, void* tag, bool delete_on_finalize)
491 : : BaseAsyncRequest(server, context, stream, call_cq, tag,
492 54 : delete_on_finalize) {
493 54 : grpc_call_details_init(&call_details_);
494 54 : GPR_ASSERT(notification_cq);
495 54 : GPR_ASSERT(call_cq);
496 : grpc_server_request_call(server->server_, &call_, &call_details_,
497 : &initial_metadata_array_, call_cq->cq(),
498 54 : notification_cq->cq(), this);
499 54 : }
500 :
501 54 : bool Server::GenericAsyncRequest::FinalizeResult(void** tag, bool* status) {
502 : // TODO(yangg) remove the copy here.
503 54 : if (*status) {
504 14 : static_cast<GenericServerContext*>(context_)->method_ =
505 14 : call_details_.method;
506 14 : static_cast<GenericServerContext*>(context_)->host_ = call_details_.host;
507 : }
508 54 : gpr_free(call_details_.method);
509 54 : gpr_free(call_details_.host);
510 54 : return BaseAsyncRequest::FinalizeResult(tag, status);
511 : }
512 :
513 42 : bool Server::UnimplementedAsyncRequest::FinalizeResult(void** tag,
514 : bool* status) {
515 42 : if (GenericAsyncRequest::FinalizeResult(tag, status) && *status) {
516 2 : new UnimplementedAsyncRequest(server_, cq_);
517 2 : new UnimplementedAsyncResponse(this);
518 : } else {
519 40 : delete this;
520 : }
521 42 : return false;
522 : }
523 :
524 2 : Server::UnimplementedAsyncResponse::UnimplementedAsyncResponse(
525 : UnimplementedAsyncRequest* request)
526 2 : : request_(request) {
527 2 : Status status(StatusCode::UNIMPLEMENTED, "");
528 2 : UnknownMethodHandler::FillOps(request_->context(), this);
529 2 : request_->stream()->call_.PerformOps(this);
530 2 : }
531 :
532 159751 : void Server::ScheduleCallback() {
533 : {
534 159751 : grpc::unique_lock<grpc::mutex> lock(mu_);
535 159751 : num_running_cb_++;
536 : }
537 159751 : thread_pool_->Add(std::bind(&Server::RunRpc, this));
538 159751 : }
539 :
540 159751 : void Server::RunRpc() {
541 : // Wait for one more incoming rpc.
542 : bool ok;
543 159751 : auto* mrd = SyncRequest::Wait(&cq_, &ok);
544 159751 : if (mrd) {
545 159625 : ScheduleCallback();
546 159626 : if (ok) {
547 159512 : SyncRequest::CallData cd(this, mrd);
548 : {
549 159512 : mrd->SetupRequest();
550 159512 : grpc::unique_lock<grpc::mutex> lock(mu_);
551 159512 : if (!shutdown_) {
552 159510 : mrd->Request(server_, cq_.cq());
553 : } else {
554 : // destroy the structure that was created
555 2 : mrd->TeardownRequest();
556 159512 : }
557 : }
558 159512 : cd.Run();
559 : }
560 : }
561 :
562 : {
563 159751 : grpc::unique_lock<grpc::mutex> lock(mu_);
564 159751 : num_running_cb_--;
565 159751 : if (shutdown_) {
566 252 : callback_cv_.notify_all();
567 159751 : }
568 : }
569 159751 : }
570 :
571 : } // namespace grpc
|