Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <memory>
35 :
36 : #include <grpc/grpc.h>
37 : #include <grpc/support/thd.h>
38 : #include <grpc/support/time.h>
39 : #include <grpc++/impl/proto_utils.h>
40 : #include <grpc++/channel.h>
41 : #include <grpc++/client_context.h>
42 : #include <grpc++/create_channel.h>
43 : #include <grpc++/generic/async_generic_service.h>
44 : #include <grpc++/generic/generic_stub.h>
45 : #include <grpc++/server.h>
46 : #include <grpc++/server_builder.h>
47 : #include <grpc++/server_context.h>
48 : #include <grpc++/support/slice.h>
49 : #include <gtest/gtest.h>
50 :
51 : #include "test/core/util/port.h"
52 : #include "test/core/util/test_config.h"
53 : #include "test/cpp/util/echo.grpc.pb.h"
54 :
55 : using grpc::cpp::test::util::EchoRequest;
56 : using grpc::cpp::test::util::EchoResponse;
57 : using std::chrono::system_clock;
58 :
59 : namespace grpc {
60 : namespace testing {
61 : namespace {
62 :
63 218 : void* tag(int i) { return (void*)(gpr_intptr)i; }
64 :
65 109 : void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
66 : bool ok;
67 : void* got_tag;
68 109 : EXPECT_TRUE(cq->Next(&got_tag, &ok));
69 109 : EXPECT_EQ(expect_ok, ok);
70 109 : EXPECT_EQ(tag(i), got_tag);
71 109 : }
72 :
73 24 : bool ParseFromByteBuffer(ByteBuffer* buffer, grpc::protobuf::Message* message) {
74 24 : std::vector<Slice> slices;
75 24 : buffer->Dump(&slices);
76 48 : grpc::string buf;
77 24 : buf.reserve(buffer->Length());
78 48 : for (auto s = slices.begin(); s != slices.end(); s++) {
79 24 : buf.append(reinterpret_cast<const char*>(s->begin()), s->size());
80 : }
81 48 : return message->ParseFromString(buf);
82 : }
83 :
84 24 : std::unique_ptr<ByteBuffer> SerializeToByteBuffer(
85 : grpc::protobuf::Message* message) {
86 24 : grpc::string buf;
87 24 : message->SerializeToString(&buf);
88 24 : gpr_slice s = gpr_slice_from_copied_string(buf.c_str());
89 48 : Slice slice(s, Slice::STEAL_REF);
90 48 : return std::unique_ptr<ByteBuffer>(new ByteBuffer(&slice, 1));
91 : }
92 :
93 3 : class GenericEnd2endTest : public ::testing::Test {
94 : protected:
95 3 : GenericEnd2endTest() : generic_service_("*"), server_host_("localhost") {}
96 :
97 3 : void SetUp() GRPC_OVERRIDE {
98 3 : int port = grpc_pick_unused_port_or_die();
99 3 : server_address_ << server_host_ << ":" << port;
100 : // Setup server
101 3 : ServerBuilder builder;
102 : builder.AddListeningPort(server_address_.str(),
103 3 : InsecureServerCredentials());
104 3 : builder.RegisterAsyncGenericService(&generic_service_);
105 3 : srv_cq_ = builder.AddCompletionQueue();
106 3 : server_ = builder.BuildAndStart();
107 3 : }
108 :
109 3 : void TearDown() GRPC_OVERRIDE {
110 3 : server_->Shutdown();
111 : void* ignored_tag;
112 : bool ignored_ok;
113 3 : cli_cq_.Shutdown();
114 3 : srv_cq_->Shutdown();
115 3 : while (cli_cq_.Next(&ignored_tag, &ignored_ok))
116 : ;
117 3 : while (srv_cq_->Next(&ignored_tag, &ignored_ok))
118 : ;
119 3 : }
120 :
121 3 : void ResetStub() {
122 : std::shared_ptr<Channel> channel =
123 3 : CreateChannel(server_address_.str(), InsecureCredentials());
124 3 : generic_stub_.reset(new GenericStub(channel));
125 3 : }
126 :
127 36 : void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
128 60 : void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
129 1 : void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
130 : void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
131 :
132 2 : void SendRpc(int num_rpcs) {
133 2 : const grpc::string kMethodName("/grpc.cpp.test.util.TestService/Echo");
134 13 : for (int i = 0; i < num_rpcs; i++) {
135 11 : EchoRequest send_request;
136 22 : EchoRequest recv_request;
137 22 : EchoResponse send_response;
138 22 : EchoResponse recv_response;
139 22 : Status recv_status;
140 :
141 22 : ClientContext cli_ctx;
142 22 : GenericServerContext srv_ctx;
143 22 : GenericServerAsyncReaderWriter stream(&srv_ctx);
144 :
145 : // The string needs to be long enough to test heap-based slice.
146 11 : send_request.set_message("Hello world. Hello world. Hello world.");
147 : std::unique_ptr<GenericClientAsyncReaderWriter> call =
148 22 : generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
149 11 : client_ok(1);
150 : std::unique_ptr<ByteBuffer> send_buffer =
151 22 : SerializeToByteBuffer(&send_request);
152 11 : call->Write(*send_buffer, tag(2));
153 11 : client_ok(2);
154 11 : call->WritesDone(tag(3));
155 11 : client_ok(3);
156 :
157 11 : generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
158 22 : srv_cq_.get(), tag(4));
159 :
160 11 : verify_ok(srv_cq_.get(), 4, true);
161 11 : EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
162 11 : EXPECT_EQ(kMethodName, srv_ctx.method());
163 11 : ByteBuffer recv_buffer;
164 11 : stream.Read(&recv_buffer, tag(5));
165 11 : server_ok(5);
166 11 : EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
167 11 : EXPECT_EQ(send_request.message(), recv_request.message());
168 :
169 11 : send_response.set_message(recv_request.message());
170 11 : send_buffer = SerializeToByteBuffer(&send_response);
171 11 : stream.Write(*send_buffer, tag(6));
172 11 : server_ok(6);
173 :
174 11 : stream.Finish(Status::OK, tag(7));
175 11 : server_ok(7);
176 :
177 11 : recv_buffer.Clear();
178 11 : call->Read(&recv_buffer, tag(8));
179 11 : client_ok(8);
180 11 : EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
181 :
182 11 : call->Finish(&recv_status, tag(9));
183 11 : client_ok(9);
184 :
185 11 : EXPECT_EQ(send_response.message(), recv_response.message());
186 11 : EXPECT_TRUE(recv_status.ok());
187 24 : }
188 2 : }
189 :
190 : CompletionQueue cli_cq_;
191 : std::unique_ptr<ServerCompletionQueue> srv_cq_;
192 : std::unique_ptr<grpc::cpp::test::util::TestService::Stub> stub_;
193 : std::unique_ptr<grpc::GenericStub> generic_stub_;
194 : std::unique_ptr<Server> server_;
195 : AsyncGenericService generic_service_;
196 : const grpc::string server_host_;
197 : std::ostringstream server_address_;
198 : };
199 :
200 5 : TEST_F(GenericEnd2endTest, SimpleRpc) {
201 1 : ResetStub();
202 1 : SendRpc(1);
203 1 : }
204 :
205 5 : TEST_F(GenericEnd2endTest, SequentialRpcs) {
206 1 : ResetStub();
207 1 : SendRpc(10);
208 1 : }
209 :
210 : // One ping, one pong.
211 5 : TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
212 1 : ResetStub();
213 :
214 1 : const grpc::string kMethodName("/grpc.cpp.test.util.TestService/BidiStream");
215 2 : EchoRequest send_request;
216 2 : EchoRequest recv_request;
217 2 : EchoResponse send_response;
218 2 : EchoResponse recv_response;
219 2 : Status recv_status;
220 2 : ClientContext cli_ctx;
221 2 : GenericServerContext srv_ctx;
222 2 : GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
223 :
224 1 : cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
225 1 : send_request.set_message("Hello");
226 : std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
227 2 : generic_stub_->Call(&cli_ctx, kMethodName, &cli_cq_, tag(1));
228 1 : client_ok(1);
229 :
230 1 : generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
231 2 : srv_cq_.get(), tag(2));
232 :
233 1 : verify_ok(srv_cq_.get(), 2, true);
234 1 : EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
235 1 : EXPECT_EQ(kMethodName, srv_ctx.method());
236 :
237 : std::unique_ptr<ByteBuffer> send_buffer =
238 1 : SerializeToByteBuffer(&send_request);
239 1 : cli_stream->Write(*send_buffer, tag(3));
240 1 : client_ok(3);
241 :
242 2 : ByteBuffer recv_buffer;
243 1 : srv_stream.Read(&recv_buffer, tag(4));
244 1 : server_ok(4);
245 1 : EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
246 1 : EXPECT_EQ(send_request.message(), recv_request.message());
247 :
248 1 : send_response.set_message(recv_request.message());
249 1 : send_buffer = SerializeToByteBuffer(&send_response);
250 1 : srv_stream.Write(*send_buffer, tag(5));
251 1 : server_ok(5);
252 :
253 1 : cli_stream->Read(&recv_buffer, tag(6));
254 1 : client_ok(6);
255 1 : EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
256 1 : EXPECT_EQ(send_response.message(), recv_response.message());
257 :
258 1 : cli_stream->WritesDone(tag(7));
259 1 : client_ok(7);
260 :
261 1 : srv_stream.Read(&recv_buffer, tag(8));
262 1 : server_fail(8);
263 :
264 1 : srv_stream.Finish(Status::OK, tag(9));
265 1 : server_ok(9);
266 :
267 1 : cli_stream->Finish(&recv_status, tag(10));
268 1 : client_ok(10);
269 :
270 1 : EXPECT_EQ(send_response.message(), recv_response.message());
271 3 : EXPECT_TRUE(recv_status.ok());
272 1 : }
273 :
274 : } // namespace
275 : } // namespace testing
276 : } // namespace grpc
277 :
278 1 : int main(int argc, char** argv) {
279 1 : grpc_test_init(argc, argv);
280 1 : ::testing::InitGoogleTest(&argc, argv);
281 1 : return RUN_ALL_TESTS();
282 3 : }
|