Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <mutex>
35 : #include <thread>
36 : #include <time.h>
37 :
38 : #include <grpc++/channel.h>
39 : #include <grpc++/client_context.h>
40 : #include <grpc++/create_channel.h>
41 : #include <grpc++/security/credentials.h>
42 : #include <grpc++/security/server_credentials.h>
43 : #include <grpc++/server.h>
44 : #include <grpc++/server_builder.h>
45 : #include <grpc++/server_context.h>
46 : #include <grpc/grpc.h>
47 : #include <grpc/support/atm.h>
48 : #include <grpc/support/thd.h>
49 : #include <grpc/support/time.h>
50 : #include <gtest/gtest.h>
51 :
52 : #include "test/core/util/port.h"
53 : #include "test/core/util/test_config.h"
54 : #include "test/cpp/util/echo_duplicate.grpc.pb.h"
55 : #include "test/cpp/util/echo.grpc.pb.h"
56 :
57 : using grpc::cpp::test::util::EchoRequest;
58 : using grpc::cpp::test::util::EchoResponse;
59 : using std::chrono::system_clock;
60 :
61 : const char* kLargeString =
62 : "("
63 : "To be, or not to be- that is the question:"
64 : "Whether 'tis nobler in the mind to suffer"
65 : "The slings and arrows of outrageous fortune"
66 : "Or to take arms against a sea of troubles,"
67 : "And by opposing end them. To die- to sleep-"
68 : "No more; and by a sleep to say we end"
69 : "The heartache, and the thousand natural shock"
70 : "That flesh is heir to. 'Tis a consummation"
71 : "Devoutly to be wish'd. To die- to sleep."
72 : "To sleep- perchance to dream: ay, there's the rub!"
73 : "For in that sleep of death what dreams may come"
74 : "When we have shuffled off this mortal coil,"
75 : "Must give us pause. There's the respect"
76 : "That makes calamity of so long life."
77 : "For who would bear the whips and scorns of time,"
78 : "Th' oppressor's wrong, the proud man's contumely,"
79 : "The pangs of despis'd love, the law's delay,"
80 : "The insolence of office, and the spurns"
81 : "That patient merit of th' unworthy takes,"
82 : "When he himself might his quietus make"
83 : "With a bare bodkin? Who would these fardels bear,"
84 : "To grunt and sweat under a weary life,"
85 : "But that the dread of something after death-"
86 : "The undiscover'd country, from whose bourn"
87 : "No traveller returns- puzzles the will,"
88 : "And makes us rather bear those ills we have"
89 : "Than fly to others that we know not of?"
90 : "Thus conscience does make cowards of us all,"
91 : "And thus the native hue of resolution"
92 : "Is sicklied o'er with the pale cast of thought,"
93 : "And enterprises of great pith and moment"
94 : "With this regard their currents turn awry"
95 : "And lose the name of action.- Soft you now!"
96 : "The fair Ophelia!- Nymph, in thy orisons"
97 : "Be all my sins rememb'red.";
98 :
99 : namespace grpc {
100 : namespace testing {
101 :
102 2 : class TestServiceImpl : public ::grpc::cpp::test::util::TestService::Service {
103 : public:
104 1 : static void BidiStream_Sender(
105 : ServerReaderWriter<EchoResponse, EchoRequest>* stream,
106 : gpr_atm* should_exit) {
107 1 : EchoResponse response;
108 1 : response.set_message(kLargeString);
109 28176 : while (gpr_atm_acq_load(should_exit) == static_cast<gpr_atm>(0)) {
110 28174 : struct timespec tv = {0, 1000000}; // 1 ms
111 : struct timespec rem;
112 : // TODO (vpai): Mark this blocking
113 56348 : while (nanosleep(&tv, &rem) != 0) {
114 0 : tv = rem;
115 : };
116 :
117 28174 : stream->Write(response);
118 1 : }
119 1 : }
120 :
121 : // Only implement the one method we will be calling for brevity.
122 1 : Status BidiStream(ServerContext* context,
123 : ServerReaderWriter<EchoResponse, EchoRequest>* stream)
124 : GRPC_OVERRIDE {
125 1 : EchoRequest request;
126 : gpr_atm should_exit;
127 1 : gpr_atm_rel_store(&should_exit, static_cast<gpr_atm>(0));
128 :
129 : std::thread sender(
130 2 : std::bind(&TestServiceImpl::BidiStream_Sender, stream, &should_exit));
131 :
132 10002 : while (stream->Read(&request)) {
133 10000 : struct timespec tv = {0, 3000000}; // 3 ms
134 : struct timespec rem;
135 : // TODO (vpai): Mark this blocking
136 20000 : while (nanosleep(&tv, &rem) != 0) {
137 0 : tv = rem;
138 : };
139 : }
140 1 : gpr_atm_rel_store(&should_exit, static_cast<gpr_atm>(1));
141 1 : sender.join();
142 2 : return Status::OK;
143 : }
144 : };
145 :
146 2 : class End2endTest : public ::testing::Test {
147 : protected:
148 1 : void SetUp() GRPC_OVERRIDE {
149 1 : int port = grpc_pick_unused_port_or_die();
150 1 : server_address_ << "localhost:" << port;
151 : // Setup server
152 1 : ServerBuilder builder;
153 : builder.AddListeningPort(server_address_.str(),
154 1 : InsecureServerCredentials());
155 1 : builder.RegisterService(&service_);
156 1 : server_ = builder.BuildAndStart();
157 1 : }
158 :
159 1 : void TearDown() GRPC_OVERRIDE { server_->Shutdown(); }
160 :
161 1 : void ResetStub() {
162 : std::shared_ptr<Channel> channel =
163 1 : CreateChannel(server_address_.str(), InsecureCredentials());
164 1 : stub_ = grpc::cpp::test::util::TestService::NewStub(channel);
165 1 : }
166 :
167 : std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
168 : std::unique_ptr<Server> server_;
169 : std::ostringstream server_address_;
170 : TestServiceImpl service_;
171 : };
172 :
173 1 : static void Drainer(ClientReaderWriter<EchoRequest, EchoResponse>* reader) {
174 1 : EchoResponse response;
175 1 : while (reader->Read(&response)) {
176 : // Just drain out the responses as fast as possible.
177 1 : }
178 1 : }
179 :
180 5 : TEST_F(End2endTest, StreamingThroughput) {
181 1 : ResetStub();
182 1 : grpc::ClientContext context;
183 2 : auto stream = stub_->BidiStream(&context);
184 :
185 1 : auto reader = stream.get();
186 2 : std::thread receiver(std::bind(Drainer, reader));
187 :
188 10001 : for (int i = 0; i < 10000; i++) {
189 10000 : EchoRequest request;
190 10000 : request.set_message(kLargeString);
191 10001 : ASSERT_TRUE(stream->Write(request));
192 10000 : if (i % 1000 == 0) {
193 10 : gpr_log(GPR_INFO, "Send count = %d", i);
194 : }
195 10000 : }
196 1 : stream->WritesDone();
197 2 : receiver.join();
198 : }
199 :
200 : } // namespace testing
201 : } // namespace grpc
202 :
203 1 : int main(int argc, char** argv) {
204 1 : grpc_test_init(argc, argv);
205 1 : ::testing::InitGoogleTest(&argc, argv);
206 1 : return RUN_ALL_TESTS();
207 3 : }
|