Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <signal.h>
35 : #include <unistd.h>
36 :
37 : #include <fstream>
38 : #include <memory>
39 : #include <sstream>
40 : #include <thread>
41 :
42 : #include <gflags/gflags.h>
43 : #include <grpc/grpc.h>
44 : #include <grpc/support/log.h>
45 : #include <grpc/support/useful.h>
46 : #include <grpc++/server.h>
47 : #include <grpc++/server_builder.h>
48 : #include <grpc++/server_context.h>
49 : #include <grpc++/security/server_credentials.h>
50 :
51 : #include "test/cpp/interop/server_helper.h"
52 : #include "test/cpp/util/test_config.h"
53 : #include "test/proto/test.grpc.pb.h"
54 : #include "test/proto/empty.grpc.pb.h"
55 : #include "test/proto/messages.grpc.pb.h"
56 :
// Command-line flags (gflags).
// use_tls is not read anywhere in this file; presumably it is consumed by the
// credentials helper in server_helper — TODO confirm.
57 1 : DEFINE_bool(use_tls, false, "Whether to use tls.");
// Port the server listens on. The default of 0 is rejected by the
// GPR_ASSERT in main(), so callers must always pass --port.
58 1 : DEFINE_int32(port, 0, "Server port.");
59 :
60 : using grpc::Server;
61 : using grpc::ServerBuilder;
62 : using grpc::ServerContext;
63 : using grpc::ServerCredentials;
64 : using grpc::ServerReader;
65 : using grpc::ServerReaderWriter;
66 : using grpc::ServerWriter;
67 : using grpc::SslServerCredentialsOptions;
68 : using grpc::testing::InteropServerContextInspector;
69 : using grpc::testing::Payload;
70 : using grpc::testing::PayloadType;
71 : using grpc::testing::SimpleRequest;
72 : using grpc::testing::SimpleResponse;
73 : using grpc::testing::StreamingInputCallRequest;
74 : using grpc::testing::StreamingInputCallResponse;
75 : using grpc::testing::StreamingOutputCallRequest;
76 : using grpc::testing::StreamingOutputCallResponse;
77 : using grpc::testing::TestService;
78 : using grpc::Status;
79 :
// Shutdown flag: written by the SIGINT handler and polled by RunServer().
// It must be `volatile sig_atomic_t` because that is the only plain object
// type that is guaranteed to be safely writable from an asynchronous signal
// handler; `volatile` also stops the compiler from hoisting the read out of
// the polling loop.
static volatile sig_atomic_t got_sigint = 0;
// Path (relative to the working directory) of the file that supplies the
// bytes for UNCOMPRESSABLE payloads.
static const char* const kRandomFile = "test/cpp/interop/rnd.dat";
82 :
83 4 : bool SetPayload(PayloadType type, int size, Payload* payload) {
84 : PayloadType response_type;
85 4 : if (type == PayloadType::RANDOM) {
86 : response_type =
87 0 : rand() & 0x1 ? PayloadType::COMPRESSABLE : PayloadType::UNCOMPRESSABLE;
88 : } else {
89 4 : response_type = type;
90 : }
91 4 : payload->set_type(response_type);
92 4 : switch (response_type) {
93 : case PayloadType::COMPRESSABLE: {
94 4 : std::unique_ptr<char[]> body(new char[size]());
95 4 : payload->set_body(body.get(), size);
96 4 : } break;
97 : case PayloadType::UNCOMPRESSABLE: {
98 0 : std::unique_ptr<char[]> body(new char[size]());
99 0 : std::ifstream rnd_file(kRandomFile);
100 0 : GPR_ASSERT(rnd_file.good());
101 0 : rnd_file.read(body.get(), size);
102 0 : GPR_ASSERT(!rnd_file.eof()); // Requested more rnd bytes than available
103 0 : payload->set_body(body.get(), size);
104 0 : } break;
105 : default:
106 0 : GPR_ASSERT(false);
107 : }
108 4 : return true;
109 : }
110 :
111 : template <typename RequestType>
112 4 : void SetResponseCompression(ServerContext* context,
113 : const RequestType& request) {
114 4 : switch (request.response_compression()) {
115 : case grpc::testing::NONE:
116 4 : context->set_compression_algorithm(GRPC_COMPRESS_NONE);
117 4 : break;
118 : case grpc::testing::GZIP:
119 0 : context->set_compression_algorithm(GRPC_COMPRESS_GZIP);
120 0 : break;
121 : case grpc::testing::DEFLATE:
122 0 : context->set_compression_algorithm(GRPC_COMPRESS_DEFLATE);
123 0 : break;
124 : default:
125 0 : abort();
126 : }
127 4 : }
128 :
129 2 : class TestServiceImpl : public TestService::Service {
130 : public:
131 0 : Status EmptyCall(ServerContext* context, const grpc::testing::Empty* request,
132 : grpc::testing::Empty* response) {
133 0 : return Status::OK;
134 : }
135 :
136 4 : Status UnaryCall(ServerContext* context, const SimpleRequest* request,
137 : SimpleResponse* response) {
138 4 : SetResponseCompression(context, *request);
139 4 : if (request->response_size() > 0) {
140 4 : if (!SetPayload(request->response_type(), request->response_size(),
141 4 : response->mutable_payload())) {
142 0 : return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
143 : }
144 : }
145 :
146 4 : if (request->has_response_status()) {
147 : return Status(
148 0 : static_cast<grpc::StatusCode>(request->response_status().code()),
149 0 : request->response_status().message());
150 : }
151 :
152 4 : return Status::OK;
153 : }
154 :
155 0 : Status StreamingOutputCall(
156 : ServerContext* context, const StreamingOutputCallRequest* request,
157 : ServerWriter<StreamingOutputCallResponse>* writer) {
158 0 : SetResponseCompression(context, *request);
159 0 : StreamingOutputCallResponse response;
160 0 : bool write_success = true;
161 0 : for (int i = 0; write_success && i < request->response_parameters_size();
162 : i++) {
163 0 : if (!SetPayload(request->response_type(),
164 0 : request->response_parameters(i).size(),
165 0 : response.mutable_payload())) {
166 0 : return Status(grpc::StatusCode::INTERNAL, "Error creating payload.");
167 : }
168 0 : write_success = writer->Write(response);
169 : }
170 0 : if (write_success) {
171 0 : return Status::OK;
172 : } else {
173 0 : return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
174 0 : }
175 : }
176 :
177 0 : Status StreamingInputCall(ServerContext* context,
178 : ServerReader<StreamingInputCallRequest>* reader,
179 : StreamingInputCallResponse* response) {
180 0 : StreamingInputCallRequest request;
181 0 : int aggregated_payload_size = 0;
182 0 : while (reader->Read(&request)) {
183 0 : if (request.has_payload()) {
184 0 : aggregated_payload_size += request.payload().body().size();
185 : }
186 : }
187 0 : response->set_aggregated_payload_size(aggregated_payload_size);
188 0 : return Status::OK;
189 : }
190 :
191 0 : Status FullDuplexCall(
192 : ServerContext* context,
193 : ServerReaderWriter<StreamingOutputCallResponse,
194 : StreamingOutputCallRequest>* stream) {
195 0 : StreamingOutputCallRequest request;
196 0 : StreamingOutputCallResponse response;
197 0 : bool write_success = true;
198 0 : while (write_success && stream->Read(&request)) {
199 0 : SetResponseCompression(context, request);
200 0 : if (request.response_parameters_size() != 0) {
201 0 : response.mutable_payload()->set_type(request.payload().type());
202 : response.mutable_payload()->set_body(
203 0 : grpc::string(request.response_parameters(0).size(), '\0'));
204 0 : write_success = stream->Write(response);
205 : }
206 : }
207 0 : if (write_success) {
208 0 : return Status::OK;
209 : } else {
210 0 : return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
211 0 : }
212 : }
213 :
214 0 : Status HalfDuplexCall(
215 : ServerContext* context,
216 : ServerReaderWriter<StreamingOutputCallResponse,
217 : StreamingOutputCallRequest>* stream) {
218 0 : std::vector<StreamingOutputCallRequest> requests;
219 0 : StreamingOutputCallRequest request;
220 0 : while (stream->Read(&request)) {
221 0 : requests.push_back(request);
222 : }
223 :
224 0 : StreamingOutputCallResponse response;
225 0 : bool write_success = true;
226 0 : for (unsigned int i = 0; write_success && i < requests.size(); i++) {
227 0 : response.mutable_payload()->set_type(requests[i].payload().type());
228 0 : if (requests[i].response_parameters_size() == 0) {
229 : return Status(grpc::StatusCode::INTERNAL,
230 0 : "Request does not have response parameters.");
231 : }
232 : response.mutable_payload()->set_body(
233 0 : grpc::string(requests[i].response_parameters(0).size(), '\0'));
234 0 : write_success = stream->Write(response);
235 : }
236 0 : if (write_success) {
237 0 : return Status::OK;
238 : } else {
239 0 : return Status(grpc::StatusCode::INTERNAL, "Error writing response.");
240 0 : }
241 : }
242 : };
243 :
244 1 : void RunServer() {
245 1 : std::ostringstream server_address;
246 1 : server_address << "0.0.0.0:" << FLAGS_port;
247 2 : TestServiceImpl service;
248 :
249 2 : SimpleRequest request;
250 2 : SimpleResponse response;
251 :
252 2 : ServerBuilder builder;
253 1 : builder.RegisterService(&service);
254 : std::shared_ptr<ServerCredentials> creds =
255 2 : grpc::testing::CreateInteropServerCredentials();
256 1 : builder.AddListeningPort(server_address.str(), creds);
257 2 : std::unique_ptr<Server> server(builder.BuildAndStart());
258 1 : gpr_log(GPR_INFO, "Server listening on %s", server_address.str().c_str());
259 4 : while (!got_sigint) {
260 2 : sleep(5);
261 1 : }
262 1 : }
263 :
264 1 : static void sigint_handler(int x) { got_sigint = true; }
265 :
266 1 : int main(int argc, char** argv) {
267 1 : grpc::testing::InitTest(&argc, &argv, true);
268 1 : signal(SIGINT, sigint_handler);
269 :
270 1 : GPR_ASSERT(FLAGS_port != 0);
271 1 : RunServer();
272 :
273 1 : return 0;
274 3 : }
|