Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/security/secure_endpoint.h"
35 : #include "src/core/support/string.h"
36 : #include <grpc/support/alloc.h>
37 : #include <grpc/support/log.h>
38 : #include <grpc/support/slice_buffer.h>
39 : #include <grpc/support/slice.h>
40 : #include <grpc/support/sync.h>
41 : #include "src/core/tsi/transport_security_interface.h"
42 : #include "src/core/debug/trace.h"
43 :
44 : #define STAGING_BUFFER_SIZE 8192
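/* Size of the fixed scratch slices used below: unprotected (read) and
   protected (write) bytes are accumulated into a staging slice of this size,
   and the slice is handed off to the target buffer whenever it fills up. */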
45 :
46 : typedef struct {
47 : grpc_endpoint base;
48 : grpc_endpoint *wrapped_ep;
49 : struct tsi_frame_protector *protector;
50 : gpr_mu protector_mu;
51 : /* saved upper-level read/write callbacks. */
52 : grpc_closure *read_cb;
53 : grpc_closure *write_cb;
54 : grpc_closure on_read;
55 : gpr_slice_buffer *read_buffer;
56 : gpr_slice_buffer source_buffer;
57 : /* saved handshaker leftover data to unprotect. */
58 : gpr_slice_buffer leftover_bytes;
59 : /* buffers for read and write */
60 : gpr_slice read_staging_buffer;
61 :
62 : gpr_slice write_staging_buffer;
63 : gpr_slice_buffer output_buffer;
64 :
65 : gpr_refcount ref;
66 : } secure_endpoint;
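/* Lifetime: the endpoint is reference counted. One ref belongs to the owner
   (released via endpoint_destroy) and one is taken for every pending read
   (released in call_read_cb). destroy() runs once the count reaches zero and
   also releases the wrapped endpoint and the frame protector. */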
67 :
68 : int grpc_trace_secure_endpoint = 0;
69 :
70 1245 : static void destroy(grpc_exec_ctx *exec_ctx, secure_endpoint *secure_ep) {
71 1211 : secure_endpoint *ep = secure_ep;
72 1245 : grpc_endpoint_destroy(exec_ctx, ep->wrapped_ep);
73 1245 : tsi_frame_protector_destroy(ep->protector);
74 1245 : gpr_slice_buffer_destroy(&ep->leftover_bytes);
75 1245 : gpr_slice_unref(ep->read_staging_buffer);
76 1245 : gpr_slice_unref(ep->write_staging_buffer);
77 1245 : gpr_slice_buffer_destroy(&ep->output_buffer);
78 1245 : gpr_slice_buffer_destroy(&ep->source_buffer);
79 1245 : gpr_mu_destroy(&ep->protector_mu);
80 1245 : gpr_free(ep);
81 1245 : }
82 :
83 : /*#define GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG*/
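/* Uncommenting the define above makes every ref/unref log its call site and
   the old/new count, which helps track down leaked or double-released
   endpoints. */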
84 : #ifdef GRPC_SECURE_ENDPOINT_REFCOUNT_DEBUG
85 : #define SECURE_ENDPOINT_UNREF(exec_ctx, ep, reason) \
86 : secure_endpoint_unref((exec_ctx), (ep), (reason), __FILE__, __LINE__)
87 : #define SECURE_ENDPOINT_REF(ep, reason) \
88 : secure_endpoint_ref((ep), (reason), __FILE__, __LINE__)
89 : static void secure_endpoint_unref(grpc_exec_ctx *exec_ctx,
90 : secure_endpoint *ep,
91 : const char *reason, const char *file,
92 : int line) {
93 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP unref %p : %s %d -> %d",
94 : ep, reason, ep->ref.count, ep->ref.count - 1);
95 : if (gpr_unref(&ep->ref)) {
96 : destroy(exec_ctx, ep);
97 : }
98 : }
99 :
100 : static void secure_endpoint_ref(secure_endpoint *ep, const char *reason,
101 : const char *file, int line) {
102 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "SECENDP ref %p : %s %d -> %d",
103 : ep, reason, ep->ref.count, ep->ref.count + 1);
104 : gpr_ref(&ep->ref);
105 : }
106 : #else
107 : #define SECURE_ENDPOINT_UNREF(exec_ctx, ep, reason) \
108 : secure_endpoint_unref((exec_ctx), (ep))
109 : #define SECURE_ENDPOINT_REF(ep, reason) secure_endpoint_ref((ep))
110 660478 : static void secure_endpoint_unref(grpc_exec_ctx *exec_ctx,
111 : secure_endpoint *ep) {
112 660478 : if (gpr_unref(&ep->ref)) {
113 1245 : destroy(exec_ctx, ep);
114 : }
115 660479 : }
116 :
117 659234 : static void secure_endpoint_ref(secure_endpoint *ep) { gpr_ref(&ep->ref); }
118 : #endif
119 :
120 6986 : static void flush_read_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
121 : gpr_uint8 **end) {
122 6986 : gpr_slice_buffer_add(ep->read_buffer, ep->read_staging_buffer);
123 6986 : ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
124 6986 : *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
125 6986 : *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
126 6986 : }
127 :
128 659234 : static void call_read_cb(grpc_exec_ctx *exec_ctx, secure_endpoint *ep,
129 : int success) {
130 659234 : if (grpc_trace_secure_endpoint) {
131 : size_t i;
132 0 : for (i = 0; i < ep->read_buffer->count; i++) {
133 0 : char *data = gpr_dump_slice(ep->read_buffer->slices[i],
134 : GPR_DUMP_HEX | GPR_DUMP_ASCII);
135 0 : gpr_log(GPR_DEBUG, "READ %p: %s", ep, data);
136 0 : gpr_free(data);
137 : }
138 : }
139 659234 : ep->read_buffer = NULL;
140 659234 : grpc_exec_ctx_enqueue(exec_ctx, ep->read_cb, success);
141 659234 : SECURE_ENDPOINT_UNREF(exec_ctx, ep, "read");
142 659234 : }
143 :
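/* Completion callback for reads on the wrapped endpoint: walks the encrypted
   slices in source_buffer, unprotects them into the read staging buffer, and
   hands the resulting plaintext to the caller through call_read_cb. */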
144 659234 : static void on_read(grpc_exec_ctx *exec_ctx, void *user_data, int success) {
145 : unsigned i;
146 658948 : gpr_uint8 keep_looping = 0;
147 658948 : tsi_result result = TSI_OK;
148 658948 : secure_endpoint *ep = (secure_endpoint *)user_data;
149 659234 : gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->read_staging_buffer);
150 659234 : gpr_uint8 *end = GPR_SLICE_END_PTR(ep->read_staging_buffer);
151 :
152 659234 : if (!success) {
153 1175 : gpr_slice_buffer_reset_and_unref(ep->read_buffer);
154 1175 : call_read_cb(exec_ctx, ep, 0);
155 1175 : return;
156 : }
157 :
158 : /* TODO(yangg) check error, maybe bail out early */
159 2354681 : for (i = 0; i < ep->source_buffer.count; i++) {
160 1696874 : gpr_slice encrypted = ep->source_buffer.slices[i];
161 1696874 : gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(encrypted);
162 1696874 : size_t message_size = GPR_SLICE_LENGTH(encrypted);
163 :
164 5538549 : while (message_size > 0 || keep_looping) {
165 2144801 : size_t unprotected_buffer_size_written = (size_t)(end - cur);
166 2144801 : size_t processed_message_size = message_size;
167 2144801 : gpr_mu_lock(&ep->protector_mu);
168 2144800 : result = tsi_frame_protector_unprotect(ep->protector, message_bytes,
169 : &processed_message_size, cur,
170 : &unprotected_buffer_size_written);
171 2144798 : gpr_mu_unlock(&ep->protector_mu);
172 2144801 : if (result != TSI_OK) {
173 0 : gpr_log(GPR_ERROR, "Decryption error: %s",
174 : tsi_result_to_string(result));
175 0 : break;
176 : }
177 2144801 : message_bytes += processed_message_size;
178 2144801 : message_size -= processed_message_size;
179 2144801 : cur += unprotected_buffer_size_written;
180 :
181 2144801 : if (cur == end) {
182 6986 : flush_read_staging_buffer(ep, &cur, &end);
183 : /* Force another pass through the loop to pull out any bytes still
184 : buffered in the protector. Bytes can be left buffered when the staging
185 : buffer runs out of space; if that happens on the last slice, one more
186 : unprotect call avoids leaving data behind in the protector. */
187 6765 : keep_looping = 1;
188 2137815 : } else if (unprotected_buffer_size_written > 0) {
189 440653 : keep_looping = 1;
190 : } else {
191 1696489 : keep_looping = 0;
192 : }
193 : }
194 1696874 : if (result != TSI_OK) break;
195 : }
196 :
197 658059 : if (cur != GPR_SLICE_START_PTR(ep->read_staging_buffer)) {
198 393535 : gpr_slice_buffer_add(
199 : ep->read_buffer,
200 : gpr_slice_split_head(
201 : &ep->read_staging_buffer,
202 393535 : (size_t)(cur - GPR_SLICE_START_PTR(ep->read_staging_buffer))));
203 : }
204 :
205 : /* TODO(yangg) experiment with moving this block after read_cb to see if it
206 : helps latency */
207 658059 : gpr_slice_buffer_reset_and_unref(&ep->source_buffer);
208 :
209 658059 : if (result != TSI_OK) {
210 0 : gpr_slice_buffer_reset_and_unref(ep->read_buffer);
211 0 : call_read_cb(exec_ctx, ep, 0);
212 0 : return;
213 : }
214 :
215 658059 : call_read_cb(exec_ctx, ep, 1);
216 : }
217 :
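/* Read entry point: leftover handshake bytes, if any, are decrypted first
   without touching the wire; otherwise a read is issued on the wrapped
   endpoint and on_read completes the operation. */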
218 659234 : static void endpoint_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
219 : gpr_slice_buffer *slices, grpc_closure *cb) {
220 658948 : secure_endpoint *ep = (secure_endpoint *)secure_ep;
221 659234 : ep->read_cb = cb;
222 659234 : ep->read_buffer = slices;
223 659234 : gpr_slice_buffer_reset_and_unref(ep->read_buffer);
224 :
225 658948 : SECURE_ENDPOINT_REF(ep, "read");
226 659234 : if (ep->leftover_bytes.count) {
227 108 : gpr_slice_buffer_swap(&ep->leftover_bytes, &ep->source_buffer);
228 108 : GPR_ASSERT(ep->leftover_bytes.count == 0);
229 108 : on_read(exec_ctx, ep, 1);
230 659342 : return;
231 : }
232 :
233 659126 : grpc_endpoint_read(exec_ctx, ep->wrapped_ep, &ep->source_buffer,
234 : &ep->on_read);
235 : }
236 :
237 7957 : static void flush_write_staging_buffer(secure_endpoint *ep, gpr_uint8 **cur,
238 : gpr_uint8 **end) {
239 7957 : gpr_slice_buffer_add(&ep->output_buffer, ep->write_staging_buffer);
240 7957 : ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
241 7957 : *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer);
242 7957 : *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
243 7957 : }
244 :
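/* Write entry point: each plaintext slice is run through
   tsi_frame_protector_protect, protect_flush is then called until the
   protector reports no pending bytes, and the framed output_buffer is written
   to the wrapped endpoint. */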
245 440735 : static void endpoint_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *secure_ep,
246 : gpr_slice_buffer *slices, grpc_closure *cb) {
247 : unsigned i;
248 440519 : tsi_result result = TSI_OK;
249 440519 : secure_endpoint *ep = (secure_endpoint *)secure_ep;
250 440735 : gpr_uint8 *cur = GPR_SLICE_START_PTR(ep->write_staging_buffer);
251 440735 : gpr_uint8 *end = GPR_SLICE_END_PTR(ep->write_staging_buffer);
252 :
253 440735 : gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
254 :
255 440735 : if (grpc_trace_secure_endpoint) {
256 0 : for (i = 0; i < slices->count; i++) {
257 0 : char *data =
258 0 : gpr_dump_slice(slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
259 0 : gpr_log(GPR_DEBUG, "WRITE %p: %s", ep, data);
260 0 : gpr_free(data);
261 : }
262 : }
263 :
264 3801437 : for (i = 0; i < slices->count; i++) {
265 3360920 : gpr_slice plain = slices->slices[i];
266 3360920 : gpr_uint8 *message_bytes = GPR_SLICE_START_PTR(plain);
267 3360920 : size_t message_size = GPR_SLICE_LENGTH(plain);
268 10087596 : while (message_size > 0) {
269 3365758 : size_t protected_buffer_size_to_send = (size_t)(end - cur);
270 3365758 : size_t processed_message_size = message_size;
271 3365758 : gpr_mu_lock(&ep->protector_mu);
272 3365758 : result = tsi_frame_protector_protect(ep->protector, message_bytes,
273 : &processed_message_size, cur,
274 : &protected_buffer_size_to_send);
275 3365752 : gpr_mu_unlock(&ep->protector_mu);
276 3365756 : if (result != TSI_OK) {
277 0 : gpr_log(GPR_ERROR, "Encryption error: %s",
278 : tsi_result_to_string(result));
279 0 : break;
280 : }
281 3365756 : message_bytes += processed_message_size;
282 3365756 : message_size -= processed_message_size;
283 3365756 : cur += protected_buffer_size_to_send;
284 :
285 3365756 : if (cur == end) {
286 3481 : flush_write_staging_buffer(ep, &cur, &end);
287 : }
288 : }
289 3360918 : if (result != TSI_OK) break;
290 : }
291 440733 : if (result == TSI_OK) {
292 : size_t still_pending_size;
293 : do {
294 445154 : size_t protected_buffer_size_to_send = (size_t)(end - cur);
295 445154 : gpr_mu_lock(&ep->protector_mu);
296 445155 : result = tsi_frame_protector_protect_flush(ep->protector, cur,
297 : &protected_buffer_size_to_send,
298 : &still_pending_size);
299 445156 : gpr_mu_unlock(&ep->protector_mu);
300 445156 : if (result != TSI_OK) break;
301 445156 : cur += protected_buffer_size_to_send;
302 445156 : if (cur == end) {
303 4476 : flush_write_staging_buffer(ep, &cur, &end);
304 : }
305 445156 : } while (still_pending_size > 0);
306 440735 : if (cur != GPR_SLICE_START_PTR(ep->write_staging_buffer)) {
307 440680 : gpr_slice_buffer_add(
308 : &ep->output_buffer,
309 : gpr_slice_split_head(
310 : &ep->write_staging_buffer,
311 440680 : (size_t)(cur - GPR_SLICE_START_PTR(ep->write_staging_buffer))));
312 : }
313 : }
314 :
315 440735 : if (result != TSI_OK) {
316 : /* TODO(yangg) do different things according to the error type? */
317 0 : gpr_slice_buffer_reset_and_unref(&ep->output_buffer);
318 0 : grpc_exec_ctx_enqueue(exec_ctx, cb, 0);
319 440735 : return;
320 : }
321 :
322 440735 : grpc_endpoint_write(exec_ctx, ep->wrapped_ep, &ep->output_buffer, cb);
323 : }
324 :
325 1179 : static void endpoint_shutdown(grpc_exec_ctx *exec_ctx,
326 : grpc_endpoint *secure_ep) {
327 1145 : secure_endpoint *ep = (secure_endpoint *)secure_ep;
328 1179 : grpc_endpoint_shutdown(exec_ctx, ep->wrapped_ep);
329 1179 : }
330 :
331 1244 : static void endpoint_destroy(grpc_exec_ctx *exec_ctx,
332 : grpc_endpoint *secure_ep) {
333 1210 : secure_endpoint *ep = (secure_endpoint *)secure_ep;
334 1244 : SECURE_ENDPOINT_UNREF(exec_ctx, ep, "destroy");
335 1245 : }
336 :
337 305183 : static void endpoint_add_to_pollset(grpc_exec_ctx *exec_ctx,
338 : grpc_endpoint *secure_ep,
339 : grpc_pollset *pollset) {
340 305102 : secure_endpoint *ep = (secure_endpoint *)secure_ep;
341 305183 : grpc_endpoint_add_to_pollset(exec_ctx, ep->wrapped_ep, pollset);
342 305183 : }
343 :
344 585 : static void endpoint_add_to_pollset_set(grpc_exec_ctx *exec_ctx,
345 : grpc_endpoint *secure_ep,
346 : grpc_pollset_set *pollset_set) {
347 568 : secure_endpoint *ep = (secure_endpoint *)secure_ep;
348 585 : grpc_endpoint_add_to_pollset_set(exec_ctx, ep->wrapped_ep, pollset_set);
349 585 : }
350 :
351 1175 : static char *endpoint_get_peer(grpc_endpoint *secure_ep) {
352 1141 : secure_endpoint *ep = (secure_endpoint *)secure_ep;
353 1175 : return grpc_endpoint_get_peer(ep->wrapped_ep);
354 : }
355 :
356 : static const grpc_endpoint_vtable vtable = {
357 : endpoint_read, endpoint_write, endpoint_add_to_pollset,
358 : endpoint_add_to_pollset_set, endpoint_shutdown, endpoint_destroy,
359 : endpoint_get_peer};
360 :
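/* Wraps `transport` with `protector`. The secure endpoint takes ownership of
   both (they are released in destroy()). leftover_slices are unconsumed bytes
   read during the handshake; they are ref'd here and decrypted on the first
   read instead of reading from the wire. */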
361 1245 : grpc_endpoint *grpc_secure_endpoint_create(
362 : struct tsi_frame_protector *protector, grpc_endpoint *transport,
363 : gpr_slice *leftover_slices, size_t leftover_nslices) {
364 : size_t i;
365 1245 : secure_endpoint *ep = (secure_endpoint *)gpr_malloc(sizeof(secure_endpoint));
366 1245 : ep->base.vtable = &vtable;
367 1245 : ep->wrapped_ep = transport;
368 1245 : ep->protector = protector;
369 1245 : gpr_slice_buffer_init(&ep->leftover_bytes);
370 1353 : for (i = 0; i < leftover_nslices; i++) {
371 108 : gpr_slice_buffer_add(&ep->leftover_bytes,
372 108 : gpr_slice_ref(leftover_slices[i]));
373 : }
374 1245 : ep->write_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
375 1245 : ep->read_staging_buffer = gpr_slice_malloc(STAGING_BUFFER_SIZE);
376 1245 : gpr_slice_buffer_init(&ep->output_buffer);
377 1245 : gpr_slice_buffer_init(&ep->source_buffer);
378 1245 : ep->read_buffer = NULL;
379 1245 : grpc_closure_init(&ep->on_read, on_read, ep);
380 1245 : gpr_mu_init(&ep->protector_mu);
381 1245 : gpr_ref_init(&ep->ref, 1);
382 1245 : return &ep->base;
383 : }
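/* Illustrative usage (a sketch, not part of this file): once a TSI handshake
   has completed, the resulting frame protector and any bytes read past the
   end of the handshake are typically wrapped around the raw endpoint. The
   names below (handshaker, tcp, leftover) are hypothetical:

     tsi_frame_protector *protector = NULL;
     if (tsi_handshaker_create_frame_protector(handshaker, NULL, &protector) ==
         TSI_OK) {
       grpc_endpoint *secure_ep =
           grpc_secure_endpoint_create(protector, tcp, &leftover, 1);
       // secure_ep now encrypts writes and decrypts reads transparently.
     }
*/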