LCOV - code coverage report
Current view: top level - test/cpp/qps - driver.cc (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 114 125 91.2 %
Date: 2015-10-10 Functions: 7 7 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <list>
      35             : #include <thread>
      36             : #include <deque>
      37             : #include <vector>
      38             : 
      39             : #include <grpc/support/alloc.h>
      40             : #include <grpc/support/log.h>
      41             : #include <grpc/support/host_port.h>
      42             : #include <grpc++/client_context.h>
      43             : #include <grpc++/create_channel.h>
      44             : 
      45             : #include "src/core/support/env.h"
      46             : #include "test/core/util/port.h"
      47             : #include "test/core/util/test_config.h"
      48             : #include "test/cpp/qps/driver.h"
      49             : #include "test/cpp/qps/histogram.h"
      50             : #include "test/cpp/qps/qps_worker.h"
      51             : 
      52             : using std::list;
      53             : using std::thread;
      54             : using std::unique_ptr;
      55             : using std::deque;
      56             : using std::vector;
      57             : 
      58             : namespace grpc {
      59             : namespace testing {
      60           6 : static deque<string> get_hosts(const string& name) {  // Returns the comma-separated entries of environment variable `name`, in order.
      61           6 :   char* env = gpr_getenv(name.c_str());
      62           6 :   if (!env) return deque<string>();  // gpr_getenv returned null: report no hosts
      63             : 
      64           0 :   deque<string> out;
      65           0 :   char* p = env;  // start of the entry currently being scanned
      66             :   for (;;) {
      67           0 :     char* comma = strchr(p, ',');
      68           0 :     if (comma) {
      69           0 :       out.emplace_back(p, comma);  // entry is the character range [p, comma)
      70           0 :       p = comma + 1;  // continue just past the separator
      71             :     } else {
      72           0 :       out.emplace_back(p);  // final entry runs to the terminating NUL (may be an empty string)
      73           0 :       gpr_free(env);  // release the buffer obtained from gpr_getenv before returning
      74           0 :       return out;
      75             :     }
      76           0 :   }
      77             : }
      78             : 
      79             : // Namespace for classes and functions used only in RunScenario.
      80             : // Using this rather than local definitions to work around gcc-4.4 limitations
      81             : // regarding using templates without linkage
      82             : namespace runsc {
      83             : 
      84             : // ClientContext allocator
      85             : template <class T>  // T: deadline type, passed unchanged to ClientContext::set_deadline
      86          12 : static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) {  // Appends a new ClientContext to *contexts, sets its deadline, and returns a pointer to it.
      87          12 :   contexts->emplace_back();  // default-construct in place; the list keeps ownership
      88          12 :   auto context = &contexts->back();  // address of the element just appended
      89          12 :   context->set_deadline(deadline);
      90          12 :   return context;  // non-owning: the object lives as long as its entry in *contexts
      91             : }
      92             : 
      93          12 : struct ServerData {  // Handles RunScenario keeps for one server worker.
      94             :   unique_ptr<Worker::Stub> stub;  // RunScenario assigns this from Worker::NewStub on the worker's channel
      95             :   unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;  // RunScenario assigns this from stub->RunServer(...)
      96             : };
      97             : 
      98          12 : struct ClientData {  // Handles RunScenario keeps for one client worker.
      99             :   unique_ptr<Worker::Stub> stub;  // RunScenario assigns this from Worker::NewStub on the worker's channel
     100             :   unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;  // RunScenario assigns this from stub->RunTest(...)
     101             : };
     102             : }  // namespace runsc
     103             : 
     104           6 : std::unique_ptr<ScenarioResult> RunScenario(  // Runs one benchmark: starts servers and clients on workers, warms up, measures, and returns the collected stats.
     105             :     const ClientConfig& initial_client_config, size_t num_clients,  // base client setup (server targets are added to a copy); number of client workers to use
     106             :     const ServerConfig& server_config, size_t num_servers, int warmup_seconds,  // setup sent to every server; number of server workers; warmup duration before the measured window
     107             :     int benchmark_seconds, int spawn_local_worker_count) {  // length of the measured window; |count| in-process workers are spawned, placed before (count < 0) or after (count >= 0) the QPS_WORKERS entries
     108             :   // ClientContext allocations (all are destroyed at scope exit)
     109           6 :   list<ClientContext> contexts;
     110             : 
     111             :   // To be added to the result, containing the final configuration used for
     112             :   // client and server (including host, etc.)
     113          12 :   ClientConfig result_client_config;
     114          12 :   ServerConfig result_server_config;
     115             : 
     116             :   // Get client, server lists
     117          12 :   auto workers = get_hosts("QPS_WORKERS");
     118          12 :   ClientConfig client_config = initial_client_config;
     119             : 
     120             :   // Spawn some local workers if desired
     121          12 :   vector<unique_ptr<QpsWorker>> local_workers;
     122          18 :   for (int i = 0; i < abs(spawn_local_worker_count); i++) {
     123             :     // act as if we're a new test -- gets a good rng seed
     124             :     static bool called_init = false;
     125          12 :     if (!called_init) {
     126             :       char args_buf[100];
     127           6 :       strcpy(args_buf, "some-benchmark");
     128           6 :       char* args[] = {args_buf};
     129           6 :       grpc_test_init(1, args);
     130           6 :       called_init = true;
     131             :     }
     132             : 
     133          12 :     int driver_port = grpc_pick_unused_port_or_die();
     134          12 :     int benchmark_port = grpc_pick_unused_port_or_die();
     135          12 :     local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
     136             :     char addr[256];
     137          12 :     sprintf(addr, "localhost:%d", driver_port);
     138          12 :     if (spawn_local_worker_count < 0) {
     139          12 :       workers.push_front(addr);  // negative count: local workers take priority over QPS_WORKERS entries
     140             :     } else {
     141           0 :       workers.push_back(addr);
     142             :     }
     143             :   }
     144             : 
     145             :   // TODO(ctiller): support running multiple configurations, and binpack
     146             :   // client/server pairs
     147             :   // to available workers
     148           6 :   GPR_ASSERT(workers.size() >= num_clients + num_servers);
     149             : 
     150             :   // Trim to just what we need
     151           6 :   workers.resize(num_clients + num_servers);
     152             : 
     153             :   gpr_timespec deadline =
     154             :       gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
     155             :                    gpr_time_from_seconds(
     156           6 :                        warmup_seconds + benchmark_seconds + 20, GPR_TIMESPAN));
     157             : 
     158             :   // Start servers
     159             :   using runsc::ServerData;
     160             :   // servers is array rather than std::vector to avoid gcc-4.4 issues
     161             :   // where class contained in std::vector must have a copy constructor
     162           6 :   auto* servers = new ServerData[num_servers];
     163          12 :   for (size_t i = 0; i < num_servers; i++) {
     164          18 :     servers[i].stub =
     165          12 :         Worker::NewStub(CreateChannel(workers[i], InsecureCredentials()));
     166           6 :     ServerArgs args;
     167           6 :     result_server_config = server_config;
     168           6 :     result_server_config.set_host(workers[i]);
     169           6 :     *args.mutable_setup() = server_config;
     170          18 :     servers[i].stream =
     171          12 :         servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
     172           6 :     GPR_ASSERT(servers[i].stream->Write(args));
     173          12 :     ServerStatus init_status;
     174           6 :     GPR_ASSERT(servers[i].stream->Read(&init_status));
     175             :     char* host;
     176             :     char* driver_port;
     177             :     char* cli_target;
     178           6 :     gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
     179           6 :     gpr_join_host_port(&cli_target, host, init_status.port());  // clients target the worker's host at the port reported in init_status
     180           6 :     client_config.add_server_targets(cli_target);
     181           6 :     gpr_free(host);
     182           6 :     gpr_free(driver_port);
     183           6 :     gpr_free(cli_target);
     184           6 :   }
     185             : 
     186             :   // Start clients
     187             :   using runsc::ClientData;
     188             :   // clients is array rather than std::vector to avoid gcc-4.4 issues
     189             :   // where class contained in std::vector must have a copy constructor
     190           6 :   auto* clients = new ClientData[num_clients];
     191          12 :   for (size_t i = 0; i < num_clients; i++) {
     192          18 :     clients[i].stub = Worker::NewStub(
     193          12 :         CreateChannel(workers[i + num_servers], InsecureCredentials()));  // client workers follow the server workers in the list
     194           6 :     ClientArgs args;
     195           6 :     result_client_config = client_config;
     196           6 :     result_client_config.set_host(workers[i + num_servers]);
     197           6 :     *args.mutable_setup() = client_config;
     198          18 :     clients[i].stream =
     199          12 :         clients[i].stub->RunTest(runsc::AllocContext(&contexts, deadline));
     200           6 :     GPR_ASSERT(clients[i].stream->Write(args));
     201          12 :     ClientStatus init_status;
     202           6 :     GPR_ASSERT(clients[i].stream->Read(&init_status));
     203           6 :   }
     204             : 
     205             :   // Let everything warmup
     206           6 :   gpr_log(GPR_INFO, "Warming up");
     207           6 :   gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
     208             :   gpr_sleep_until(
     209           6 :       gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));
     210             : 
     211             :   // Start a run
     212           6 :   gpr_log(GPR_INFO, "Starting");
     213          12 :   ServerArgs server_mark;
     214           6 :   server_mark.mutable_mark();
     215          12 :   ClientArgs client_mark;
     216           6 :   client_mark.mutable_mark();
     217          12 :   for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     218           6 :     GPR_ASSERT(server->stream->Write(server_mark));
     219             :   }
     220          12 :   for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     221           6 :     GPR_ASSERT(client->stream->Write(client_mark));
     222             :   }
     223          12 :   ServerStatus server_status;
     224          12 :   ClientStatus client_status;
     225          12 :   for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     226           6 :     GPR_ASSERT(server->stream->Read(&server_status));
     227             :   }
     228          12 :   for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     229           6 :     GPR_ASSERT(client->stream->Read(&client_status));
     230             :   }
     231             : 
     232             :   // Wait some time
     233           6 :   gpr_log(GPR_INFO, "Running");
     234             :   // Use gpr_sleep_until rather than this_thread::sleep_until to support
     235             :   // compilers that don't work with this_thread
     236             :   gpr_sleep_until(gpr_time_add(
     237           6 :       start, gpr_time_from_seconds(warmup_seconds + benchmark_seconds, GPR_TIMESPAN)));  // start predates the warmup sleep, so the measured window ends at start + warmup + benchmark
     238             : 
     239             :   // Finish a run
     240           6 :   std::unique_ptr<ScenarioResult> result(new ScenarioResult);
     241           6 :   result->client_config = result_client_config;
     242           6 :   result->server_config = result_server_config;
     243           6 :   gpr_log(GPR_INFO, "Finishing");
     244          12 :   for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     245           6 :     GPR_ASSERT(server->stream->Write(server_mark));
     246             :   }
     247          12 :   for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     248           6 :     GPR_ASSERT(client->stream->Write(client_mark));
     249             :   }
     250          12 :   for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     251           6 :     GPR_ASSERT(server->stream->Read(&server_status));
     252           6 :     const auto& stats = server_status.stats();
     253           6 :     result->server_resources.emplace_back(
     254          12 :         stats.time_elapsed(), stats.time_user(), stats.time_system());
     255             :   }
     256          12 :   for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     257           6 :     GPR_ASSERT(client->stream->Read(&client_status));
     258           6 :     const auto& stats = client_status.stats();
     259           6 :     result->latencies.MergeProto(stats.latencies());  // latencies from all clients are merged into one histogram
     260           6 :     result->client_resources.emplace_back(
     261          12 :         stats.time_elapsed(), stats.time_user(), stats.time_system());
     262             :   }
     263             : 
     264          12 :   for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
     265           6 :     GPR_ASSERT(client->stream->WritesDone());
     266           6 :     GPR_ASSERT(client->stream->Finish().ok());
     267             :   }
     268          12 :   for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
     269           6 :     GPR_ASSERT(server->stream->WritesDone());
     270           6 :     GPR_ASSERT(server->stream->Finish().ok());
     271             :   }
     272           6 :   delete[] clients;
     273           6 :   delete[] servers;
     274          12 :   return result;
     275             : }
     276             : }  // namespace testing
     277             : }  // namespace grpc

Generated by: LCOV version 1.10