Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc++/impl/sync.h>
35 : #include <grpc++/impl/thd.h>
36 :
37 : #include "src/cpp/server/dynamic_thread_pool.h"
38 :
39 : namespace grpc {
40 18145 : DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
41 : : pool_(pool),
42 : thd_(new grpc::thread(&DynamicThreadPool::DynamicThread::ThreadFunc,
43 18145 : this)) {}
44 36290 : DynamicThreadPool::DynamicThread::~DynamicThread() {
45 18145 : thd_->join();
46 18145 : thd_.reset();
47 18145 : }
48 :
49 18144 : void DynamicThreadPool::DynamicThread::ThreadFunc() {
50 18144 : pool_->ThreadFunc();
51 : // Now that we have killed ourselves, we should reduce the thread count
52 18144 : grpc::unique_lock<grpc::mutex> lock(pool_->mu_);
53 18145 : pool_->nthreads_--;
54 : // Move ourselves to dead list
55 18145 : pool_->dead_threads_.push_back(this);
56 :
57 18145 : if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
58 180 : pool_->shutdown_cv_.notify_one();
59 18145 : }
60 18145 : }
61 :
62 312335 : void DynamicThreadPool::ThreadFunc() {
63 : for (;;) {
64 : // Wait until work is available or we are shutting down.
65 312335 : grpc::unique_lock<grpc::mutex> lock(mu_);
66 312336 : if (!shutdown_ && callbacks_.empty()) {
67 : // If there are too many threads waiting, then quit this thread
68 134786 : if (threads_waiting_ >= reserve_threads_) {
69 17322 : break;
70 : }
71 117464 : threads_waiting_++;
72 117464 : cv_.wait(lock);
73 117464 : threads_waiting_--;
74 : }
75 : // Drain callbacks before considering shutdown to ensure all work
76 : // gets completed.
77 295014 : if (!callbacks_.empty()) {
78 209710 : auto cb = callbacks_.front();
79 209710 : callbacks_.pop();
80 209710 : lock.unlock();
81 209710 : cb();
82 85304 : } else if (shutdown_) {
83 823 : break;
84 : }
85 294191 : }
86 18144 : }
87 :
88 180 : DynamicThreadPool::DynamicThreadPool(int reserve_threads)
89 : : shutdown_(false),
90 : reserve_threads_(reserve_threads),
91 : nthreads_(0),
92 180 : threads_waiting_(0) {
93 900 : for (int i = 0; i < reserve_threads_; i++) {
94 720 : grpc::lock_guard<grpc::mutex> lock(mu_);
95 720 : nthreads_++;
96 720 : new DynamicThread(this);
97 720 : }
98 180 : }
99 :
100 1585 : void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
101 19730 : for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
102 18145 : delete *t;
103 : }
104 1585 : }
105 :
106 540 : DynamicThreadPool::~DynamicThreadPool() {
107 180 : grpc::unique_lock<grpc::mutex> lock(mu_);
108 180 : shutdown_ = true;
109 180 : cv_.notify_all();
110 540 : while (nthreads_ != 0) {
111 180 : shutdown_cv_.wait(lock);
112 : }
113 180 : ReapThreads(&dead_threads_);
114 360 : }
115 :
116 209710 : void DynamicThreadPool::Add(const std::function<void()>& callback) {
117 209710 : grpc::lock_guard<grpc::mutex> lock(mu_);
118 : // Add works to the callbacks list
119 209710 : callbacks_.push(callback);
120 : // Increase pool size or notify as needed
121 209710 : if (threads_waiting_ == 0) {
122 : // Kick off a new thread
123 17425 : nthreads_++;
124 17425 : new DynamicThread(this);
125 : } else {
126 192285 : cv_.notify_one();
127 : }
128 : // Also use this chance to harvest dead threads
129 209710 : if (!dead_threads_.empty()) {
130 1405 : ReapThreads(&dead_threads_);
131 209710 : }
132 209710 : }
133 :
134 : } // namespace grpc
|