Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #ifndef GRPCXX_IMPL_RPC_SERVICE_METHOD_H
35 : #define GRPCXX_IMPL_RPC_SERVICE_METHOD_H
36 :
37 : #include <functional>
38 : #include <map>
39 : #include <memory>
40 : #include <vector>
41 :
42 : #include <grpc++/impl/rpc_method.h>
43 : #include <grpc++/support/config.h>
44 : #include <grpc++/support/status.h>
45 : #include <grpc++/support/sync_stream.h>
46 :
47 : namespace grpc {
48 : class ServerContext;
49 : class StreamContextInterface;
50 :
51 : // TODO(rocking): we might need to split this file into multiple ones.
52 :
53 : // Base class for running an RPC handler.
54 1202 : class MethodHandler {
55 : public:
56 1202 : virtual ~MethodHandler() {}
57 : struct HandlerParameter {
58 159512 : HandlerParameter(Call* c, ServerContext* context, grpc_byte_buffer* req,
59 : int max_size)
60 : : call(c),
61 : server_context(context),
62 : request(req),
63 159512 : max_message_size(max_size) {}
64 : Call* call;
65 : ServerContext* server_context;
66 : // Handler required to grpc_byte_buffer_destroy this
67 : grpc_byte_buffer* request;
68 : int max_message_size;
69 : };
70 : virtual void RunHandler(const HandlerParameter& param) = 0;
71 : };
72 :
73 : // A wrapper class of an application provided rpc method handler.
74 : template <class ServiceType, class RequestType, class ResponseType>
75 940 : class RpcMethodHandler : public MethodHandler {
76 : public:
77 470 : RpcMethodHandler(
78 : std::function<Status(ServiceType*, ServerContext*, const RequestType*,
79 : ResponseType*)> func,
80 : ServiceType* service)
81 470 : : func_(func), service_(service) {}
82 :
83 159475 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
84 159475 : RequestType req;
85 : Status status = SerializationTraits<RequestType>::Deserialize(
86 318949 : param.request, &req, param.max_message_size);
87 318943 : ResponseType rsp;
88 159475 : if (status.ok()) {
89 159473 : status = func_(service_, param.server_context, &req, &rsp);
90 : }
91 :
92 159474 : GPR_ASSERT(!param.server_context->sent_initial_metadata_);
93 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
94 318949 : CallOpServerSendStatus> ops;
95 159474 : ops.SendInitialMetadata(param.server_context->initial_metadata_);
96 159475 : if (status.ok()) {
97 159457 : status = ops.SendMessage(rsp);
98 : }
99 159474 : ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
100 159475 : param.call->PerformOps(&ops);
101 318949 : param.call->cq()->Pluck(&ops);
102 159475 : }
103 :
104 : private:
105 : // Application provided rpc handler function.
106 : std::function<Status(ServiceType*, ServerContext*, const RequestType*,
107 : ResponseType*)> func_;
108 : // The class the above handler function lives in.
109 : ServiceType* service_;
110 : };
111 :
112 : // A wrapper class of an application provided client streaming handler.
113 : template <class ServiceType, class RequestType, class ResponseType>
114 386 : class ClientStreamingHandler : public MethodHandler {
115 : public:
116 193 : ClientStreamingHandler(
117 : std::function<Status(ServiceType*, ServerContext*,
118 : ServerReader<RequestType>*, ResponseType*)> func,
119 : ServiceType* service)
120 193 : : func_(func), service_(service) {}
121 :
122 8 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
123 8 : ServerReader<RequestType> reader(param.call, param.server_context);
124 16 : ResponseType rsp;
125 16 : Status status = func_(service_, param.server_context, &reader, &rsp);
126 :
127 8 : GPR_ASSERT(!param.server_context->sent_initial_metadata_);
128 : CallOpSet<CallOpSendInitialMetadata, CallOpSendMessage,
129 16 : CallOpServerSendStatus> ops;
130 8 : ops.SendInitialMetadata(param.server_context->initial_metadata_);
131 8 : if (status.ok()) {
132 6 : status = ops.SendMessage(rsp);
133 : }
134 8 : ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
135 8 : param.call->PerformOps(&ops);
136 16 : param.call->cq()->Pluck(&ops);
137 8 : }
138 :
139 : private:
140 : std::function<Status(ServiceType*, ServerContext*, ServerReader<RequestType>*,
141 : ResponseType*)> func_;
142 : ServiceType* service_;
143 : };
144 :
145 : // A wrapper class of an application provided server streaming handler.
146 : template <class ServiceType, class RequestType, class ResponseType>
147 386 : class ServerStreamingHandler : public MethodHandler {
148 : public:
149 193 : ServerStreamingHandler(
150 : std::function<Status(ServiceType*, ServerContext*, const RequestType*,
151 : ServerWriter<ResponseType>*)> func,
152 : ServiceType* service)
153 193 : : func_(func), service_(service) {}
154 :
155 5 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
156 5 : RequestType req;
157 : Status status = SerializationTraits<RequestType>::Deserialize(
158 10 : param.request, &req, param.max_message_size);
159 :
160 5 : if (status.ok()) {
161 5 : ServerWriter<ResponseType> writer(param.call, param.server_context);
162 5 : status = func_(service_, param.server_context, &req, &writer);
163 : }
164 :
165 10 : CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
166 5 : if (!param.server_context->sent_initial_metadata_) {
167 0 : ops.SendInitialMetadata(param.server_context->initial_metadata_);
168 : }
169 5 : ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
170 5 : param.call->PerformOps(&ops);
171 10 : param.call->cq()->Pluck(&ops);
172 5 : }
173 :
174 : private:
175 : std::function<Status(ServiceType*, ServerContext*, const RequestType*,
176 : ServerWriter<ResponseType>*)> func_;
177 : ServiceType* service_;
178 : };
179 :
180 : // A wrapper class of an application provided bidi-streaming handler.
181 : template <class ServiceType, class RequestType, class ResponseType>
182 440 : class BidiStreamingHandler : public MethodHandler {
183 : public:
184 220 : BidiStreamingHandler(
185 : std::function<Status(ServiceType*, ServerContext*,
186 : ServerReaderWriter<ResponseType, RequestType>*)>
187 : func,
188 : ServiceType* service)
189 220 : : func_(func), service_(service) {}
190 :
191 22 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
192 : ServerReaderWriter<ResponseType, RequestType> stream(param.call,
193 22 : param.server_context);
194 44 : Status status = func_(service_, param.server_context, &stream);
195 :
196 44 : CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
197 22 : if (!param.server_context->sent_initial_metadata_) {
198 2 : ops.SendInitialMetadata(param.server_context->initial_metadata_);
199 : }
200 22 : ops.ServerSendStatus(param.server_context->trailing_metadata_, status);
201 22 : param.call->PerformOps(&ops);
202 44 : param.call->cq()->Pluck(&ops);
203 22 : }
204 :
205 : private:
206 : std::function<Status(ServiceType*, ServerContext*,
207 : ServerReaderWriter<ResponseType, RequestType>*)> func_;
208 : ServiceType* service_;
209 : };
210 :
211 : // Handle unknown method by returning UNIMPLEMENTED error.
212 378 : class UnknownMethodHandler : public MethodHandler {
213 : public:
214 : template <class T>
215 4 : static void FillOps(ServerContext* context, T* ops) {
216 4 : Status status(StatusCode::UNIMPLEMENTED, "");
217 4 : if (!context->sent_initial_metadata_) {
218 4 : ops->SendInitialMetadata(context->initial_metadata_);
219 4 : context->sent_initial_metadata_ = true;
220 : }
221 4 : ops->ServerSendStatus(context->trailing_metadata_, status);
222 4 : }
223 :
224 2 : void RunHandler(const HandlerParameter& param) GRPC_FINAL {
225 2 : CallOpSet<CallOpSendInitialMetadata, CallOpServerSendStatus> ops;
226 2 : FillOps(param.server_context, &ops);
227 2 : param.call->PerformOps(&ops);
228 2 : param.call->cq()->Pluck(&ops);
229 2 : }
230 : };
231 :
232 : // Server side rpc method class
233 1202 : class RpcServiceMethod : public RpcMethod {
234 : public:
235 : // Takes ownership of the handler
236 1202 : RpcServiceMethod(const char* name, RpcMethod::RpcType type,
237 : MethodHandler* handler)
238 1202 : : RpcMethod(name, type), handler_(handler) {}
239 :
240 159401 : MethodHandler* handler() { return handler_.get(); }
241 :
242 : private:
243 : std::unique_ptr<MethodHandler> handler_;
244 : };
245 :
246 : // This class contains all the method information for an rpc service. It is
247 : // used for registering a service on a grpc server.
248 578 : class RpcService {
249 : public:
250 : // Takes ownership.
251 1076 : void AddMethod(RpcServiceMethod* method) { methods_.emplace_back(method); }
252 :
253 1076 : RpcServiceMethod* GetMethod(int i) { return methods_[i].get(); }
254 1365 : int GetMethodCount() const { return methods_.size(); }
255 :
256 : private:
257 : std::vector<std::unique_ptr<RpcServiceMethod>> methods_;
258 : };
259 :
260 : } // namespace grpc
261 :
262 : #endif // GRPCXX_IMPL_RPC_SERVICE_METHOD_H
|