Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <list>
35 : #include <thread>
36 : #include <deque>
37 : #include <vector>
38 :
39 : #include <grpc/support/alloc.h>
40 : #include <grpc/support/log.h>
41 : #include <grpc/support/host_port.h>
42 : #include <grpc++/client_context.h>
43 : #include <grpc++/create_channel.h>
44 :
45 : #include "src/core/support/env.h"
46 : #include "test/core/util/port.h"
47 : #include "test/core/util/test_config.h"
48 : #include "test/cpp/qps/driver.h"
49 : #include "test/cpp/qps/histogram.h"
50 : #include "test/cpp/qps/qps_worker.h"
51 :
52 : using std::list;
53 : using std::thread;
54 : using std::unique_ptr;
55 : using std::deque;
56 : using std::vector;
57 :
58 : namespace grpc {
59 : namespace testing {
60 6 : static deque<string> get_hosts(const string& name) {
61 6 : char* env = gpr_getenv(name.c_str());
62 6 : if (!env) return deque<string>();
63 :
64 0 : deque<string> out;
65 0 : char* p = env;
66 : for (;;) {
67 0 : char* comma = strchr(p, ',');
68 0 : if (comma) {
69 0 : out.emplace_back(p, comma);
70 0 : p = comma + 1;
71 : } else {
72 0 : out.emplace_back(p);
73 0 : gpr_free(env);
74 0 : return out;
75 : }
76 0 : }
77 : }
78 :
79 : // Namespace for classes and functions used only in RunScenario
80 : // Using this rather than local definitions to workaround gcc-4.4 limitations
81 : // regarding using templates without linkage
82 : namespace runsc {
83 :
84 : // ClientContext allocator
85 : template <class T>
86 12 : static ClientContext* AllocContext(list<ClientContext>* contexts, T deadline) {
87 12 : contexts->emplace_back();
88 12 : auto context = &contexts->back();
89 12 : context->set_deadline(deadline);
90 12 : return context;
91 : }
92 :
93 12 : struct ServerData {
94 : unique_ptr<Worker::Stub> stub;
95 : unique_ptr<ClientReaderWriter<ServerArgs, ServerStatus>> stream;
96 : };
97 :
98 12 : struct ClientData {
99 : unique_ptr<Worker::Stub> stub;
100 : unique_ptr<ClientReaderWriter<ClientArgs, ClientStatus>> stream;
101 : };
102 : } // namespace runsc
103 :
104 6 : std::unique_ptr<ScenarioResult> RunScenario(
105 : const ClientConfig& initial_client_config, size_t num_clients,
106 : const ServerConfig& server_config, size_t num_servers, int warmup_seconds,
107 : int benchmark_seconds, int spawn_local_worker_count) {
108 : // ClientContext allocations (all are destroyed at scope exit)
109 6 : list<ClientContext> contexts;
110 :
111 : // To be added to the result, containing the final configuration used for
112 : // client and config (incluiding host, etc.)
113 12 : ClientConfig result_client_config;
114 12 : ServerConfig result_server_config;
115 :
116 : // Get client, server lists
117 12 : auto workers = get_hosts("QPS_WORKERS");
118 12 : ClientConfig client_config = initial_client_config;
119 :
120 : // Spawn some local workers if desired
121 12 : vector<unique_ptr<QpsWorker>> local_workers;
122 18 : for (int i = 0; i < abs(spawn_local_worker_count); i++) {
123 : // act as if we're a new test -- gets a good rng seed
124 : static bool called_init = false;
125 12 : if (!called_init) {
126 : char args_buf[100];
127 6 : strcpy(args_buf, "some-benchmark");
128 6 : char* args[] = {args_buf};
129 6 : grpc_test_init(1, args);
130 6 : called_init = true;
131 : }
132 :
133 12 : int driver_port = grpc_pick_unused_port_or_die();
134 12 : int benchmark_port = grpc_pick_unused_port_or_die();
135 12 : local_workers.emplace_back(new QpsWorker(driver_port, benchmark_port));
136 : char addr[256];
137 12 : sprintf(addr, "localhost:%d", driver_port);
138 12 : if (spawn_local_worker_count < 0) {
139 12 : workers.push_front(addr);
140 : } else {
141 0 : workers.push_back(addr);
142 : }
143 : }
144 :
145 : // TODO(ctiller): support running multiple configurations, and binpack
146 : // client/server pairs
147 : // to available workers
148 6 : GPR_ASSERT(workers.size() >= num_clients + num_servers);
149 :
150 : // Trim to just what we need
151 6 : workers.resize(num_clients + num_servers);
152 :
153 : gpr_timespec deadline =
154 : gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
155 : gpr_time_from_seconds(
156 6 : warmup_seconds + benchmark_seconds + 20, GPR_TIMESPAN));
157 :
158 : // Start servers
159 : using runsc::ServerData;
160 : // servers is array rather than std::vector to avoid gcc-4.4 issues
161 : // where class contained in std::vector must have a copy constructor
162 6 : auto* servers = new ServerData[num_servers];
163 12 : for (size_t i = 0; i < num_servers; i++) {
164 18 : servers[i].stub =
165 12 : Worker::NewStub(CreateChannel(workers[i], InsecureCredentials()));
166 6 : ServerArgs args;
167 6 : result_server_config = server_config;
168 6 : result_server_config.set_host(workers[i]);
169 6 : *args.mutable_setup() = server_config;
170 18 : servers[i].stream =
171 12 : servers[i].stub->RunServer(runsc::AllocContext(&contexts, deadline));
172 6 : GPR_ASSERT(servers[i].stream->Write(args));
173 12 : ServerStatus init_status;
174 6 : GPR_ASSERT(servers[i].stream->Read(&init_status));
175 : char* host;
176 : char* driver_port;
177 : char* cli_target;
178 6 : gpr_split_host_port(workers[i].c_str(), &host, &driver_port);
179 6 : gpr_join_host_port(&cli_target, host, init_status.port());
180 6 : client_config.add_server_targets(cli_target);
181 6 : gpr_free(host);
182 6 : gpr_free(driver_port);
183 6 : gpr_free(cli_target);
184 6 : }
185 :
186 : // Start clients
187 : using runsc::ClientData;
188 : // clients is array rather than std::vector to avoid gcc-4.4 issues
189 : // where class contained in std::vector must have a copy constructor
190 6 : auto* clients = new ClientData[num_clients];
191 12 : for (size_t i = 0; i < num_clients; i++) {
192 18 : clients[i].stub = Worker::NewStub(
193 12 : CreateChannel(workers[i + num_servers], InsecureCredentials()));
194 6 : ClientArgs args;
195 6 : result_client_config = client_config;
196 6 : result_client_config.set_host(workers[i + num_servers]);
197 6 : *args.mutable_setup() = client_config;
198 18 : clients[i].stream =
199 12 : clients[i].stub->RunTest(runsc::AllocContext(&contexts, deadline));
200 6 : GPR_ASSERT(clients[i].stream->Write(args));
201 12 : ClientStatus init_status;
202 6 : GPR_ASSERT(clients[i].stream->Read(&init_status));
203 6 : }
204 :
205 : // Let everything warmup
206 6 : gpr_log(GPR_INFO, "Warming up");
207 6 : gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
208 : gpr_sleep_until(
209 6 : gpr_time_add(start, gpr_time_from_seconds(warmup_seconds, GPR_TIMESPAN)));
210 :
211 : // Start a run
212 6 : gpr_log(GPR_INFO, "Starting");
213 12 : ServerArgs server_mark;
214 6 : server_mark.mutable_mark();
215 12 : ClientArgs client_mark;
216 6 : client_mark.mutable_mark();
217 12 : for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
218 6 : GPR_ASSERT(server->stream->Write(server_mark));
219 : }
220 12 : for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
221 6 : GPR_ASSERT(client->stream->Write(client_mark));
222 : }
223 12 : ServerStatus server_status;
224 12 : ClientStatus client_status;
225 12 : for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
226 6 : GPR_ASSERT(server->stream->Read(&server_status));
227 : }
228 12 : for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
229 6 : GPR_ASSERT(client->stream->Read(&client_status));
230 : }
231 :
232 : // Wait some time
233 6 : gpr_log(GPR_INFO, "Running");
234 : // Use gpr_sleep_until rather than this_thread::sleep_until to support
235 : // compilers that don't work with this_thread
236 : gpr_sleep_until(gpr_time_add(
237 6 : start, gpr_time_from_seconds(benchmark_seconds, GPR_TIMESPAN)));
238 :
239 : // Finish a run
240 6 : std::unique_ptr<ScenarioResult> result(new ScenarioResult);
241 6 : result->client_config = result_client_config;
242 6 : result->server_config = result_server_config;
243 6 : gpr_log(GPR_INFO, "Finishing");
244 12 : for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
245 6 : GPR_ASSERT(server->stream->Write(server_mark));
246 : }
247 12 : for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
248 6 : GPR_ASSERT(client->stream->Write(client_mark));
249 : }
250 12 : for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
251 6 : GPR_ASSERT(server->stream->Read(&server_status));
252 6 : const auto& stats = server_status.stats();
253 6 : result->server_resources.emplace_back(
254 12 : stats.time_elapsed(), stats.time_user(), stats.time_system());
255 : }
256 12 : for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
257 6 : GPR_ASSERT(client->stream->Read(&client_status));
258 6 : const auto& stats = client_status.stats();
259 6 : result->latencies.MergeProto(stats.latencies());
260 6 : result->client_resources.emplace_back(
261 12 : stats.time_elapsed(), stats.time_user(), stats.time_system());
262 : }
263 :
264 12 : for (auto client = &clients[0]; client != &clients[num_clients]; client++) {
265 6 : GPR_ASSERT(client->stream->WritesDone());
266 6 : GPR_ASSERT(client->stream->Finish().ok());
267 : }
268 12 : for (auto server = &servers[0]; server != &servers[num_servers]; server++) {
269 6 : GPR_ASSERT(server->stream->WritesDone());
270 6 : GPR_ASSERT(server->stream->Finish().ok());
271 : }
272 6 : delete[] clients;
273 6 : delete[] servers;
274 12 : return result;
275 : }
276 : } // namespace testing
277 : } // namespace grpc
|