Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc/support/port_platform.h>
35 :
36 : #ifdef GPR_POSIX_SOCKET
37 :
38 : #include "src/core/iomgr/tcp_posix.h"
39 :
40 : #include <errno.h>
41 : #include <stdlib.h>
42 : #include <string.h>
43 : #include <sys/types.h>
44 : #include <sys/socket.h>
45 : #include <unistd.h>
46 :
47 : #include <grpc/support/alloc.h>
48 : #include <grpc/support/log.h>
49 : #include <grpc/support/slice.h>
50 : #include <grpc/support/string_util.h>
51 : #include <grpc/support/sync.h>
52 : #include <grpc/support/time.h>
53 :
54 : #include "src/core/support/string.h"
55 : #include "src/core/debug/trace.h"
56 : #include "src/core/profiling/timers.h"
57 :
/* Flags handed to sendmsg() by tcp_flush. MSG_NOSIGNAL is used only when the
   platform configuration defines GPR_HAVE_MSG_NOSIGNAL; otherwise no flags
   are passed. */
58 : #ifdef GPR_HAVE_MSG_NOSIGNAL
59 : #define SENDMSG_FLAGS MSG_NOSIGNAL
60 : #else
61 : #define SENDMSG_FLAGS 0
62 : #endif
63 :
/* Integer type used for iovec counts that are assigned to msghdr.msg_iovlen.
   A platform may override it through GPR_MSG_IOVLEN_TYPE; the fallback is
   size_t. */
64 : #ifdef GPR_MSG_IOVLEN_TYPE
65 : typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type;
66 : #else
67 : typedef size_t msg_iovlen_type;
68 : #endif
69 :
/* When non-zero, call_read_cb and tcp_write log a hex/ASCII dump of every
   slice they handle. */
70 : int grpc_tcp_trace = 0;
71 :
/* Per-connection state of a POSIX TCP endpoint. `base` is the first member
   and the functions in this file cast between grpc_endpoint* and grpc_tcp*,
   so it must stay first. */
72 : typedef struct {
73 : grpc_endpoint base;
/* Polling wrapper handed to the grpc_fd_* and grpc_pollset_* calls. */
74 : grpc_fd *em_fd;
/* Raw descriptor copied from em_fd->fd at creation; used by recvmsg/sendmsg. */
75 : int fd;
/* Starts at 1. When set, tcp_read clears it and arms grpc_fd_notify_on_read;
   when clear, tcp_read schedules read_closure directly. */
76 : int finished_edge;
77 : msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
/* Byte size passed to gpr_slice_malloc for each freshly allocated read slice. */
78 : size_t slice_size;
/* Starts at 1 (released by tcp_destroy); each pending read and each deferred
   write holds one extra reference. */
79 : gpr_refcount refcount;
80 :
81 : /* unused tail trimmed off the previous read; tcp_read swaps it into the
      caller's buffer so the slices are reused by the next read */
82 : gpr_slice_buffer last_read_buffer;
83 :
/* Caller-owned buffers of the operation in flight. incoming_buffer is reset
   to NULL by call_read_cb; outgoing_buffer is assigned by tcp_write. */
84 : gpr_slice_buffer *incoming_buffer;
85 : gpr_slice_buffer *outgoing_buffer;
86 : /** slice within outgoing_buffer to write next */
87 : size_t outgoing_slice_idx;
88 : /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
89 : size_t outgoing_byte_idx;
90 :
/* Completion closures supplied by the caller; NULL while no read / no
   deferred write is outstanding. */
91 : grpc_closure *read_cb;
92 : grpc_closure *write_cb;
93 :
/* Internal closures bound to tcp_handle_read / tcp_handle_write with this
   struct as their argument. */
94 : grpc_closure read_closure;
95 : grpc_closure write_closure;
96 :
/* Private copy of the peer description (gpr_strdup'd at creation, released
   in tcp_free). */
97 : char *peer_string;
98 : } grpc_tcp;
99 :
/* Forward declarations of the readiness handlers. Both are defined further
   down and are stored in grpc_tcp.read_closure / grpc_tcp.write_closure by
   grpc_tcp_create; `arg` is the owning grpc_tcp. */
100 : static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
101 : int success);
102 : static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
103 : int success);
104 :
105 4097 : static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
106 4097 : grpc_tcp *tcp = (grpc_tcp *)ep;
107 4097 : grpc_fd_shutdown(exec_ctx, tcp->em_fd);
108 4097 : }
109 :
110 7434 : static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
111 7434 : grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan");
112 7432 : gpr_slice_buffer_destroy(&tcp->last_read_buffer);
113 7432 : gpr_free(tcp->peer_string);
114 7432 : gpr_free(tcp);
115 7432 : }
116 :
117 : /*#define GRPC_TCP_REFCOUNT_DEBUG*/
118 : #ifdef GRPC_TCP_REFCOUNT_DEBUG
119 : #define TCP_UNREF(cl, tcp, reason) \
120 : tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
121 : #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
122 : static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
123 : const char *reason, const char *file, int line) {
124 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
125 : reason, tcp->refcount.count, tcp->refcount.count - 1);
126 : if (gpr_unref(&tcp->refcount)) {
127 : tcp_free(exec_ctx, tcp);
128 : }
129 : }
130 :
131 : static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
132 : int line) {
133 : gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP ref %p : %s %d -> %d", tcp,
134 : reason, tcp->refcount.count, tcp->refcount.count + 1);
135 : gpr_ref(&tcp->refcount);
136 : }
137 : #else
138 : #define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
139 : #define TCP_REF(tcp, reason) tcp_ref((tcp))
140 3419458 : static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
141 3419458 : if (gpr_unref(&tcp->refcount)) {
142 7434 : tcp_free(exec_ctx, tcp);
143 : }
144 3419456 : }
145 :
146 3412013 : static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
147 : #endif
148 :
149 7434 : static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
150 7434 : grpc_tcp *tcp = (grpc_tcp *)ep;
151 7434 : TCP_UNREF(exec_ctx, tcp, "destroy");
152 7434 : }
153 :
154 3410390 : static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
155 3410390 : grpc_closure *cb = tcp->read_cb;
156 :
157 3410390 : if (grpc_tcp_trace) {
158 : size_t i;
159 1272 : gpr_log(GPR_DEBUG, "read: success=%d", success);
160 2260 : for (i = 0; i < tcp->incoming_buffer->count; i++) {
161 988 : char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
162 : GPR_DUMP_HEX | GPR_DUMP_ASCII);
163 988 : gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
164 988 : gpr_free(dump);
165 : }
166 : }
167 :
168 3410390 : tcp->read_cb = NULL;
169 3410390 : tcp->incoming_buffer = NULL;
170 3410390 : cb->cb(exec_ctx, cb->cb_arg, success);
171 3410468 : }
172 :
173 : #define MAX_READ_IOVEC 4
174 4622284 : static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
175 : struct msghdr msg;
176 : struct iovec iov[MAX_READ_IOVEC];
177 : ssize_t read_bytes;
178 : size_t i;
179 :
180 4622284 : GPR_ASSERT(!tcp->finished_edge);
181 4622284 : GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
182 4622284 : GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
183 : GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
184 :
185 17188739 : while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
186 7943662 : gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
187 : gpr_slice_malloc(tcp->slice_size));
188 : }
189 17573604 : for (i = 0; i < tcp->incoming_buffer->count; i++) {
190 12950811 : iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
191 12950811 : iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
192 : }
193 :
194 4622793 : msg.msg_name = NULL;
195 4622793 : msg.msg_namelen = 0;
196 4622793 : msg.msg_iov = iov;
197 4622793 : msg.msg_iovlen = tcp->iov_size;
198 4622793 : msg.msg_control = NULL;
199 4622793 : msg.msg_controllen = 0;
200 4622793 : msg.msg_flags = 0;
201 :
202 : GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
203 : do {
204 4622774 : read_bytes = recvmsg(tcp->fd, &msg, 0);
205 4622969 : } while (read_bytes < 0 && errno == EINTR);
206 : GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
207 :
208 4623073 : if (read_bytes < 0) {
209 : /* NB: After calling call_read_cb a parallel call of the read handler may
210 : * be running. */
211 1216261 : if (errno == EAGAIN) {
212 1216202 : if (tcp->iov_size > 1) {
213 11316 : tcp->iov_size /= 2;
214 : }
215 : /* We've consumed the edge, request a new one */
216 1216202 : grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
217 : } else {
218 : /* TODO(klempner): Log interesting errors */
219 27 : gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
220 27 : call_read_cb(exec_ctx, tcp, 0);
221 27 : TCP_UNREF(exec_ctx, tcp, "read");
222 : }
223 3406812 : } else if (read_bytes == 0) {
224 : /* 0 read size ==> end of stream */
225 3359 : gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
226 3359 : call_read_cb(exec_ctx, tcp, 0);
227 3359 : TCP_UNREF(exec_ctx, tcp, "read");
228 : } else {
229 3403453 : GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
230 3403453 : if ((size_t)read_bytes < tcp->incoming_buffer->length) {
231 2831788 : gpr_slice_buffer_trim_end(
232 : tcp->incoming_buffer,
233 1415894 : tcp->incoming_buffer->length - (size_t)read_bytes,
234 : &tcp->last_read_buffer);
235 1987559 : } else if (tcp->iov_size < MAX_READ_IOVEC) {
236 14935 : ++tcp->iov_size;
237 : }
238 3403483 : GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
239 3403483 : call_read_cb(exec_ctx, tcp, 1);
240 3403558 : TCP_UNREF(exec_ctx, tcp, "read");
241 : }
242 :
243 : GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
244 4623203 : }
245 :
246 4625020 : static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
247 : int success) {
248 4625020 : grpc_tcp *tcp = (grpc_tcp *)arg;
249 4625020 : GPR_ASSERT(!tcp->finished_edge);
250 :
251 4625020 : if (!success) {
252 3522 : gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
253 3522 : call_read_cb(exec_ctx, tcp, 0);
254 3522 : TCP_UNREF(exec_ctx, tcp, "read");
255 : } else {
256 4621498 : tcp_continue_read(exec_ctx, tcp);
257 : }
258 4626325 : }
259 :
260 3410462 : static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
261 : gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
262 3410462 : grpc_tcp *tcp = (grpc_tcp *)ep;
263 3410462 : GPR_ASSERT(tcp->read_cb == NULL);
264 3410462 : tcp->read_cb = cb;
265 3410462 : tcp->incoming_buffer = incoming_buffer;
266 3410462 : gpr_slice_buffer_reset_and_unref(incoming_buffer);
267 3410461 : gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
268 3410434 : TCP_REF(tcp, "read");
269 3410464 : if (tcp->finished_edge) {
270 7041 : tcp->finished_edge = 0;
271 7041 : grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
272 : } else {
273 3403423 : grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1);
274 : }
275 3410460 : }
276 :
277 : typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
278 :
279 : #define MAX_WRITE_IOVEC 16
280 4084757 : static flush_result tcp_flush(grpc_tcp *tcp) {
281 : struct msghdr msg;
282 : struct iovec iov[MAX_WRITE_IOVEC];
283 : msg_iovlen_type iov_size;
284 : ssize_t sent_length;
285 : size_t sending_length;
286 : size_t trailing;
287 : size_t unwind_slice_idx;
288 : size_t unwind_byte_idx;
289 :
290 : for (;;) {
291 4084757 : sending_length = 0;
292 4084757 : unwind_slice_idx = tcp->outgoing_slice_idx;
293 4084757 : unwind_byte_idx = tcp->outgoing_byte_idx;
294 21747830 : for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
295 : iov_size != MAX_WRITE_IOVEC;
296 13578316 : iov_size++) {
297 13578316 : iov[iov_size].iov_base =
298 27156632 : GPR_SLICE_START_PTR(
299 27156632 : tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
300 : tcp->outgoing_byte_idx;
301 13578316 : iov[iov_size].iov_len =
302 13578316 : GPR_SLICE_LENGTH(
303 13578316 : tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
304 13578316 : tcp->outgoing_byte_idx;
305 13578316 : sending_length += iov[iov_size].iov_len;
306 13578316 : tcp->outgoing_slice_idx++;
307 13578316 : tcp->outgoing_byte_idx = 0;
308 : }
309 4084757 : GPR_ASSERT(iov_size > 0);
310 :
311 4084757 : msg.msg_name = NULL;
312 4084757 : msg.msg_namelen = 0;
313 4084757 : msg.msg_iov = iov;
314 4084757 : msg.msg_iovlen = iov_size;
315 4084757 : msg.msg_control = NULL;
316 4084757 : msg.msg_controllen = 0;
317 4084757 : msg.msg_flags = 0;
318 :
319 : GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0);
320 : do {
321 : /* TODO(klempner): Cork if this is a partial write */
322 4085091 : sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
323 4084069 : } while (sent_length < 0 && errno == EINTR);
324 : GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0);
325 :
326 4083735 : if (sent_length < 0) {
327 1662 : if (errno == EAGAIN) {
328 2806 : tcp->outgoing_slice_idx = unwind_slice_idx;
329 2806 : tcp->outgoing_byte_idx = unwind_byte_idx;
330 2806 : return FLUSH_PENDING;
331 : } else {
332 : /* TODO(klempner): Log some of these */
333 25 : return FLUSH_ERROR;
334 : }
335 : }
336 :
337 4082073 : GPR_ASSERT(tcp->outgoing_byte_idx == 0);
338 4082073 : trailing = sending_length - (size_t)sent_length;
339 8164928 : while (trailing > 0) {
340 : size_t slice_length;
341 :
342 958 : tcp->outgoing_slice_idx--;
343 958 : slice_length = GPR_SLICE_LENGTH(
344 : tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
345 958 : if (slice_length > trailing) {
346 176 : tcp->outgoing_byte_idx = slice_length - trailing;
347 176 : break;
348 : } else {
349 782 : trailing -= slice_length;
350 : }
351 : }
352 :
353 4082073 : if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
354 3841473 : return FLUSH_DONE;
355 : }
356 240600 : };
357 : }
358 :
359 1637 : static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
360 : int success) {
361 1637 : grpc_tcp *tcp = (grpc_tcp *)arg;
362 : flush_result status;
363 : grpc_closure *cb;
364 :
365 1637 : if (!success) {
366 3 : cb = tcp->write_cb;
367 3 : tcp->write_cb = NULL;
368 3 : cb->cb(exec_ctx, cb->cb_arg, 0);
369 3 : TCP_UNREF(exec_ctx, tcp, "write");
370 1640 : return;
371 : }
372 :
373 : GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
374 1634 : status = tcp_flush(tcp);
375 1634 : if (status == FLUSH_PENDING) {
376 81 : grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
377 : } else {
378 1553 : cb = tcp->write_cb;
379 1553 : tcp->write_cb = NULL;
380 1553 : cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
381 1553 : TCP_UNREF(exec_ctx, tcp, "write");
382 : }
383 : GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
384 : }
385 :
386 3841775 : static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
387 : gpr_slice_buffer *buf, grpc_closure *cb) {
388 3841775 : grpc_tcp *tcp = (grpc_tcp *)ep;
389 : flush_result status;
390 :
391 3841775 : if (grpc_tcp_trace) {
392 : size_t i;
393 :
394 7447 : for (i = 0; i < buf->count; i++) {
395 5870 : char *data =
396 5870 : gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
397 5870 : gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
398 5870 : gpr_free(data);
399 : }
400 : }
401 :
402 : GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
403 3841775 : GPR_ASSERT(tcp->write_cb == NULL);
404 :
405 3841775 : if (buf->length == 0) {
406 : GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
407 451 : grpc_exec_ctx_enqueue(exec_ctx, cb, 1);
408 3843684 : return;
409 : }
410 3841324 : tcp->outgoing_buffer = buf;
411 3841324 : tcp->outgoing_slice_idx = 0;
412 3841324 : tcp->outgoing_byte_idx = 0;
413 :
414 3841324 : status = tcp_flush(tcp);
415 3839057 : if (status == FLUSH_PENDING) {
416 1556 : TCP_REF(tcp, "write");
417 1556 : tcp->write_cb = cb;
418 1556 : grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
419 : } else {
420 3837501 : grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE);
421 : }
422 :
423 : GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
424 : }
425 :
426 2705177 : static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
427 : grpc_pollset *pollset) {
428 2705177 : grpc_tcp *tcp = (grpc_tcp *)ep;
429 2705177 : grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
430 2706490 : }
431 :
432 1547 : static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
433 : grpc_pollset_set *pollset_set) {
434 1547 : grpc_tcp *tcp = (grpc_tcp *)ep;
435 1547 : grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
436 1548 : }
437 :
438 4014 : static char *tcp_get_peer(grpc_endpoint *ep) {
439 4014 : grpc_tcp *tcp = (grpc_tcp *)ep;
440 4014 : return gpr_strdup(tcp->peer_string);
441 : }
442 :
443 : static const grpc_endpoint_vtable vtable = {
444 : tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
445 : tcp_shutdown, tcp_destroy, tcp_get_peer};
446 :
447 7432 : grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
448 : const char *peer_string) {
449 7432 : grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
450 7433 : tcp->base.vtable = &vtable;
451 7433 : tcp->peer_string = gpr_strdup(peer_string);
452 7434 : tcp->fd = em_fd->fd;
453 7434 : tcp->read_cb = NULL;
454 7434 : tcp->write_cb = NULL;
455 7434 : tcp->incoming_buffer = NULL;
456 7434 : tcp->slice_size = slice_size;
457 7434 : tcp->iov_size = 1;
458 7434 : tcp->finished_edge = 1;
459 : /* paired with unref in grpc_tcp_destroy */
460 7434 : gpr_ref_init(&tcp->refcount, 1);
461 7434 : tcp->em_fd = em_fd;
462 7434 : tcp->read_closure.cb = tcp_handle_read;
463 7434 : tcp->read_closure.cb_arg = tcp;
464 7434 : tcp->write_closure.cb = tcp_handle_write;
465 7434 : tcp->write_closure.cb_arg = tcp;
466 7434 : gpr_slice_buffer_init(&tcp->last_read_buffer);
467 :
468 7434 : return &tcp->base;
469 : }
470 :
471 : #endif
|