Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #ifndef GRPCXX_IMPL_RPC_SERVICE_METHOD_H
35 : #define GRPCXX_IMPL_RPC_SERVICE_METHOD_H
36 :
37 : #include <climits>
38 : #include <functional>
39 : #include <map>
40 : #include <memory>
41 : #include <vector>
42 :
43 : #include <grpc++/impl/rpc_method.h>
44 : #include <grpc++/support/config.h>
45 : #include <grpc++/support/status.h>
46 : #include <grpc++/support/sync_stream.h>
47 :
48 : namespace grpc {
49 : class ServerContext;
50 : class StreamContextInterface;
51 :
52 : // TODO(rocking): we might need to split this file into multiple ones.
53 :
54 : // Base class for running an RPC handler.
55 1347 : class MethodHandler {
56 : public:
57 1347 : virtual ~MethodHandler() {}
58 : struct HandlerParameter {
59 209491 : HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req,
60 : int max_size)
61 : : call(c),
62 : server_context(context),
63 : request(req),
64 209491 : max_message_size(max_size) {}
65 : Call* call;
66 : ServerContext* server_context;
67 : // Handler required to grpc_byte_buffer_destroy this
68 : grpc_byte_buffer* request;
69 : int max_message_size;
70 : };
71 : virtual void RunHandler(const HandlerParameter& param) = 0;
72 : };
73 :
74 : // A wrapper class of an application provided rpc method handler.
75 : template <class ServiceType, class RequestType, class ResponseType>
76 1056 : class RpcMethodHandler : public MethodHandler {
77 : public:
78 528 : RpcMethodHandler(
79 : std::function<Status(ServiceType*, ServerContext*, const RequestType*,
80 : ResponseType*)> func,
81 : ServiceType* service)
82 528 : : func_(func), service_(service) {}
83 :
84 209432 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
85 209432 : RequestType req;
86 : Status status = SerializationTraits<RequestType>::Deserialize(
87 418909 : param.request, &req, param.max_message_size);
88 418908 : ResponseType rsp;
89 209455 : if (status.ok()) {
90 209454 : status = func_(service_, param.server_context, &req, &rsp);
91 : }
92 :
93 209454 : GPR_ASSERT(!param.server_context->sent_initial_metadata_);
94 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
95 418907 : CallOpServerSendStatus> ops;
96 209453 : ops.SendInitialMetadata(param.server_context->initial_metadata_);
97 209455 : if (status.ok()) {
98 209438 : status = ops.SendMessage(rsp);
99 : }
100 209455 : ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
101 209455 : param.call->PerformOps(&ops);
102 418910 : param.call->cq()->Pluck(&ops);
103 209455 : }
104 :
105 : private:
106 : // Application provided rpc handler function.
107 : std::function<Status(ServiceType*, ServerContext*, const RequestType*,
108 : ResponseType*)> func_;
109 : // The class the above handler function lives in.
110 : ServiceType* service_;
111 : };
112 :
113 : // A wrapper class of an application provided client streaming handler.
114 : template <class ServiceType, class RequestType, class ResponseType>
115 442 : class ClientStreamingHandler : public MethodHandler {
116 : public:
117 221 : ClientStreamingHandler(
118 : std::function<Status(ServiceType*, ServerContext*,
119 : ServerReader<RequestType>*, ResponseType*)> func,
120 : ServiceType* service)
121 221 : : func_(func), service_(service) {}
122 :
123 8 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
124 8 : ServerReader<RequestType> reader(param.call, param.server_context);
125 16 : ResponseType rsp;
126 16 : Status status = func_(service_, param.server_context, &reader, &rsp);
127 :
128 8 : GPR_ASSERT(!param.server_context->sent_initial_metadata_);
129 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
130 16 : CallOpServerSendStatus> ops;
131 8 : ops.SendInitialMetadata(param.server_context->initial_metadata_);
132 8 : if (status.ok()) {
133 6 : status = ops.SendMessage(rsp);
134 : }
135 8 : ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
136 8 : param.call->PerformOps(&ops);
137 16 : param.call->cq()->Pluck(&ops);
138 8 : }
139 :
140 : private:
141 : std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
142 : ResponseType*)> func_;
143 : ServiceType* service_;
144 : };
145 :
146 : // A wrapper class of an application provided server streaming handler.
147 : template <class ServiceType, class RequestType, class ResponseType>
148 442 : class ServerStreamingHandler : public MethodHandler {
149 : public:
150 221 : ServerStreamingHandler(
151 : std::function<Status(ServiceType*, ServerContext*, const RequestType*,
152 : ServerWriter<ResponseType>*)> func,
153 : ServiceType* service)
154 221 : : func_(func), service_(service) {}
155 :
156 5 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
157 5 : RequestType req;
158 : Status status = SerializationTraits<RequestType>::Deserialize(
159 10 : param.request, &req, param.max_message_size);
160 :
161 5 : if (status.ok()) {
162 5 : ServerWriter<ResponseType> writer(param.call, param.server_context);
163 5 : status = func_(service_, param.server_context, &req, &writer);
164 : }
165 :
166 10 : CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
167 5 : if (!param.server_context->sent_initial_metadata_) {
168 0 : ops.SendInitialMetadata(param.server_context->initial_metadata_);
169 : }
170 5 : ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
171 5 : param.call->PerformOps(&ops);
172 10 : param.call->cq()->Pluck(&ops);
173 5 : }
174 :
175 : private:
176 : std::function<Status(ServiceType*, ServerContext*, const RequestType*,
177 : ServerWriter<ResponseType>*)> func_;
178 : ServiceType* service_;
179 : };
180 :
181 : // A wrapper class of an application provided bidi-streaming handler.
182 : template <class ServiceType, class RequestType, class ResponseType>
183 498 : class BidiStreamingHandler : public MethodHandler {
184 : public:
185 249 : BidiStreamingHandler(
186 : std::function<Status(ServiceType*, ServerContext*,
187 : ServerReaderWriter<ResponseType, RequestType>*)>
188 : func,
189 : ServiceType* service)
190 249 : : func_(func), service_(service) {}
191 :
192 22 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
193 : ServerReaderWriter<ResponseType, RequestType> stream(param.call,
194 22 : param.server_context);
195 44 : Status status = func_(service_, param.server_context, &stream);
196 :
197 44 : CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
198 22 : if (!param.server_context->sent_initial_metadata_) {
199 2 : ops.SendInitialMetadata(param.server_context->initial_metadata_);
200 : }
201 22 : ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
202 22 : param.call->PerformOps(&ops);
203 44 : param.call->cq()->Pluck(&ops);
204 22 : }
205 :
206 : private:
207 : std::function<Status(ServiceType*, ServerContext*,
208 : ServerReaderWriter<ResponseType, RequestType>*)> func_;
209 : ServiceType* service_;
210 : };
211 :
212 : // Handle unknown method by returning UNIMPLEMENTED error.
213 384 : class UnknownMethodHandler : public MethodHandler {
214 : public:
215 : template <class T>
216 4 : static void FillOps(ServerContext* context, T* ops) {
217 4 : Status status(StatusCode::UNIMPLEMENTED, "");
218 4 : if (!context->sent_initial_metadata_) {
219 4 : ops->SendInitialMetadata(context->initial_metadata_);
220 4 : context->sent_initial_metadata_ = true;
221 : }
222 4 : ops->ServerSendStatus(context->trailing_metadata_, status);
223 4 : }
224 :
225 2 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
226 2 : CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
227 2 : FillOps(param.server_context, &ops);
228 2 : param.call->PerformOps(&ops);
229 2 : param.call->cq()->Pluck(&ops);
230 2 : }
231 : };
232 :
233 : // Server side rpc method class
234 1347 : class RpcServiceMethod : public RpcMethod {
235 : public:
236 : // Takes ownership of the handler
237 1347 : RpcServiceMethod(const char* name, RpcMethod::RpcType type,
238 : MethodHandler* handler)
239 1347 : : RpcMethod(name, type), handler_(handler) {}
240 :
241 209491 : MethodHandler* handler() { return handler_.get(); }
242 :
243 : private:
244 : std::unique_ptr<MethodHandler> handler_;
245 : };
246 :
247 : // This class contains all the method information for an rpc service. It is
248 : // used for registering a service on a grpc server.
249 638 : class RpcService {
250 : public:
251 : // Takes ownership.
252 1219 : void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
253 :
254 1089 : RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); }
255 1382 : int GetMethodCount() const {
256 : // On win x64, int is only 32bit
257 1382 : GPR_ASSERT(methods_.size() <= INT_MAX);
258 1382 : return (int)methods_.size();
259 : }
260 :
261 : private:
262 : std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
263 : };
264 :
265 : } // namespace grpc
266 :
267 : #endif // GRPCXX_IMPL_RPC_SERVICE_METHOD_H
|