Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <cassert>
35 : #include <forward_list>
36 : #include <functional>
37 : #include <list>
38 : #include <memory>
39 : #include <mutex>
40 : #include <string>
41 : #include <thread>
42 : #include <vector>
43 : #include <sstream>
44 :
45 : #include <grpc/grpc.h>
46 : #include <grpc/support/histogram.h>
47 : #include <grpc/support/log.h>
48 : #include <gflags/gflags.h>
49 : #include <grpc++/client_context.h>
50 :
51 : #include "test/cpp/qps/qpstest.grpc.pb.h"
52 : #include "test/cpp/qps/timer.h"
53 : #include "test/cpp/qps/client.h"
54 : #include "test/cpp/util/create_test_channel.h"
55 :
56 : namespace grpc {
57 : namespace testing {
58 :
59 : typedef std::list<grpc_time> deadline_list;
60 :
// Abstract base for one in-flight asynchronous RPC. A context doubles as
// its own completion-queue tag (see tag()/detag()) and, for open-loop runs,
// remembers where its entry lives in the per-thread deadline list so it can
// be erased in O(1) on completion.
class ClientRpcContext {
 public:
  explicit ClientRpcContext(int ch) : channel_id_(ch) {}
  virtual ~ClientRpcContext() {}
  // next state, return false if done. Collect stats when appropriate
  virtual bool RunNextState(bool, Histogram* hist) = 0;
  // Create a fresh context configured identically to this one (same channel,
  // stub, request, and callbacks); used to replace a finished call.
  virtual ClientRpcContext* StartNewClone() = 0;
  // The CQ tag is simply the context pointer itself.
  static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
  static ClientRpcContext* detag(void* t) {
    return reinterpret_cast<ClientRpcContext*>(t);
  }

  // Position of this call's deadline within the owning thread's deadline
  // list; only meaningful in open-loop mode (set right before Start()).
  deadline_list::iterator deadline_posn() const { return deadline_posn_; }
  void set_deadline_posn(const deadline_list::iterator& it) {
    deadline_posn_ = it;
  }
  // Kick off the RPC on the given completion queue.
  virtual void Start(CompletionQueue* cq) = 0;
  int channel_id() const { return channel_id_; }

 protected:
  int channel_id_;

 private:
  deadline_list::iterator deadline_posn_;
};
86 :
87 : template <class RequestType, class ResponseType>
88 : class ClientRpcContextUnaryImpl : public ClientRpcContext {
89 : public:
90 1139442 : ClientRpcContextUnaryImpl(
91 : int channel_id, TestService::Stub* stub, const RequestType& req,
92 : std::function<
93 : std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
94 : TestService::Stub*, grpc::ClientContext*, const RequestType&,
95 : CompletionQueue*)> start_req,
96 : std::function<void(grpc::Status, ResponseType*)> on_done)
97 : : ClientRpcContext(channel_id),
98 : context_(),
99 : stub_(stub),
100 : req_(req),
101 : response_(),
102 : next_state_(&ClientRpcContextUnaryImpl::RespDone),
103 : callback_(on_done),
104 1139442 : start_req_(start_req) {}
105 1139499 : void Start(CompletionQueue* cq) GRPC_OVERRIDE {
106 1139499 : start_ = Timer::Now();
107 1139606 : response_reader_ = start_req_(stub_, &context_, req_, cq);
108 1139577 : response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
109 1139314 : }
110 2278205 : ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
111 2246442 : bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
112 2246442 : bool ret = (this->*next_state_)(ok);
113 2246437 : if (!ret) {
114 1123516 : hist->Add((Timer::Now() - start_) * 1e9);
115 : }
116 2246373 : return ret;
117 : }
118 :
119 1123266 : ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
120 : return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_,
121 1123266 : callback_);
122 : }
123 :
124 : private:
125 1123154 : bool RespDone(bool) {
126 1123154 : next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
127 1123154 : return false;
128 : }
129 1122895 : bool DoCallBack(bool) {
130 1122895 : callback_(status_, &response_);
131 1123319 : return true; // we're done, this'll be ignored
132 : }
133 : grpc::ClientContext context_;
134 : TestService::Stub* stub_;
135 : RequestType req_;
136 : ResponseType response_;
137 : bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
138 : std::function<void(grpc::Status, ResponseType*)> callback_;
139 : std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
140 : TestService::Stub*, grpc::ClientContext*, const RequestType&,
141 : CompletionQueue*)> start_req_;
142 : grpc::Status status_;
143 : double start_;
144 : std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
145 : response_reader_;
146 : };
147 :
148 : typedef std::forward_list<ClientRpcContext*> context_list;
149 :
// Shared driver for the async unary and streaming benchmark clients. Owns
// one completion queue per worker thread plus, for open-loop runs, the
// per-thread deadline lists, issue schedules, and per-channel idle-context
// pools used to pace RPC issue.
class AsyncClient : public Client {
 public:
  // setup_ctx builds one ClientRpcContext for a (channel, stub, request)
  // triple. The constructor pre-allocates outstanding_rpcs_per_channel()
  // contexts per channel: closed-loop runs start them immediately, open-loop
  // runs park them on the per-channel idle lists until their issue time.
  explicit AsyncClient(
      const ClientConfig& config,
      std::function<ClientRpcContext*(int, TestService::Stub*,
                                      const SimpleRequest&)> setup_ctx)
      : Client(config),
        channel_lock_(new std::mutex[config.client_channels()]),
        contexts_(config.client_channels()),
        max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
        channel_count_(config.client_channels()),
        pref_channel_inc_(config.async_client_threads()) {
    SetupLoadTest(config, config.async_client_threads());

    // One completion queue per worker thread; open-loop threads also get a
    // deadline list, a round-robin channel cursor, an issue-permission flag,
    // and their first scheduled issue time (from the Client base's load
    // generator -- see NextIssueTime).
    for (int i = 0; i < config.async_client_threads(); i++) {
      cli_cqs_.emplace_back(new CompletionQueue);
      if (!closed_loop_) {
        rpc_deadlines_.emplace_back();
        next_channel_.push_back(i % channel_count_);
        issue_allowed_.emplace_back(true);

        grpc_time next_issue;
        NextIssueTime(i, &next_issue);
        next_issue_.push_back(next_issue);
      }
    }

    // Spread the pre-created contexts round-robin across the CQs.
    int t = 0;
    for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
      for (int ch = 0; ch < channel_count_; ch++) {
        auto* cq = cli_cqs_[t].get();
        t = (t + 1) % cli_cqs_.size();
        auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_);
        if (closed_loop_) {
          ctx->Start(cq);
        } else {
          contexts_[ch].push_front(ctx);
        }
      }
    }
  }
  virtual ~AsyncClient() {
    // Shut down and drain every queue so all in-flight contexts (whose tags
    // are the context pointers) are reclaimed.
    for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
      (*cq)->Shutdown();
      void* got_tag;
      bool ok;
      while ((*cq)->Next(&got_tag, &ok)) {
        delete ClientRpcContext::detag(got_tag);
      }
    }
    // Now clear out all the pre-allocated idle contexts
    for (int ch = 0; ch < channel_count_; ch++) {
      while (!contexts_[ch].empty()) {
        // Get an idle context from the front of the list
        auto* ctx = *(contexts_[ch].begin());
        contexts_[ch].pop_front();
        delete ctx;
      }
    }
    delete[] channel_lock_;
  }

  // One iteration of a worker thread: wait (bounded) for a completion,
  // advance the finished RPC's state machine, and -- in open-loop mode --
  // try to issue the next RPC once its scheduled time has arrived.
  // Returns false only when the completion queue has been shut down.
  bool ThreadFunc(Histogram* histogram,
                  size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
    void* got_tag;
    bool ok;
    grpc_time deadline, short_deadline;
    if (closed_loop_) {
      // No issue schedule: poll with a 1s cap so shutdown is noticed.
      deadline = grpc_time_source::now() + std::chrono::seconds(1);
      short_deadline = deadline;
    } else {
      // Open loop: wake by the earliest outstanding RPC deadline, or sooner
      // at this thread's next issue time when issuing is currently allowed.
      if (rpc_deadlines_[thread_idx].empty()) {
        deadline = grpc_time_source::now() + std::chrono::seconds(1);
      } else {
        deadline = *(rpc_deadlines_[thread_idx].begin());
      }
      short_deadline =
          issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline;
    }

    bool got_event;

    switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
      case CompletionQueue::SHUTDOWN:
        return false;
      case CompletionQueue::TIMEOUT:
        got_event = false;
        break;
      case CompletionQueue::GOT_EVENT:
        got_event = true;
        break;
      default:
        // AsyncNext returns only the three statuses above.
        GPR_ASSERT(false);
        break;
    }
    if (got_event) {
      ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
      // A false return from RunNextState means the RPC finished (and its
      // latency was recorded); the second call then advances the context's
      // state machine once more to run the user callback.
      if (ctx->RunNextState(ok, histogram) == false) {
        // call the callback and then clone the ctx
        ctx->RunNextState(ok, histogram);
        ClientRpcContext* clone_ctx = ctx->StartNewClone();
        if (closed_loop_) {
          clone_ctx->Start(cli_cqs_[thread_idx].get());
        } else {
          // Remove the entry from the rpc deadlines list
          rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
          // Put the clone_ctx in the list of idle contexts for this channel
          // Under lock
          int ch = clone_ctx->channel_id();
          std::lock_guard<std::mutex> g(channel_lock_[ch]);
          contexts_[ch].push_front(clone_ctx);
        }
        // delete the old version
        delete ctx;
      }
      if (!closed_loop_)
        issue_allowed_[thread_idx] =
            true;  // may be ok now even if it hadn't been
    }
    if (!closed_loop_ && issue_allowed_[thread_idx] &&
        grpc_time_source::now() >= next_issue_[thread_idx]) {
      // Attempt to issue: scan channels starting at this thread's preferred
      // one, looking for an idle context to start.
      bool issued = false;
      for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
           num_attempts < channel_count_ && !issued; num_attempts++) {
        bool can_issue = false;
        ClientRpcContext* ctx = nullptr;
        {
          std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]);
          if (!contexts_[channel_attempt].empty()) {
            // Get an idle context from the front of the list
            ctx = *(contexts_[channel_attempt].begin());
            contexts_[channel_attempt].pop_front();
            can_issue = true;
          }
        }
        if (can_issue) {
          // do the work to issue: give the call a 1s deadline, remember its
          // position in the deadline list, and start it on our CQ.
          rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() +
                                                  std::chrono::seconds(1));
          auto it = rpc_deadlines_[thread_idx].end();
          --it;
          ctx->set_deadline_posn(it);
          ctx->Start(cli_cqs_[thread_idx].get());
          issued = true;
          // If we did issue, then next time, try our thread's next
          // preferred channel
          next_channel_[thread_idx] += pref_channel_inc_;
          if (next_channel_[thread_idx] >= channel_count_)
            next_channel_[thread_idx] = (thread_idx % channel_count_);
        } else {
          // Do a modular increment of channel attempt if we couldn't issue
          channel_attempt = (channel_attempt + 1) % channel_count_;
        }
      }
      if (issued) {
        // We issued one; see when we can issue the next
        grpc_time next_issue;
        NextIssueTime(thread_idx, &next_issue);
        next_issue_[thread_idx] = next_issue;
      } else {
        // Every channel was busy; stop trying until a completion frees a
        // context (issue_allowed_ is reset to true above on any event).
        issue_allowed_[thread_idx] = false;
      }
    }
    return true;
  }

 private:
  class boolean {  // exists only to avoid data-race on vector<bool>
   public:
    boolean() : val_(false) {}
    boolean(bool b) : val_(b) {}
    operator bool() const { return val_; }
    boolean& operator=(bool b) {
      val_ = b;
      return *this;
    }

   private:
    bool val_;
  };
  std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;

  std::vector<deadline_list> rpc_deadlines_;  // per thread deadlines
  std::vector<int> next_channel_;       // per thread round-robin channel ctr
  std::vector<boolean> issue_allowed_;  // may this thread attempt to issue
  std::vector<grpc_time> next_issue_;   // when should it issue?

  std::mutex*
      channel_lock_;  // a vector, but avoid std::vector for old compilers
  std::vector<context_list> contexts_;  // per-channel list of idle contexts
  int max_outstanding_per_channel_;
  int channel_count_;
  int pref_channel_inc_;
};
345 :
346 : class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
347 : public:
348 3 : explicit AsyncUnaryClient(const ClientConfig& config)
349 3 : : AsyncClient(config, SetupCtx) {
350 3 : StartThreads(config.async_client_threads());
351 3 : }
352 6 : ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
353 :
354 : private:
355 1123130 : static void CheckDone(grpc::Status s, SimpleResponse* response) {}
356 : static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
357 1139411 : StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
358 : const SimpleRequest& request, CompletionQueue* cq) {
359 1139411 : return stub->AsyncUnaryCall(ctx, request, cq);
360 : };
361 16001 : static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
362 : const SimpleRequest& req) {
363 : return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
364 : channel_id, stub, req, AsyncUnaryClient::StartReq,
365 16001 : AsyncUnaryClient::CheckDone);
366 : }
367 : };
368 :
369 : template <class RequestType, class ResponseType>
370 : class ClientRpcContextStreamingImpl : public ClientRpcContext {
371 : public:
372 1 : ClientRpcContextStreamingImpl(
373 : int channel_id, TestService::Stub* stub, const RequestType& req,
374 : std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
375 : RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*,
376 : CompletionQueue*, void*)> start_req,
377 : std::function<void(grpc::Status, ResponseType*)> on_done)
378 : : ClientRpcContext(channel_id),
379 : context_(),
380 : stub_(stub),
381 : req_(req),
382 : response_(),
383 : next_state_(&ClientRpcContextStreamingImpl::ReqSent),
384 : callback_(on_done),
385 : start_req_(start_req),
386 1 : start_(Timer::Now()) {}
387 2 : ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
388 53049 : bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
389 53049 : return (this->*next_state_)(ok, hist);
390 : }
391 0 : ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
392 : return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_,
393 0 : start_req_, callback_);
394 : }
395 1 : void Start(CompletionQueue* cq) GRPC_OVERRIDE {
396 1 : stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
397 1 : }
398 :
399 : private:
400 1 : bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
401 26525 : bool StartWrite(bool ok) {
402 26525 : if (!ok) {
403 0 : return (false);
404 : }
405 26525 : start_ = Timer::Now();
406 26525 : next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
407 26525 : stream_->Write(req_, ClientRpcContext::tag(this));
408 26525 : return true;
409 : }
410 26524 : bool WriteDone(bool ok, Histogram*) {
411 26524 : if (!ok) {
412 0 : return (false);
413 : }
414 26524 : next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
415 26524 : stream_->Read(&response_, ClientRpcContext::tag(this));
416 26524 : return true;
417 : }
418 26524 : bool ReadDone(bool ok, Histogram* hist) {
419 26524 : hist->Add((Timer::Now() - start_) * 1e9);
420 26524 : return StartWrite(ok);
421 : }
422 : grpc::ClientContext context_;
423 : TestService::Stub* stub_;
424 : RequestType req_;
425 : ResponseType response_;
426 : bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
427 : std::function<void(grpc::Status, ResponseType*)> callback_;
428 : std::function<
429 : std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
430 : TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
431 : start_req_;
432 : grpc::Status status_;
433 : double start_;
434 : std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
435 : stream_;
436 : };
437 :
438 : class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
439 : public:
440 1 : explicit AsyncStreamingClient(const ClientConfig& config)
441 1 : : AsyncClient(config, SetupCtx) {
442 : // async streaming currently only supported closed loop
443 1 : GPR_ASSERT(config.load_type() == CLOSED_LOOP);
444 :
445 1 : StartThreads(config.async_client_threads());
446 1 : }
447 :
448 2 : ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
449 :
450 : private:
451 0 : static void CheckDone(grpc::Status s, SimpleResponse* response) {}
452 : static std::unique_ptr<
453 : grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
454 1 : StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
455 : CompletionQueue* cq, void* tag) {
456 1 : auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
457 1 : return stream;
458 : };
459 1 : static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
460 : const SimpleRequest& req) {
461 : return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
462 : channel_id, stub, req, AsyncStreamingClient::StartReq,
463 1 : AsyncStreamingClient::CheckDone);
464 : }
465 : };
466 :
467 3 : std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
468 3 : return std::unique_ptr<Client>(new AsyncUnaryClient(args));
469 : }
470 1 : std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) {
471 1 : return std::unique_ptr<Client>(new AsyncStreamingClient(args));
472 : }
473 :
474 : } // namespace testing
475 : } // namespace grpc
|