Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #ifndef GRPCXX_SERVER_H
35 : #define GRPCXX_SERVER_H
36 :
37 : #include <list>
38 : #include <memory>
39 :
40 : #include <grpc++/completion_queue.h>
41 : #include <grpc++/impl/call.h>
42 : #include <grpc++/impl/grpc_library.h>
43 : #include <grpc++/impl/sync.h>
44 : #include <grpc++/security/server_credentials.h>
45 : #include <grpc++/support/channel_arguments.h>
46 : #include <grpc++/support/config.h>
47 : #include <grpc++/support/status.h>
48 : #include <grpc/compression.h>
49 :
50 : struct grpc_server;
51 :
52 : namespace grpc {
53 :
54 : class AsynchronousService;
55 : class GenericServerContext;
56 : class AsyncGenericService;
57 : class RpcService;
58 : class RpcServiceMethod;
59 : class ServerAsyncStreamingInterface;
60 : class ServerContext;
61 : class ThreadPoolInterface;
62 :
63 : /// Models a gRPC server.
64 : ///
65 : /// Servers are configured and started via \a grpc::ServerBuilder.
66 : class Server GRPC_FINAL : public GrpcLibrary, private CallHook {
67 : public:
68 : ~Server();
69 :
70 : /// Shutdown the server, blocking until all rpc processing finishes.
71 : /// Forcefully terminate pending calls after \a deadline expires.
72 : ///
73 : /// \param deadline How long to wait until pending rpcs are forcefully
74 : /// terminated.
75 : template <class T>
76 4 : void Shutdown(const T& deadline) {
77 4 : ShutdownInternal(TimePoint<T>(deadline).raw_time());
78 4 : }
79 :
80 : /// Shutdown the server, waiting for all rpc processing to finish.
81 156 : void Shutdown() { ShutdownInternal(gpr_inf_future(GPR_CLOCK_MONOTONIC)); }
82 :
83 : /// Block waiting for all work to complete.
84 : ///
85 : /// \warning The server must be either shutting down or some other thread must
86 : /// call \a Shutdown for this function to ever return.
87 : void Wait();
88 :
89 : /// Global Callbacks
90 : ///
91 : /// Can be set exactly once per application to install hooks whenever
92 : /// a server event occurs
93 : class GlobalCallbacks {
94 : public:
95 16 : virtual ~GlobalCallbacks() {}
96 : /// Called before application callback for each synchronous server request
97 : virtual void PreSynchronousRequest(ServerContext* context) = 0;
98 : /// Called after application callback for each synchronous server request
99 : virtual void PostSynchronousRequest(ServerContext* context) = 0;
100 : };
101 : /// Set the global callback object. Can only be called once. Does not take
102 : /// ownership of callbacks, and expects the pointed to object to be alive
103 : /// until all server objects in the process have been destroyed.
104 : static void SetGlobalCallbacks(GlobalCallbacks* callbacks);
105 :
106 : private:
107 : friend class AsyncGenericService;
108 : friend class AsynchronousService;
109 : friend class ServerBuilder;
110 :
111 : class SyncRequest;
112 : class AsyncRequest;
113 : class ShutdownRequest;
114 :
115 : /// Server constructors. To be used by \a ServerBuilder only.
116 : ///
117 : /// \param thread_pool The threadpool instance to use for call processing.
118 : /// \param thread_pool_owned Does the server own the \a thread_pool instance?
119 : /// \param max_message_size Maximum message length that the channel can
120 : /// receive.
121 : Server(ThreadPoolInterface* thread_pool, bool thread_pool_owned,
122 : int max_message_size, const ChannelArguments& args);
123 :
124 : /// Register a service. This call does not take ownership of the service.
125 : /// The service must exist for the lifetime of the Server instance.
126 : bool RegisterService(const grpc::string* host, RpcService* service);
127 :
128 : /// Register an asynchronous service. This call does not take ownership of the
129 : /// service. The service must exist for the lifetime of the Server instance.
130 : bool RegisterAsyncService(const grpc::string* host,
131 : AsynchronousService* service);
132 :
133 : /// Register a generic service. This call does not take ownership of the
134 : /// service. The service must exist for the lifetime of the Server instance.
135 : void RegisterAsyncGenericService(AsyncGenericService* service);
136 :
137 : /// Tries to bind \a server to the given \a addr.
138 : ///
139 : /// It can be invoked multiple times.
140 : ///
141 : /// \param addr The address to try to bind to the server (eg, localhost:1234,
142 : /// 192.168.1.1:31416, [::1]:27182, etc.).
143 : /// \param creds The credentials associated with the server.
144 : ///
145 : /// \return bound port number on success, 0 on failure.
146 : ///
147 : /// \warning It's an error to call this method on an already started server.
148 : int AddListeningPort(const grpc::string& addr, ServerCredentials* creds);
149 :
150 : /// Start the server.
151 : ///
152 : /// \param cqs Completion queues for handling asynchronous services. The
153 : /// caller is required to keep all completion queues live until the server is
154 : /// destroyed.
155 : /// \param num_cqs How many completion queues does \a cqs hold.
156 : ///
157 : /// \return true on a successful start. NOTE(review): previously said "shutdown"; confirm.
158 : bool Start(ServerCompletionQueue** cqs, size_t num_cqs);
159 :
160 : void HandleQueueClosed();
161 :
162 : /// Process one or more incoming calls.
163 : void RunRpc();
164 :
165 : /// Schedule \a RunRpc to run in the threadpool.
166 : void ScheduleCallback();
167 :
168 : void PerformOpsOnCall(CallOpSetInterface* ops, Call* call) GRPC_OVERRIDE;
169 :
170 : void ShutdownInternal(gpr_timespec deadline);
171 :
172 : class BaseAsyncRequest : public CompletionQueueTag {
173 : public:
174 : BaseAsyncRequest(Server* server, ServerContext* context,
175 : ServerAsyncStreamingInterface* stream,
176 : CompletionQueue* call_cq, void* tag,
177 : bool delete_on_finalize);
178 : virtual ~BaseAsyncRequest();
179 :
180 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
181 :
182 : protected:
183 : Server* const server_;
184 : ServerContext* const context_;
185 : ServerAsyncStreamingInterface* const stream_;
186 : CompletionQueue* const call_cq_;
187 : void* const tag_;
188 : const bool delete_on_finalize_;
189 : grpc_call* call_;
190 : grpc_metadata_array initial_metadata_array_;
191 : };
192 :
193 1508013 : class RegisteredAsyncRequest : public BaseAsyncRequest {
194 : public:
195 : RegisteredAsyncRequest(Server* server, ServerContext* context,
196 : ServerAsyncStreamingInterface* stream,
197 : CompletionQueue* call_cq, void* tag);
198 :
199 : // uses BaseAsyncRequest::FinalizeResult
200 :
201 : protected:
202 : void IssueRequest(void* registered_method, grpc_byte_buffer** payload,
203 : ServerCompletionQueue* notification_cq);
204 : };
205 :
206 81914 : class NoPayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
207 : public:
208 40185 : NoPayloadAsyncRequest(void* registered_method, Server* server,
209 : ServerContext* context,
210 : ServerAsyncStreamingInterface* stream,
211 : CompletionQueue* call_cq,
212 : ServerCompletionQueue* notification_cq, void* tag)
213 40185 : : RegisteredAsyncRequest(server, context, stream, call_cq, tag) {
214 41590 : IssueRequest(registered_method, nullptr, notification_cq);
215 41602 : }
216 :
217 : // uses RegisteredAsyncRequest::FinalizeResult
218 : };
219 :
220 : template <class Message>
221 2934227 : class PayloadAsyncRequest GRPC_FINAL : public RegisteredAsyncRequest {
222 : public:
223 1465630 : PayloadAsyncRequest(void* registered_method, Server* server,
224 : ServerContext* context,
225 : ServerAsyncStreamingInterface* stream,
226 : CompletionQueue* call_cq,
227 : ServerCompletionQueue* notification_cq, void* tag,
228 : Message* request)
229 : : RegisteredAsyncRequest(server, context, stream, call_cq, tag),
230 1465630 : request_(request) {
231 1466651 : IssueRequest(registered_method, &payload_, notification_cq);
232 1467261 : }
233 :
234 1467377 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE {
235 : bool serialization_status =
236 5778824 : *status && payload_ &&
237 : SerializationTraits<Message>::Deserialize(
238 4371587 : payload_, request_, server_->max_message_size_).ok();
239 1466014 : bool ret = RegisteredAsyncRequest::FinalizeResult(tag, status);
240 1467263 : *status = serialization_status&&* status;  // ok only if deserialization and the base finalize both succeeded
241 1467263 : return ret;
242 : }
243 :
244 : private:
245 : grpc_byte_buffer* payload_;
246 : Message* const request_;
247 : };
248 :
249 62 : class GenericAsyncRequest : public BaseAsyncRequest {
250 : public:
251 : GenericAsyncRequest(Server* server, GenericServerContext* context,
252 : ServerAsyncStreamingInterface* stream,
253 : CompletionQueue* call_cq,
254 : ServerCompletionQueue* notification_cq, void* tag,
255 : bool delete_on_finalize);
256 :
257 : bool FinalizeResult(void** tag, bool* status) GRPC_OVERRIDE;
258 :
259 : private:
260 : grpc_call_details call_details_;
261 : };
262 :
263 : class UnimplementedAsyncRequestContext;
264 : class UnimplementedAsyncRequest;
265 : class UnimplementedAsyncResponse;
266 :
267 : template <class Message>
268 1467034 : void RequestAsyncCall(void* registered_method, ServerContext* context,
269 : ServerAsyncStreamingInterface* stream,
270 : CompletionQueue* call_cq,
271 : ServerCompletionQueue* notification_cq, void* tag,
272 : Message* message) {
273 1467034 : new PayloadAsyncRequest<Message>(registered_method, this, context, stream,
274 : call_cq, notification_cq, tag, message);
275 1467275 : }
276 :
277 41597 : void RequestAsyncCall(void* registered_method, ServerContext* context,
278 : ServerAsyncStreamingInterface* stream,
279 : CompletionQueue* call_cq,
280 : ServerCompletionQueue* notification_cq, void* tag) {
281 : new NoPayloadAsyncRequest(registered_method, this, context, stream, call_cq,
282 41597 : notification_cq, tag);
283 41602 : }
284 :
285 12 : void RequestAsyncGenericCall(GenericServerContext* context,
286 : ServerAsyncStreamingInterface* stream,
287 : CompletionQueue* call_cq,
288 : ServerCompletionQueue* notification_cq,
289 : void* tag) {
290 : new GenericAsyncRequest(this, context, stream, call_cq, notification_cq,
291 12 : tag, true);  // true == delete_on_finalize
292 12 : }
293 :
294 : const int max_message_size_;
295 :
296 : // Completion queue.
297 : CompletionQueue cq_;
298 :
299 : // Server status
300 : grpc::mutex mu_;
301 : bool started_;
302 : bool shutdown_;
303 : // The number of threads which are running callbacks.
304 : int num_running_cb_;
305 : grpc::condition_variable callback_cv_;
306 :
307 : std::list<SyncRequest>* sync_methods_;
308 : std::unique_ptr<RpcServiceMethod> unknown_method_;
309 : bool has_generic_service_;
310 :
311 : // Pointer to the c grpc server.
312 : grpc_server* const server_;
313 :
314 : ThreadPoolInterface* thread_pool_;
315 : // Whether the thread pool is created and owned by the server.
316 : bool thread_pool_owned_;
317 : };
318 :
319 : } // namespace grpc
320 :
321 : #endif // GRPCXX_SERVER_H
|