LCOV - code coverage report
Current view: top level - test/cpp/qps - client_async.cc (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 179 189 94.7 %
Date: 2015-10-10 Functions: 43 47 91.5 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <cassert>
      35             : #include <forward_list>
      36             : #include <functional>
      37             : #include <list>
      38             : #include <memory>
      39             : #include <mutex>
      40             : #include <string>
      41             : #include <thread>
      42             : #include <vector>
      43             : #include <sstream>
      44             : 
      45             : #include <grpc/grpc.h>
      46             : #include <grpc/support/histogram.h>
      47             : #include <grpc/support/log.h>
      48             : #include <gflags/gflags.h>
      49             : #include <grpc++/client_context.h>
      50             : 
      51             : #include "test/cpp/qps/qpstest.grpc.pb.h"
      52             : #include "test/cpp/qps/timer.h"
      53             : #include "test/cpp/qps/client.h"
      54             : #include "test/cpp/util/create_test_channel.h"
      55             : 
      56             : namespace grpc {
      57             : namespace testing {
      58             : 
      59             : typedef std::list<grpc_time> deadline_list;
      60             : 
      61             : class ClientRpcContext {
      62             :  public:
      63     1139445 :   explicit ClientRpcContext(int ch) : channel_id_(ch) {}
      64     1139011 :   virtual ~ClientRpcContext() {}
      65             :   // next state, return false if done. Collect stats when appropriate
      66             :   virtual bool RunNextState(bool, Histogram* hist) = 0;
      67             :   virtual ClientRpcContext* StartNewClone() = 0;
      68     1192227 :   static void* tag(ClientRpcContext* c) { return reinterpret_cast<void*>(c); }
      69     1192324 :   static ClientRpcContext* detag(void* t) {
      70     1192324 :     return reinterpret_cast<ClientRpcContext*>(t);
      71             :   }
      72             : 
      73      246604 :   deadline_list::iterator deadline_posn() const { return deadline_posn_; }
      74      254657 :   void set_deadline_posn(const deadline_list::iterator& it) {
      75      254657 :     deadline_posn_ = it;
      76      254657 :   }
      77             :   virtual void Start(CompletionQueue* cq) = 0;
      78      246676 :   int channel_id() const { return channel_id_; }
      79             : 
      80             :  protected:
      81             :   int channel_id_;
      82             : 
      83             :  private:
      84             :   deadline_list::iterator deadline_posn_;
      85             : };
      86             : 
      87             : template <class RequestType, class ResponseType>
      88             : class ClientRpcContextUnaryImpl : public ClientRpcContext {
      89             :  public:
      90     1139442 :   ClientRpcContextUnaryImpl(
      91             :       int channel_id, TestService::Stub* stub, const RequestType& req,
      92             :       std::function<
      93             :           std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
      94             :               TestService::Stub*, grpc::ClientContext*, const RequestType&,
      95             :               CompletionQueue*)> start_req,
      96             :       std::function<void(grpc::Status, ResponseType*)> on_done)
      97             :       : ClientRpcContext(channel_id),
      98             :         context_(),
      99             :         stub_(stub),
     100             :         req_(req),
     101             :         response_(),
     102             :         next_state_(&ClientRpcContextUnaryImpl::RespDone),
     103             :         callback_(on_done),
     104     1139442 :         start_req_(start_req) {}
     105     1139499 :   void Start(CompletionQueue* cq) GRPC_OVERRIDE {
     106     1139499 :     start_ = Timer::Now();
     107     1139606 :     response_reader_ = start_req_(stub_, &context_, req_, cq);
     108     1139577 :     response_reader_->Finish(&response_, &status_, ClientRpcContext::tag(this));
     109     1139314 :   }
     110     2278205 :   ~ClientRpcContextUnaryImpl() GRPC_OVERRIDE {}
     111     2246442 :   bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
     112     2246442 :     bool ret = (this->*next_state_)(ok);
     113     2246437 :     if (!ret) {
     114     1123516 :       hist->Add((Timer::Now() - start_) * 1e9);
     115             :     }
     116     2246373 :     return ret;
     117             :   }
     118             : 
     119     1123266 :   ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
     120             :     return new ClientRpcContextUnaryImpl(channel_id_, stub_, req_, start_req_,
     121     1123266 :                                          callback_);
     122             :   }
     123             : 
     124             :  private:
     125     1123154 :   bool RespDone(bool) {
     126     1123154 :     next_state_ = &ClientRpcContextUnaryImpl::DoCallBack;
     127     1123154 :     return false;
     128             :   }
     129     1122895 :   bool DoCallBack(bool) {
     130     1122895 :     callback_(status_, &response_);
     131     1123319 :     return true;  // we're done, this'll be ignored
     132             :   }
     133             :   grpc::ClientContext context_;
     134             :   TestService::Stub* stub_;
     135             :   RequestType req_;
     136             :   ResponseType response_;
     137             :   bool (ClientRpcContextUnaryImpl::*next_state_)(bool);
     138             :   std::function<void(grpc::Status, ResponseType*)> callback_;
     139             :   std::function<std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>(
     140             :       TestService::Stub*, grpc::ClientContext*, const RequestType&,
     141             :       CompletionQueue*)> start_req_;
     142             :   grpc::Status status_;
     143             :   double start_;
     144             :   std::unique_ptr<grpc::ClientAsyncResponseReader<ResponseType>>
     145             :       response_reader_;
     146             : };
     147             : 
     148             : typedef std::forward_list<ClientRpcContext*> context_list;
     149             : 
     150             : class AsyncClient : public Client {
     151             :  public:
     152           4 :   explicit AsyncClient(
     153             :       const ClientConfig& config,
     154             :       std::function<ClientRpcContext*(int, TestService::Stub*,
     155             :                                       const SimpleRequest&)> setup_ctx)
     156             :       : Client(config),
     157           8 :         channel_lock_(new std::mutex[config.client_channels()]),
     158           4 :         contexts_(config.client_channels()),
     159           4 :         max_outstanding_per_channel_(config.outstanding_rpcs_per_channel()),
     160           4 :         channel_count_(config.client_channels()),
     161          24 :         pref_channel_inc_(config.async_client_threads()) {
     162           4 :     SetupLoadTest(config, config.async_client_threads());
     163             : 
     164          22 :     for (int i = 0; i < config.async_client_threads(); i++) {
     165          18 :       cli_cqs_.emplace_back(new CompletionQueue);
     166          18 :       if (!closed_loop_) {
     167           8 :         rpc_deadlines_.emplace_back();
     168           8 :         next_channel_.push_back(i % channel_count_);
     169           8 :         issue_allowed_.emplace_back(true);
     170             : 
     171           8 :         grpc_time next_issue;
     172           8 :         NextIssueTime(i, &next_issue);
     173           8 :         next_issue_.push_back(next_issue);
     174             :       }
     175             :     }
     176             : 
     177           4 :     int t = 0;
     178        2006 :     for (int i = 0; i < config.outstanding_rpcs_per_channel(); i++) {
     179       18004 :       for (int ch = 0; ch < channel_count_; ch++) {
     180       16002 :         auto* cq = cli_cqs_[t].get();
     181       16002 :         t = (t + 1) % cli_cqs_.size();
     182       16002 :         auto ctx = setup_ctx(ch, channels_[ch].get_stub(), request_);
     183       16002 :         if (closed_loop_) {
     184        8002 :           ctx->Start(cq);
     185             :         } else {
     186        8000 :           contexts_[ch].push_front(ctx);
     187             :         }
     188             :       }
     189             :     }
     190           4 :   }
     191           8 :   virtual ~AsyncClient() {
     192          22 :     for (auto cq = cli_cqs_.begin(); cq != cli_cqs_.end(); cq++) {
     193          18 :       (*cq)->Shutdown();
     194             :       void* got_tag;
     195             :       bool ok;
     196       16038 :       while ((*cq)->Next(&got_tag, &ok)) {
     197       16002 :         delete ClientRpcContext::detag(got_tag);
     198             :       }
     199             :     }
     200             :     // Now clear out all the pre-allocated idle contexts
     201          22 :     for (int ch = 0; ch < channel_count_; ch++) {
     202          36 :       while (!contexts_[ch].empty()) {
     203             :         // Get an idle context from the front of the list
     204           0 :         auto* ctx = *(contexts_[ch].begin());
     205           0 :         contexts_[ch].pop_front();
     206           0 :         delete ctx;
     207             :       }
     208             :     }
     209           4 :     delete[] channel_lock_;
     210           4 :   }
     211             : 
     212     3437351 :   bool ThreadFunc(Histogram* histogram,
     213             :                   size_t thread_idx) GRPC_OVERRIDE GRPC_FINAL {
     214             :     void* got_tag;
     215             :     bool ok;
     216     3437351 :     grpc_time deadline, short_deadline;
     217     3437351 :     if (closed_loop_) {
     218      929905 :       deadline = grpc_time_source::now() + std::chrono::seconds(1);
     219      929895 :       short_deadline = deadline;
     220             :     } else {
     221     2507446 :       if (rpc_deadlines_[thread_idx].empty()) {
     222           8 :         deadline = grpc_time_source::now() + std::chrono::seconds(1);
     223             :       } else {
     224     2506939 :         deadline = *(rpc_deadlines_[thread_idx].begin());
     225             :       }
     226             :       short_deadline =
     227     2501470 :           issue_allowed_[thread_idx] ? next_issue_[thread_idx] : deadline;
     228             :     }
     229             : 
     230             :     bool got_event;
     231             : 
     232     3429954 :     switch (cli_cqs_[thread_idx]->AsyncNext(&got_tag, &ok, short_deadline)) {
     233             :       case CompletionQueue::SHUTDOWN:
     234           0 :         return false;
     235             :       case CompletionQueue::TIMEOUT:
     236     2255426 :         got_event = false;
     237     2255426 :         break;
     238             :       case CompletionQueue::GOT_EVENT:
     239     1176420 :         got_event = true;
     240     1176420 :         break;
     241             :       default:
     242           0 :         GPR_ASSERT(false);
     243             :         break;
     244             :     }
     245     3431846 :     if (got_event) {
     246     1176602 :       ClientRpcContext* ctx = ClientRpcContext::detag(got_tag);
     247     1176587 :       if (ctx->RunNextState(ok, histogram) == false) {
     248             :         // call the callback and then clone the ctx
     249     1123429 :         ctx->RunNextState(ok, histogram);
     250     1123381 :         ClientRpcContext* clone_ctx = ctx->StartNewClone();
     251     1123565 :         if (closed_loop_) {
     252      876900 :           clone_ctx->Start(cli_cqs_[thread_idx].get());
     253             :         } else {
     254             :           // Remove the entry from the rpc deadlines list
     255      246665 :           rpc_deadlines_[thread_idx].erase(ctx->deadline_posn());
     256             :           // Put the clone_ctx in the list of idle contexts for this channel
     257             :           // Under lock
     258      246676 :           int ch = clone_ctx->channel_id();
     259      246676 :           std::lock_guard<std::mutex> g(channel_lock_[ch]);
     260      246677 :           contexts_[ch].push_front(clone_ctx);
     261             :         }
     262             :         // delete the old version
     263     1123280 :         delete ctx;
     264             :       }
     265     1176612 :       if (!closed_loop_)
     266      246672 :         issue_allowed_[thread_idx] =
     267      246672 :             true;  // may be ok now even if it hadn't been
     268             :     }
     269    10551436 :     if (!closed_loop_ && issue_allowed_[thread_idx] &&
     270     4207798 :         grpc_time_source::now() >= next_issue_[thread_idx]) {
     271             :       // Attempt to issue
     272      259184 :       bool issued = false;
     273     1356314 :       for (int num_attempts = 0, channel_attempt = next_channel_[thread_idx];
     274     1350055 :            num_attempts < channel_count_ && !issued; num_attempts++) {
     275      419054 :         bool can_issue = false;
     276      419054 :         ClientRpcContext* ctx = nullptr;
     277             :         {
     278      419054 :           std::lock_guard<std::mutex> g(channel_lock_[channel_attempt]);
     279      419077 :           if (!contexts_[channel_attempt].empty()) {
     280             :             // Get an idle context from the front of the list
     281      254676 :             ctx = *(contexts_[channel_attempt].begin());
     282      254676 :             contexts_[channel_attempt].pop_front();
     283      254674 :             can_issue = true;
     284      419075 :           }
     285             :         }
     286      419090 :         if (can_issue) {
     287             :           // do the work to issue
     288      509335 :           rpc_deadlines_[thread_idx].emplace_back(grpc_time_source::now() +
     289      763999 :                                                   std::chrono::seconds(1));
     290      254657 :           auto it = rpc_deadlines_[thread_idx].end();
     291      254657 :           --it;
     292      254657 :           ctx->set_deadline_posn(it);
     293      254656 :           ctx->Start(cli_cqs_[thread_idx].get());
     294      254458 :           issued = true;
     295             :           // If we did issue, then next time, try our thread's next
     296             :           // preferred channel
     297      254458 :           next_channel_[thread_idx] += pref_channel_inc_;
     298      254666 :           if (next_channel_[thread_idx] >= channel_count_)
     299      254591 :             next_channel_[thread_idx] = (thread_idx % channel_count_);
     300             :         } else {
     301             :           // Do a modular increment of channel attempt if we couldn't issue
     302      164414 :           channel_attempt = (channel_attempt + 1) % channel_count_;
     303             :         }
     304             :       }
     305      259177 :       if (issued) {
     306             :         // We issued one; see when we can issue the next
     307      254664 :         grpc_time next_issue;
     308      254664 :         NextIssueTime(thread_idx, &next_issue);
     309      254666 :         next_issue_[thread_idx] = next_issue;
     310             :       } else {
     311        4513 :         issue_allowed_[thread_idx] = false;
     312             :       }
     313             :     }
     314     3430239 :     return true;
     315             :   }
     316             : 
     317             :  private:
     318             :   class boolean {  // exists only to avoid data-race on vector<bool>
     319             :    public:
     320             :     boolean() : val_(false) {}
     321           8 :     boolean(bool b) : val_(b) {}
     322     4927446 :     operator bool() const { return val_; }
     323      251185 :     boolean& operator=(bool b) {
     324      251185 :       val_ = b;
     325      251185 :       return *this;
     326             :     }
     327             : 
     328             :    private:
     329             :     bool val_;
     330             :   };
     331             :   std::vector<std::unique_ptr<CompletionQueue>> cli_cqs_;
     332             : 
     333             :   std::vector<deadline_list> rpc_deadlines_;  // per thread deadlines
     334             :   std::vector<int> next_channel_;       // per thread round-robin channel ctr
     335             :   std::vector<boolean> issue_allowed_;  // may this thread attempt to issue
     336             :   std::vector<grpc_time> next_issue_;   // when should it issue?
     337             : 
     338             :   std::mutex*
     339             :       channel_lock_;  // a vector, but avoid std::vector for old compilers
     340             :   std::vector<context_list> contexts_;  // per-channel list of idle contexts
     341             :   int max_outstanding_per_channel_;
     342             :   int channel_count_;
     343             :   int pref_channel_inc_;
     344             : };
     345             : 
     346             : class AsyncUnaryClient GRPC_FINAL : public AsyncClient {
     347             :  public:
     348           3 :   explicit AsyncUnaryClient(const ClientConfig& config)
     349           3 :       : AsyncClient(config, SetupCtx) {
     350           3 :     StartThreads(config.async_client_threads());
     351           3 :   }
     352           6 :   ~AsyncUnaryClient() GRPC_OVERRIDE { EndThreads(); }
     353             : 
     354             :  private:
     355     1123130 :   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
     356             :   static std::unique_ptr<grpc::ClientAsyncResponseReader<SimpleResponse>>
     357     1139411 :   StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
     358             :            const SimpleRequest& request, CompletionQueue* cq) {
     359     1139411 :     return stub->AsyncUnaryCall(ctx, request, cq);
     360             :   };
     361       16001 :   static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
     362             :                                     const SimpleRequest& req) {
     363             :     return new ClientRpcContextUnaryImpl<SimpleRequest, SimpleResponse>(
     364             :         channel_id, stub, req, AsyncUnaryClient::StartReq,
     365       16001 :         AsyncUnaryClient::CheckDone);
     366             :   }
     367             : };
     368             : 
     369             : template <class RequestType, class ResponseType>
     370             : class ClientRpcContextStreamingImpl : public ClientRpcContext {
     371             :  public:
     372           1 :   ClientRpcContextStreamingImpl(
     373             :       int channel_id, TestService::Stub* stub, const RequestType& req,
     374             :       std::function<std::unique_ptr<grpc::ClientAsyncReaderWriter<
     375             :           RequestType, ResponseType>>(TestService::Stub*, grpc::ClientContext*,
     376             :                                       CompletionQueue*, void*)> start_req,
     377             :       std::function<void(grpc::Status, ResponseType*)> on_done)
     378             :       : ClientRpcContext(channel_id),
     379             :         context_(),
     380             :         stub_(stub),
     381             :         req_(req),
     382             :         response_(),
     383             :         next_state_(&ClientRpcContextStreamingImpl::ReqSent),
     384             :         callback_(on_done),
     385             :         start_req_(start_req),
     386           1 :         start_(Timer::Now()) {}
     387           2 :   ~ClientRpcContextStreamingImpl() GRPC_OVERRIDE {}
     388       53049 :   bool RunNextState(bool ok, Histogram* hist) GRPC_OVERRIDE {
     389       53049 :     return (this->*next_state_)(ok, hist);
     390             :   }
     391           0 :   ClientRpcContext* StartNewClone() GRPC_OVERRIDE {
     392             :     return new ClientRpcContextStreamingImpl(channel_id_, stub_, req_,
     393           0 :                                              start_req_, callback_);
     394             :   }
     395           1 :   void Start(CompletionQueue* cq) GRPC_OVERRIDE {
     396           1 :     stream_ = start_req_(stub_, &context_, cq, ClientRpcContext::tag(this));
     397           1 :   }
     398             : 
     399             :  private:
     400           1 :   bool ReqSent(bool ok, Histogram*) { return StartWrite(ok); }
     401       26525 :   bool StartWrite(bool ok) {
     402       26525 :     if (!ok) {
     403           0 :       return (false);
     404             :     }
     405       26525 :     start_ = Timer::Now();
     406       26525 :     next_state_ = &ClientRpcContextStreamingImpl::WriteDone;
     407       26525 :     stream_->Write(req_, ClientRpcContext::tag(this));
     408       26525 :     return true;
     409             :   }
     410       26524 :   bool WriteDone(bool ok, Histogram*) {
     411       26524 :     if (!ok) {
     412           0 :       return (false);
     413             :     }
     414       26524 :     next_state_ = &ClientRpcContextStreamingImpl::ReadDone;
     415       26524 :     stream_->Read(&response_, ClientRpcContext::tag(this));
     416       26524 :     return true;
     417             :   }
     418       26524 :   bool ReadDone(bool ok, Histogram* hist) {
     419       26524 :     hist->Add((Timer::Now() - start_) * 1e9);
     420       26524 :     return StartWrite(ok);
     421             :   }
     422             :   grpc::ClientContext context_;
     423             :   TestService::Stub* stub_;
     424             :   RequestType req_;
     425             :   ResponseType response_;
     426             :   bool (ClientRpcContextStreamingImpl::*next_state_)(bool, Histogram*);
     427             :   std::function<void(grpc::Status, ResponseType*)> callback_;
     428             :   std::function<
     429             :       std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>(
     430             :           TestService::Stub*, grpc::ClientContext*, CompletionQueue*, void*)>
     431             :       start_req_;
     432             :   grpc::Status status_;
     433             :   double start_;
     434             :   std::unique_ptr<grpc::ClientAsyncReaderWriter<RequestType, ResponseType>>
     435             :       stream_;
     436             : };
     437             : 
     438             : class AsyncStreamingClient GRPC_FINAL : public AsyncClient {
     439             :  public:
     440           1 :   explicit AsyncStreamingClient(const ClientConfig& config)
     441           1 :       : AsyncClient(config, SetupCtx) {
     442             :     // async streaming currently only supported closed loop
     443           1 :     GPR_ASSERT(config.load_type() == CLOSED_LOOP);
     444             : 
     445           1 :     StartThreads(config.async_client_threads());
     446           1 :   }
     447             : 
     448           2 :   ~AsyncStreamingClient() GRPC_OVERRIDE { EndThreads(); }
     449             : 
     450             :  private:
     451           0 :   static void CheckDone(grpc::Status s, SimpleResponse* response) {}
     452             :   static std::unique_ptr<
     453             :       grpc::ClientAsyncReaderWriter<SimpleRequest, SimpleResponse>>
     454           1 :   StartReq(TestService::Stub* stub, grpc::ClientContext* ctx,
     455             :            CompletionQueue* cq, void* tag) {
     456           1 :     auto stream = stub->AsyncStreamingCall(ctx, cq, tag);
     457           1 :     return stream;
     458             :   };
     459           1 :   static ClientRpcContext* SetupCtx(int channel_id, TestService::Stub* stub,
     460             :                                     const SimpleRequest& req) {
     461             :     return new ClientRpcContextStreamingImpl<SimpleRequest, SimpleResponse>(
     462             :         channel_id, stub, req, AsyncStreamingClient::StartReq,
     463           1 :         AsyncStreamingClient::CheckDone);
     464             :   }
     465             : };
     466             : 
     467           3 : std::unique_ptr<Client> CreateAsyncUnaryClient(const ClientConfig& args) {
     468           3 :   return std::unique_ptr<Client>(new AsyncUnaryClient(args));
     469             : }
     470           1 : std::unique_ptr<Client> CreateAsyncStreamingClient(const ClientConfig& args) {
     471           1 :   return std::unique_ptr<Client>(new AsyncStreamingClient(args));
     472             : }
     473             : 
     474             : }  // namespace testing
     475             : }  // namespace grpc

Generated by: LCOV version 1.10