Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #ifndef GRPCXX_SERVER_H
35 : #define GRPCXX_SERVER_H
36 :
37 : #include <list>
38 : #include <memory>
39 :
40 : #include <grpc/compression.h>
41 : #include <grpc++/completion_queue.h>
42 : #include <grpc++/impl/call.h>
43 : #include <grpc++/impl/grpc_library.h>
44 : #include <grpc++/impl/sync.h>
45 : #include <grpc++/security/server_credentials.h>
46 : #include <grpc++/support/config.h>
47 : #include <grpc++/support/status.h>
48 :
49 : struct grpc_server;
50 :
51 : namespace grpc {
52 :
53 : class AsynchronousService;
54 : class GenericServerContext;
55 : class AsyncGenericService;
56 : class RpcService;
57 : class RpcServiceMethod;
58 : class ServerAsyncStreamingInterface;
59 : class ThreadPoolInterface;
60 :
61 : /// Models a gRPC server.
62 : ///
63 : /// Servers are configured and started via \a grpc::ServerBuilder.
64 : class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
65 : public:
66 : ~Server();
67 :
68 : /// Shutdown the server, blocking until all rpc processing finishes.
69 : /// Forcefully terminate pending calls after \a deadline expires.
70 : ///
71 : /// \param deadline How long to wait until pending rpcs are forcefully
72 : /// terminated.
73 : template <class T>
74 5 : void Shutdown(const T& deadline) {
75 5 : ShutdownInternal(TimePoint<T>(deadline).raw_time());
76 5 : }
77 :
78 : /// Shutdown the server, waiting for all rpc processing to finish.
79 154 : void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
80 :
81 : /// Block waiting for all work to complete.
82 : ///
83 : /// \warning The server must be either shutting down or some other thread must
84 : /// call \a Shutdown for this function to ever return.
85 : void Wait();
86 :
87 : private:
88 : friend class AsyncGenericService;
89 : friend class AsynchronousService;
90 : friend class ServerBuilder;
91 :
92 : class SyncRequest;
93 : class AsyncRequest;
94 : class ShutdownRequest;
95 :
96 : /// Server constructors. To be used by \a ServerBuilder only.
97 : ///
98 : /// \param thread_pool The threadpool instance to use for call processing.
99 : /// \param thread_pool_owned Does the server own the \a thread_pool instance?
100 : /// \param max_message_size Maximum message length that the channel can
101 : /// receive.
102 : Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
103 : int max_message_size, grpc_compression_options compression_options);
104 :
105 : /// Register a service. This call does not take ownership of the service.
106 : /// The service must exist for the lifetime of the Server instance.
107 : bool RegisterService(const grpc::string* host, RpcService* service);
108 :
109 : /// Register an asynchronous service. This call does not take ownership of the
110 : /// service. The service must exist for the lifetime of the Server instance.
111 : bool RegisterAsyncService(const grpc::string* host,
112 : AsynchronousService* service);
113 :
114 : /// Register a generic service. This call does not take ownership of the
115 : /// service. The service must exist for the lifetime of the Server instance.
116 : void RegisterAsyncGenericService(AsyncGenericService* service);
117 :
118 : /// Tries to bind \a server to the given \a addr.
119 : ///
120 : /// It can be invoked multiple times.
121 : ///
122 : /// \param addr The address to try to bind to the server (eg, localhost:1234,
123 : /// 192.168.1.1:31416, [::1]:27182, etc.).
124 : /// \params creds The credentials associated with the server.
125 : ///
126 : /// \return bound port number on sucess, 0 on failure.
127 : ///
128 : /// \warning It's an error to call this method on an already started server.
129 : int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
130 :
131 : /// Start the server.
132 : ///
133 : /// \param cqs Completion queues for handling asynchronous services. The
134 : /// caller is required to keep all completion queues live until the server is
135 : /// destroyed.
136 : /// \param num_cqs How many completion queues does \a cqs hold.
137 : ///
138 : /// \return true on a successful shutdown.
139 : bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
140 :
141 : void HandleQueueClosed();
142 :
143 : /// Process one or more incoming calls.
144 : void RunRpc();
145 :
146 : /// Schedule \a RunRpc to run in the threadpool.
147 : void ScheduleCallback();
148 :
149 : void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
150 :
151 : void ShutdownInternal(gpr_timespec deadline);
152 :
153 : class BaseAsyncRequest : public CompletionQueueTag {
154 : public:
155 : BaseAsyncRequest(Server* server, ServerContext* context,
156 : ServerAsyncStreamingInterface* stream,
157 : CompletionQueue* call_cq, void* tag,
158 : bool delete_on_finalize);
159 : virtual ~BaseAsyncRequest();
160 :
161 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
162 :
163 : protected:
164 : Server* const server_;
165 : ServerContext* const context_;
166 : ServerAsyncStreamingInterface* const stream_;
167 : CompletionQueue* const call_cq_;
168 : void* const tag_;
169 : const bool delete_on_finalize_;
170 : grpc_call* call_;
171 : grpc_metadata_array initial_metadata_array_;
172 : };
173 :
174 1245724 : class RegisteredAsyncRequest : public BaseAsyncRequest {
175 : public:
176 : RegisteredAsyncRequest(Server* server, ServerContext* context,
177 : ServerAsyncStreamingInterface* stream,
178 : CompletionQueue* call_cq, void* tag);
179 :
180 : // uses BaseAsyncRequest::FinalizeResult
181 :
182 : protected:
183 : void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
184 : ServerCompletionQueue* notification_cq);
185 : };
186 :
187 130367 : class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
188 : public:
189 65218 : NoPayloadAsyncRequest(void* registered_method, Server* server,
190 : ServerContext* context,
191 : ServerAsyncStreamingInterface* stream,
192 : CompletionQueue* call_cq,
193 : ServerCompletionQueue* notification_cq, void* tag)
194 65218 : : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
195 65145 : IssueRequest(registered_method, nullptr, notification_cq);
196 65260 : }
197 :
198 : // uses RegisteredAsyncRequest::FinalizeResult
199 : };
200 :
201 : template <class Message>
202 2361182 : class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
203 : public:
204 1180579 : PayloadAsyncRequest(void* registered_method, Server* server,
205 : ServerContext* context,
206 : ServerAsyncStreamingInterface* stream,
207 : CompletionQueue* call_cq,
208 : ServerCompletionQueue* notification_cq, void* tag,
209 : Message* request)
210 : : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
211 1180579 : request_(request) {
212 1180287 : IssueRequest(registered_method, &payload_, notification_cq);
213 1180721 : }
214 :
215 1180705 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
216 : bool serialization_status =
217 4599544 : *status && payload_ &&
218 : SerializationTraits<Message>::Deserialize(
219 3500995 : payload_, request_, server_->max_message_size_).ok();
220 1180689 : bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
221 1180708 : *status = serialization_status&&* status;
222 1180708 : return ret;
223 : }
224 :
225 : private:
226 : grpc_byte_buffer* payload_;
227 : Message* const request_;
228 : };
229 :
230 66 : class GenericAsyncRequest : public BaseAsyncRequest {
231 : public:
232 : GenericAsyncRequest(Server* server, GenericServerContext* context,
233 : ServerAsyncStreamingInterface* stream,
234 : CompletionQueue* call_cq,
235 : ServerCompletionQueue* notification_cq, void* tag,
236 : bool delete_on_finalize);
237 :
238 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
239 :
240 : private:
241 : grpc_call_details call_details_;
242 : };
243 :
244 : class UnimplementedAsyncRequestContext;
245 : class UnimplementedAsyncRequest;
246 : class UnimplementedAsyncResponse;
247 :
248 : template <class Message>
249 1180583 : void RequestAsyncCall(void* registered_method, ServerContext* context,
250 : ServerAsyncStreamingInterface* stream,
251 : CompletionQueue* call_cq,
252 : ServerCompletionQueue* notification_cq, void* tag,
253 : Message* message) {
254 1180583 : new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
255 : call_cq, notification_cq, tag, message);
256 1180723 : }
257 :
258 65140 : void RequestAsyncCall(void* registered_method, ServerContext* context,
259 : ServerAsyncStreamingInterface* stream,
260 : CompletionQueue* call_cq,
261 : ServerCompletionQueue* notification_cq, void* tag) {
262 : new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
263 65140 : notification_cq, tag);
264 65260 : }
265 :
266 12 : void RequestAsyncGenericCall(GenericServerContext* context,
267 : ServerAsyncStreamingInterface* stream,
268 : CompletionQueue* call_cq,
269 : ServerCompletionQueue* notification_cq,
270 : void* tag) {
271 : new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
272 12 : tag, true);
273 12 : }
274 :
275 : const int max_message_size_;
276 :
277 : // Completion queue.
278 : CompletionQueue cq_;
279 :
280 : // Sever status
281 : grpc::mutex mu_;
282 : bool started_;
283 : bool shutdown_;
284 : // The number of threads which are running callbacks.
285 : int num_running_cb_;
286 : grpc::condition_variable callback_cv_;
287 :
288 : std::list<SyncRequest>* sync_methods_;
289 : std::unique_ptr<RpcServiceMethod> unknown_method_;
290 : bool has_generic_service_;
291 :
292 : // Pointer to the c grpc server.
293 : grpc_server* const server_;
294 :
295 : ThreadPoolInterface* thread_pool_;
296 : // Whether the thread pool is created and owned by the server.
297 : bool thread_pool_owned_;
298 : };
299 :
300 : } // namespace grpc
301 :
302 : #endif // GRPCXX_SERVER_H
|