Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <mutex>
35 : #include <thread>
36 :
37 : #include <grpc/grpc.h>
38 : #include <grpc/support/thd.h>
39 : #include <grpc/support/time.h>
40 : #include <grpc++/channel.h>
41 : #include <grpc++/client_context.h>
42 : #include <grpc++/create_channel.h>
43 : #include <grpc++/server.h>
44 : #include <grpc++/server_builder.h>
45 : #include <grpc++/server_context.h>
46 : #include <gtest/gtest.h>
47 :
48 : #include "test/core/util/port.h"
49 : #include "test/core/util/test_config.h"
50 : #include "test/cpp/util/echo_duplicate.grpc.pb.h"
51 : #include "test/cpp/util/echo.grpc.pb.h"
52 :
53 : using grpc::cpp::test::util::EchoRequest;
54 : using grpc::cpp::test::util::EchoResponse;
55 : using std::chrono::system_clock;
56 :
57 : namespace grpc {
58 : namespace testing {
59 :
60 : namespace {
61 :
62 : // When echo_deadline is requested, deadline seen in the ServerContext is set in
63 : // the response in seconds.
64 100000 : void MaybeEchoDeadline(ServerContext* context, const EchoRequest* request,
65 : EchoResponse* response) {
66 100000 : if (request->has_param() && request->param().echo_deadline()) {
67 0 : gpr_timespec deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
68 0 : if (context->deadline() != system_clock::time_point::max()) {
69 0 : Timepoint2Timespec(context->deadline(), &deadline);
70 : }
71 0 : response->mutable_param()->set_request_deadline(deadline.tv_sec);
72 : }
73 99980 : }
74 :
75 : } // namespace
76 :
77 1 : class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
78 : public:
79 1 : TestServiceImpl() : signal_client_(false) {}
80 :
81 99998 : Status Echo(ServerContext* context, const EchoRequest* request,
82 : EchoResponse* response) GRPC_OVERRIDE {
83 99998 : response->set_message(request->message());
84 100000 : MaybeEchoDeadline(context, request, response);
85 100000 : if (request->has_param() && request->param().client_cancel_after_us()) {
86 : {
87 0 : std::unique_lock<std::mutex> lock(mu_);
88 0 : signal_client_ = true;
89 : }
90 0 : while (!context->IsCancelled()) {
91 : gpr_sleep_until(gpr_time_add(
92 : gpr_now(GPR_CLOCK_REALTIME),
93 0 : gpr_time_from_micros(request->param().client_cancel_after_us(),
94 0 : GPR_TIMESPAN)));
95 : }
96 0 : return Status::CANCELLED;
97 99999 : } else if (request->has_param() &&
98 0 : request->param().server_cancel_after_us()) {
99 : gpr_sleep_until(gpr_time_add(
100 : gpr_now(GPR_CLOCK_REALTIME),
101 0 : gpr_time_from_micros(request->param().server_cancel_after_us(),
102 0 : GPR_TIMESPAN)));
103 0 : return Status::CANCELLED;
104 : } else {
105 100000 : EXPECT_FALSE(context->IsCancelled());
106 : }
107 99992 : return Status::OK;
108 : }
109 :
110 : // Unimplemented is left unimplemented to test the returned error.
111 :
112 0 : Status RequestStream(ServerContext* context,
113 : ServerReader<EchoRequest>* reader,
114 : EchoResponse* response) GRPC_OVERRIDE {
115 0 : EchoRequest request;
116 0 : response->set_message("");
117 0 : while (reader->Read(&request)) {
118 0 : response->mutable_message()->append(request.message());
119 : }
120 0 : return Status::OK;
121 : }
122 :
123 : // Return 3 messages.
124 : // TODO(yangg) make it generic by adding a parameter into EchoRequest
125 0 : Status ResponseStream(ServerContext* context, const EchoRequest* request,
126 : ServerWriter<EchoResponse>* writer) GRPC_OVERRIDE {
127 0 : EchoResponse response;
128 0 : response.set_message(request->message() + "0");
129 0 : writer->Write(response);
130 0 : response.set_message(request->message() + "1");
131 0 : writer->Write(response);
132 0 : response.set_message(request->message() + "2");
133 0 : writer->Write(response);
134 :
135 0 : return Status::OK;
136 : }
137 :
138 0 : Status BidiStream(ServerContext* context,
139 : ServerReaderWriter<EchoResponse, EchoRequest>* stream)
140 : GRPC_OVERRIDE {
141 0 : EchoRequest request;
142 0 : EchoResponse response;
143 0 : while (stream->Read(&request)) {
144 0 : gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
145 0 : response.set_message(request.message());
146 0 : stream->Write(response);
147 : }
148 0 : return Status::OK;
149 : }
150 :
151 : bool signal_client() {
152 : std::unique_lock<std::mutex> lock(mu_);
153 : return signal_client_;
154 : }
155 :
156 : private:
157 : bool signal_client_;
158 : std::mutex mu_;
159 : };
160 :
161 2 : class TestServiceImplDupPkg
162 : : public ::grpc::cpp::test::util::duplicate::TestService::Service {
163 : public:
164 0 : Status Echo(ServerContext* context, const EchoRequest* request,
165 : EchoResponse* response) GRPC_OVERRIDE {
166 0 : response->set_message("no package");
167 0 : return Status::OK;
168 : }
169 : };
170 :
171 1 : class End2endTest : public ::testing::Test {
172 : protected:
173 1 : End2endTest() : kMaxMessageSize_(8192) {}
174 :
175 1 : void SetUp() GRPC_OVERRIDE {
176 1 : int port = grpc_pick_unused_port_or_die();
177 1 : server_address_ << "localhost:" << port;
178 : // Setup server
179 1 : ServerBuilder builder;
180 : builder.AddListeningPort(server_address_.str(),
181 1 : InsecureServerCredentials());
182 1 : builder.RegisterService(&service_);
183 : builder.SetMaxMessageSize(
184 1 : kMaxMessageSize_); // For testing max message size.
185 1 : builder.RegisterService(&dup_pkg_service_);
186 1 : server_ = builder.BuildAndStart();
187 1 : }
188 :
189 1 : void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
190 :
191 1 : void ResetStub() {
192 : std::shared_ptr<Channel> channel =
193 1 : CreateChannel(server_address_.str(), InsecureCredentials());
194 1 : stub_ = grpc::cpp::test::util::TestService::NewStub(channel);
195 1 : }
196 :
197 : std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
198 : std::unique_ptr<Server> server_;
199 : std::ostringstream server_address_;
200 : const int kMaxMessageSize_;
201 : TestServiceImpl service_;
202 : TestServiceImplDupPkg dup_pkg_service_;
203 : };
204 :
205 100 : static void SendRpc(grpc::cpp::test::util::TestService::Stub* stub,
206 : int num_rpcs) {
207 100 : EchoRequest request;
208 200 : EchoResponse response;
209 100 : request.set_message("Hello");
210 :
211 100097 : for (int i = 0; i < num_rpcs; ++i) {
212 99997 : ClientContext context;
213 199988 : Status s = stub->Echo(&context, request, &response);
214 100000 : EXPECT_EQ(response.message(), request.message());
215 99997 : EXPECT_TRUE(s.ok());
216 100093 : }
217 100 : }
218 :
219 5 : TEST_F(End2endTest, ThreadStress) {
220 1 : ResetStub();
221 1 : std::vector<std::thread*> threads;
222 101 : for (int i = 0; i < 100; ++i) {
223 100 : threads.push_back(new std::thread(SendRpc, stub_.get(), 1000));
224 : }
225 101 : for (int i = 0; i < 100; ++i) {
226 100 : threads[i]->join();
227 100 : delete threads[i];
228 1 : }
229 1 : }
230 :
231 : } // namespace testing
232 : } // namespace grpc
233 :
234 1 : int main(int argc, char** argv) {
235 1 : grpc_test_init(argc, argv);
236 1 : ::testing::InitGoogleTest(&argc, argv);
237 1 : return RUN_ALL_TESTS();
238 3 : }
|