LCOV - code coverage report
Current view: top level - src/core/iomgr - tcp_posix.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 203 203 100.0 %
Date: 2015-10-10 Functions: 16 16 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <grpc/support/port_platform.h>
      35             : 
      36             : #ifdef GPR_POSIX_SOCKET
      37             : 
      38             : #include "src/core/iomgr/tcp_posix.h"
      39             : 
      40             : #include <errno.h>
      41             : #include <stdlib.h>
      42             : #include <string.h>
      43             : #include <sys/types.h>
      44             : #include <sys/socket.h>
      45             : #include <unistd.h>
      46             : 
      47             : #include <grpc/support/alloc.h>
      48             : #include <grpc/support/log.h>
      49             : #include <grpc/support/slice.h>
      50             : #include <grpc/support/string_util.h>
      51             : #include <grpc/support/sync.h>
      52             : #include <grpc/support/time.h>
      53             : 
      54             : #include "src/core/support/string.h"
      55             : #include "src/core/debug/trace.h"
      56             : #include "src/core/profiling/timers.h"
      57             : 
      58             : #ifdef GPR_HAVE_MSG_NOSIGNAL
      59             : #define SENDMSG_FLAGS MSG_NOSIGNAL
      60             : #else
      61             : #define SENDMSG_FLAGS 0
      62             : #endif
      63             : 
      64             : #ifdef GPR_MSG_IOVLEN_TYPE
      65             : typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type;
      66             : #else
      67             : typedef size_t msg_iovlen_type;
      68             : #endif
      69             : 
      70             : int grpc_tcp_trace = 0;
      71             : 
      72             : typedef struct {
      73             :   grpc_endpoint base;
      74             :   grpc_fd *em_fd;
      75             :   int fd;
      76             :   int finished_edge;
      77             :   msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
      78             :   size_t slice_size;
      79             :   gpr_refcount refcount;
      80             : 
      81             :   /* leftover slices trimmed off the last read; swapped into the next read */
      82             :   gpr_slice_buffer last_read_buffer;
      83             : 
      84             :   gpr_slice_buffer *incoming_buffer;
      85             :   gpr_slice_buffer *outgoing_buffer;
      86             :   /** slice within outgoing_buffer to write next */
      87             :   size_t outgoing_slice_idx;
      88             :   /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
      89             :   size_t outgoing_byte_idx;
      90             : 
      91             :   grpc_closure *read_cb;
      92             :   grpc_closure *write_cb;
      93             : 
      94             :   grpc_closure read_closure;
      95             :   grpc_closure write_closure;
      96             : 
      97             :   char *peer_string;
      98             : } grpc_tcp;
      99             : 
     100             : static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     101             :                             int success);
     102             : static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     103             :                              int success);
     104             : 
     105        4097 : static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
     106        4097 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     107        4097 :   grpc_fd_shutdown(exec_ctx, tcp->em_fd);
     108        4097 : }
     109             : 
     110        7434 : static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
     111        7434 :   grpc_fd_orphan(exec_ctx, tcp->em_fd, NULL, "tcp_unref_orphan");
     112        7432 :   gpr_slice_buffer_destroy(&tcp->last_read_buffer);
     113        7432 :   gpr_free(tcp->peer_string);
     114        7432 :   gpr_free(tcp);
     115        7432 : }
     116             : 
     117             : /*#define GRPC_TCP_REFCOUNT_DEBUG*/
     118             : #ifdef GRPC_TCP_REFCOUNT_DEBUG
     119             : #define TCP_UNREF(cl, tcp, reason) \
     120             :   tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
     121             : #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
     122             : static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
     123             :                       const char *reason, const char *file, int line) {
     124             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
     125             :           reason, tcp->refcount.count, tcp->refcount.count - 1);
     126             :   if (gpr_unref(&tcp->refcount)) {
     127             :     tcp_free(exec_ctx, tcp);
     128             :   }
     129             : }
     130             : 
     131             : static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
     132             :                     int line) {
     133             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP   ref %p : %s %d -> %d", tcp,
     134             :           reason, tcp->refcount.count, tcp->refcount.count + 1);
     135             :   gpr_ref(&tcp->refcount);
     136             : }
     137             : #else
     138             : #define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
     139             : #define TCP_REF(tcp, reason) tcp_ref((tcp))
     140     3419458 : static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
     141     3419458 :   if (gpr_unref(&tcp->refcount)) {
     142        7434 :     tcp_free(exec_ctx, tcp);
     143             :   }
     144     3419456 : }
     145             : 
     146     3412013 : static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
     147             : #endif
     148             : 
     149        7434 : static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
     150        7434 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     151        7434 :   TCP_UNREF(exec_ctx, tcp, "destroy");
     152        7434 : }
     153             : 
     154     3410390 : static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
     155     3410390 :   grpc_closure *cb = tcp->read_cb;
     156             : 
     157     3410390 :   if (grpc_tcp_trace) {
     158             :     size_t i;
     159        1272 :     gpr_log(GPR_DEBUG, "read: success=%d", success);
     160        2260 :     for (i = 0; i < tcp->incoming_buffer->count; i++) {
     161         988 :       char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
     162             :                                   GPR_DUMP_HEX | GPR_DUMP_ASCII);
     163         988 :       gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
     164         988 :       gpr_free(dump);
     165             :     }
     166             :   }
     167             : 
     168     3410390 :   tcp->read_cb = NULL;
     169     3410390 :   tcp->incoming_buffer = NULL;
     170     3410390 :   cb->cb(exec_ctx, cb->cb_arg, success);
     171     3410468 : }
     172             : 
     173             : #define MAX_READ_IOVEC 4
     174     4622284 : static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
     175             :   struct msghdr msg;
     176             :   struct iovec iov[MAX_READ_IOVEC];
     177             :   ssize_t read_bytes;
     178             :   size_t i;
     179             : 
     180     4622284 :   GPR_ASSERT(!tcp->finished_edge);
     181     4622284 :   GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
     182     4622284 :   GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
     183             :   GRPC_TIMER_BEGIN(GRPC_PTAG_HANDLE_READ, 0);
     184             : 
     185    17188739 :   while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
     186     7943662 :     gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
     187             :                                  gpr_slice_malloc(tcp->slice_size));
     188             :   }
     189    17573604 :   for (i = 0; i < tcp->incoming_buffer->count; i++) {
     190    12950811 :     iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
     191    12950811 :     iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
     192             :   }
     193             : 
     194     4622793 :   msg.msg_name = NULL;
     195     4622793 :   msg.msg_namelen = 0;
     196     4622793 :   msg.msg_iov = iov;
     197     4622793 :   msg.msg_iovlen = tcp->iov_size;
     198     4622793 :   msg.msg_control = NULL;
     199     4622793 :   msg.msg_controllen = 0;
     200     4622793 :   msg.msg_flags = 0;
     201             : 
     202             :   GRPC_TIMER_BEGIN(GRPC_PTAG_RECVMSG, 0);
     203             :   do {
     204     4622774 :     read_bytes = recvmsg(tcp->fd, &msg, 0);
     205     4622969 :   } while (read_bytes < 0 && errno == EINTR);
     206             :   GRPC_TIMER_END(GRPC_PTAG_RECVMSG, 0);
     207             : 
     208     4623073 :   if (read_bytes < 0) {
     209             :     /* NB: After calling call_read_cb a parallel call of the read handler may
     210             :      * be running. */
     211     1216261 :     if (errno == EAGAIN) {
     212     1216202 :       if (tcp->iov_size > 1) {
     213       11316 :         tcp->iov_size /= 2;
     214             :       }
     215             :       /* We've consumed the edge, request a new one */
     216     1216202 :       grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
     217             :     } else {
     218             :       /* TODO(klempner): Log interesting errors */
     219          27 :       gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
     220          27 :       call_read_cb(exec_ctx, tcp, 0);
     221          27 :       TCP_UNREF(exec_ctx, tcp, "read");
     222             :     }
     223     3406812 :   } else if (read_bytes == 0) {
     224             :     /* 0 read size ==> end of stream */
     225        3359 :     gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
     226        3359 :     call_read_cb(exec_ctx, tcp, 0);
     227        3359 :     TCP_UNREF(exec_ctx, tcp, "read");
     228             :   } else {
     229     3403453 :     GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
     230     3403453 :     if ((size_t)read_bytes < tcp->incoming_buffer->length) {
     231     2831788 :       gpr_slice_buffer_trim_end(
     232             :           tcp->incoming_buffer,
     233     1415894 :           tcp->incoming_buffer->length - (size_t)read_bytes,
     234             :           &tcp->last_read_buffer);
     235     1987559 :     } else if (tcp->iov_size < MAX_READ_IOVEC) {
     236       14935 :       ++tcp->iov_size;
     237             :     }
     238     3403483 :     GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
     239     3403483 :     call_read_cb(exec_ctx, tcp, 1);
     240     3403558 :     TCP_UNREF(exec_ctx, tcp, "read");
     241             :   }
     242             : 
     243             :   GRPC_TIMER_END(GRPC_PTAG_HANDLE_READ, 0);
     244     4623203 : }
     245             : 
     246     4625020 : static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     247             :                             int success) {
     248     4625020 :   grpc_tcp *tcp = (grpc_tcp *)arg;
     249     4625020 :   GPR_ASSERT(!tcp->finished_edge);
     250             : 
     251     4625020 :   if (!success) {
     252        3522 :     gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
     253        3522 :     call_read_cb(exec_ctx, tcp, 0);
     254        3522 :     TCP_UNREF(exec_ctx, tcp, "read");
     255             :   } else {
     256     4621498 :     tcp_continue_read(exec_ctx, tcp);
     257             :   }
     258     4626325 : }
     259             : 
     260     3410462 : static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     261             :                      gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
     262     3410462 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     263     3410462 :   GPR_ASSERT(tcp->read_cb == NULL);
     264     3410462 :   tcp->read_cb = cb;
     265     3410462 :   tcp->incoming_buffer = incoming_buffer;
     266     3410462 :   gpr_slice_buffer_reset_and_unref(incoming_buffer);
     267     3410461 :   gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
     268     3410434 :   TCP_REF(tcp, "read");
     269     3410464 :   if (tcp->finished_edge) {
     270        7041 :     tcp->finished_edge = 0;
     271        7041 :     grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
     272             :   } else {
     273     3403423 :     grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1);
     274             :   }
     275     3410460 : }
     276             : 
     277             : typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
     278             : 
     279             : #define MAX_WRITE_IOVEC 16
     280     4084757 : static flush_result tcp_flush(grpc_tcp *tcp) {
     281             :   struct msghdr msg;
     282             :   struct iovec iov[MAX_WRITE_IOVEC];
     283             :   msg_iovlen_type iov_size;
     284             :   ssize_t sent_length;
     285             :   size_t sending_length;
     286             :   size_t trailing;
     287             :   size_t unwind_slice_idx;
     288             :   size_t unwind_byte_idx;
     289             : 
     290             :   for (;;) {
     291     4084757 :     sending_length = 0;
     292     4084757 :     unwind_slice_idx = tcp->outgoing_slice_idx;
     293     4084757 :     unwind_byte_idx = tcp->outgoing_byte_idx;
     294    21747830 :     for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
     295             :                            iov_size != MAX_WRITE_IOVEC;
     296    13578316 :          iov_size++) {
     297    13578316 :       iov[iov_size].iov_base =
     298    27156632 :           GPR_SLICE_START_PTR(
     299    27156632 :               tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
     300             :           tcp->outgoing_byte_idx;
     301    13578316 :       iov[iov_size].iov_len =
     302    13578316 :           GPR_SLICE_LENGTH(
     303    13578316 :               tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
     304    13578316 :           tcp->outgoing_byte_idx;
     305    13578316 :       sending_length += iov[iov_size].iov_len;
     306    13578316 :       tcp->outgoing_slice_idx++;
     307    13578316 :       tcp->outgoing_byte_idx = 0;
     308             :     }
     309     4084757 :     GPR_ASSERT(iov_size > 0);
     310             : 
     311     4084757 :     msg.msg_name = NULL;
     312     4084757 :     msg.msg_namelen = 0;
     313     4084757 :     msg.msg_iov = iov;
     314     4084757 :     msg.msg_iovlen = iov_size;
     315     4084757 :     msg.msg_control = NULL;
     316     4084757 :     msg.msg_controllen = 0;
     317     4084757 :     msg.msg_flags = 0;
     318             : 
     319             :     GRPC_TIMER_BEGIN(GRPC_PTAG_SENDMSG, 0);
     320             :     do {
     321             :       /* TODO(klempner): Cork if this is a partial write */
     322     4085091 :       sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
     323     4084069 :     } while (sent_length < 0 && errno == EINTR);
     324             :     GRPC_TIMER_END(GRPC_PTAG_SENDMSG, 0);
     325             : 
     326     4083735 :     if (sent_length < 0) {
     327        1662 :       if (errno == EAGAIN) {
     328        2806 :         tcp->outgoing_slice_idx = unwind_slice_idx;
     329        2806 :         tcp->outgoing_byte_idx = unwind_byte_idx;
     330        2806 :         return FLUSH_PENDING;
     331             :       } else {
     332             :         /* TODO(klempner): Log some of these */
     333          25 :         return FLUSH_ERROR;
     334             :       }
     335             :     }
     336             : 
     337     4082073 :     GPR_ASSERT(tcp->outgoing_byte_idx == 0);
     338     4082073 :     trailing = sending_length - (size_t)sent_length;
     339     8164928 :     while (trailing > 0) {
     340             :       size_t slice_length;
     341             : 
     342         958 :       tcp->outgoing_slice_idx--;
     343         958 :       slice_length = GPR_SLICE_LENGTH(
     344             :           tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
     345         958 :       if (slice_length > trailing) {
     346         176 :         tcp->outgoing_byte_idx = slice_length - trailing;
     347         176 :         break;
     348             :       } else {
     349         782 :         trailing -= slice_length;
     350             :       }
     351             :     }
     352             : 
     353     4082073 :     if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
     354     3841473 :       return FLUSH_DONE;
     355             :     }
     356      240600 :   };
     357             : }
     358             : 
     359        1637 : static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     360             :                              int success) {
     361        1637 :   grpc_tcp *tcp = (grpc_tcp *)arg;
     362             :   flush_result status;
     363             :   grpc_closure *cb;
     364             : 
     365        1637 :   if (!success) {
     366           3 :     cb = tcp->write_cb;
     367           3 :     tcp->write_cb = NULL;
     368           3 :     cb->cb(exec_ctx, cb->cb_arg, 0);
     369           3 :     TCP_UNREF(exec_ctx, tcp, "write");
     370        1640 :     return;
     371             :   }
     372             : 
     373             :   GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_CB_WRITE, 0);
     374        1634 :   status = tcp_flush(tcp);
     375        1634 :   if (status == FLUSH_PENDING) {
     376          81 :     grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
     377             :   } else {
     378        1553 :     cb = tcp->write_cb;
     379        1553 :     tcp->write_cb = NULL;
     380        1553 :     cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
     381        1553 :     TCP_UNREF(exec_ctx, tcp, "write");
     382             :   }
     383             :   GRPC_TIMER_END(GRPC_PTAG_TCP_CB_WRITE, 0);
     384             : }
     385             : 
     386     3841775 : static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     387             :                       gpr_slice_buffer *buf, grpc_closure *cb) {
     388     3841775 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     389             :   flush_result status;
     390             : 
     391     3841775 :   if (grpc_tcp_trace) {
     392             :     size_t i;
     393             : 
     394        7447 :     for (i = 0; i < buf->count; i++) {
     395        5870 :       char *data =
     396        5870 :           gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
     397        5870 :       gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
     398        5870 :       gpr_free(data);
     399             :     }
     400             :   }
     401             : 
     402             :   GRPC_TIMER_BEGIN(GRPC_PTAG_TCP_WRITE, 0);
     403     3841775 :   GPR_ASSERT(tcp->write_cb == NULL);
     404             : 
     405     3841775 :   if (buf->length == 0) {
     406             :     GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
     407         451 :     grpc_exec_ctx_enqueue(exec_ctx, cb, 1);
     408     3843684 :     return;
     409             :   }
     410     3841324 :   tcp->outgoing_buffer = buf;
     411     3841324 :   tcp->outgoing_slice_idx = 0;
     412     3841324 :   tcp->outgoing_byte_idx = 0;
     413             : 
     414     3841324 :   status = tcp_flush(tcp);
     415     3839057 :   if (status == FLUSH_PENDING) {
     416        1556 :     TCP_REF(tcp, "write");
     417        1556 :     tcp->write_cb = cb;
     418        1556 :     grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
     419             :   } else {
     420     3837501 :     grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE);
     421             :   }
     422             : 
     423             :   GRPC_TIMER_END(GRPC_PTAG_TCP_WRITE, 0);
     424             : }
     425             : 
     426     2705177 : static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     427             :                                grpc_pollset *pollset) {
     428     2705177 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     429     2705177 :   grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
     430     2706490 : }
     431             : 
     432        1547 : static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     433             :                                    grpc_pollset_set *pollset_set) {
     434        1547 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     435        1547 :   grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
     436        1548 : }
     437             : 
     438        4014 : static char *tcp_get_peer(grpc_endpoint *ep) {
     439        4014 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     440        4014 :   return gpr_strdup(tcp->peer_string);
     441             : }
     442             : 
     443             : static const grpc_endpoint_vtable vtable = {
     444             :     tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
     445             :     tcp_shutdown, tcp_destroy, tcp_get_peer};
     446             : 
     447        7432 : grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
     448             :                                const char *peer_string) {
     449        7432 :   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
     450        7433 :   tcp->base.vtable = &vtable;
     451        7433 :   tcp->peer_string = gpr_strdup(peer_string);
     452        7434 :   tcp->fd = em_fd->fd;
     453        7434 :   tcp->read_cb = NULL;
     454        7434 :   tcp->write_cb = NULL;
     455        7434 :   tcp->incoming_buffer = NULL;
     456        7434 :   tcp->slice_size = slice_size;
     457        7434 :   tcp->iov_size = 1;
     458        7434 :   tcp->finished_edge = 1;
     459             :   /* paired with unref in tcp_destroy */
     460        7434 :   gpr_ref_init(&tcp->refcount, 1);
     461        7434 :   tcp->em_fd = em_fd;
     462        7434 :   tcp->read_closure.cb = tcp_handle_read;
     463        7434 :   tcp->read_closure.cb_arg = tcp;
     464        7434 :   tcp->write_closure.cb = tcp_handle_write;
     465        7434 :   tcp->write_closure.cb_arg = tcp;
     466        7434 :   gpr_slice_buffer_init(&tcp->last_read_buffer);
     467             : 
     468        7434 :   return &tcp->base;
     469             : }
     470             : 
     471             : #endif

Generated by: LCOV version 1.10