LCOV - code coverage report
Current view: top level - core/iomgr - tcp_posix.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 212 212 100.0 %
Date: 2015-12-10 22:15:08 Functions: 17 17 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include <grpc/support/port_platform.h>
      35             : 
      36             : #ifdef GPR_POSIX_SOCKET
      37             : 
      38             : #include "src/core/iomgr/tcp_posix.h"
      39             : 
      40             : #include <errno.h>
      41             : #include <stdlib.h>
      42             : #include <string.h>
      43             : #include <sys/types.h>
      44             : #include <sys/socket.h>
      45             : #include <unistd.h>
      46             : 
      47             : #include <grpc/support/alloc.h>
      48             : #include <grpc/support/log.h>
      49             : #include <grpc/support/slice.h>
      50             : #include <grpc/support/string_util.h>
      51             : #include <grpc/support/sync.h>
      52             : #include <grpc/support/time.h>
      53             : 
      54             : #include "src/core/support/string.h"
      55             : #include "src/core/debug/trace.h"
      56             : #include "src/core/profiling/timers.h"
      57             : 
                       /* Flags passed to every sendmsg() call in tcp_flush: MSG_NOSIGNAL when
                        * GPR_HAVE_MSG_NOSIGNAL is defined, otherwise no flags. */
      58             : #ifdef GPR_HAVE_MSG_NOSIGNAL
      59             : #define SENDMSG_FLAGS MSG_NOSIGNAL
      60             : #else
      61             : #define SENDMSG_FLAGS 0
      62             : #endif
      63             : 
                       /* Integer type used for iovec counts (it is what gets assigned to
                        * msghdr.msg_iovlen). GPR_MSG_IOVLEN_TYPE overrides the size_t default. */
      64             : #ifdef GPR_MSG_IOVLEN_TYPE
      65             : typedef GPR_MSG_IOVLEN_TYPE msg_iovlen_type;
      66             : #else
      67             : typedef size_t msg_iovlen_type;
      68             : #endif
      69             : 
                       /* When non-zero, call_read_cb and tcp_write log a hex + ASCII dump of
                        * every slice they handle. */
      70             : int grpc_tcp_trace = 0;
      71             : 
                       /* State of one TCP endpoint. Endpoint pointers handed to the vtable
                        * functions are cast straight to grpc_tcp*, so `base` must stay the first
                        * member. */
      72             : typedef struct {
      73             :   grpc_endpoint base;
                         /* fd wrapper used for readiness notification, shutdown and orphaning */
      74             :   grpc_fd *em_fd;
                         /* copy of em_fd->fd, used directly for recvmsg()/sendmsg() */
      75             :   int fd;
                         /* Initialised to 1 by grpc_tcp_create. While set, tcp_read clears it and
                          * waits for a readability notification instead of scheduling the read
                          * closure immediately. */
      76             :   int finished_edge;
      77             :   msg_iovlen_type iov_size; /* Number of slices to allocate per read attempt */
                         /* byte size of each slice tcp_continue_read allocates */
      78             :   size_t slice_size;
                         /* starts at 1 (creation ref); each in-flight read and each pending
                          * write holds one more */
      79             :   gpr_refcount refcount;
      80             : 
      81             :   /* garbage after the last read */
                         /* i.e. the unfilled tail trimmed off the previous read; tcp_read swaps
                          * it into the next incoming buffer so the slices are reused */
      82             :   gpr_slice_buffer last_read_buffer;
      83             : 
                         /* caller-supplied buffer of the read in flight; NULL when none */
      84             :   gpr_slice_buffer *incoming_buffer;
                         /* caller-supplied buffer being written; set by tcp_write */
      85             :   gpr_slice_buffer *outgoing_buffer;
      86             :   /** slice within outgoing_buffer to write next */
      87             :   size_t outgoing_slice_idx;
      88             :   /** byte within outgoing_buffer->slices[outgoing_slice_idx] to write next */
      89             :   size_t outgoing_byte_idx;
      90             : 
                         /* closure to run when the pending read finishes; NULL when idle */
      91             :   grpc_closure *read_cb;
                         /* closure to run when a write that could not complete synchronously
                          * finishes; NULL otherwise */
      92             :   grpc_closure *write_cb;
                         /* Both NULL unless grpc_tcp_destroy_and_release_fd set them; tcp_free
                          * forwards them to grpc_fd_orphan. */
      93             :   grpc_closure *release_fd_cb;
      94             :   int *release_fd;
      95             : 
                         /* closures bound to tcp_handle_read / tcp_handle_write with this
                          * endpoint as argument */
      96             :   grpc_closure read_closure;
      97             :   grpc_closure write_closure;
      98             : 
                         /* heap copy of the peer description; freed in tcp_free */
      99             :   char *peer_string;
     100             : } grpc_tcp;
     101             : 
                       /* Readiness handlers, defined further down; grpc_tcp_create installs them
                        * as the callbacks of read_closure and write_closure. */
     102             : static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     103             :                             int success);
     104             : static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     105             :                              int success);
     106             : 
                       /* Endpoint vtable entry: shuts down the endpoint by calling
                        * grpc_fd_shutdown on its grpc_fd. Nothing is freed here. */
     107        6189 : static void tcp_shutdown(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
     108        6108 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     109        6189 :   grpc_fd_shutdown(exec_ctx, tcp->em_fd);
     110        6189 : }
     111             : 
                       /* Tears the endpoint down; called by tcp_unref when the last reference is
                        * dropped. Orphans the grpc_fd (passing on release_fd_cb / release_fd,
                        * which are NULL unless grpc_tcp_destroy_and_release_fd set them), then
                        * releases the leftover read buffer, the peer string and the struct. */
     112       10963 : static void tcp_free(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
     113       10963 :   grpc_fd_orphan(exec_ctx, tcp->em_fd, tcp->release_fd_cb, tcp->release_fd,
     114             :                  "tcp_unref_orphan");
     115       10963 :   gpr_slice_buffer_destroy(&tcp->last_read_buffer);
     116       10963 :   gpr_free(tcp->peer_string);
     117       10963 :   gpr_free(tcp);
     118       10963 : }
     119             : 
                       /* Reference counting helpers. All callers go through TCP_REF / TCP_UNREF
                        * and pass a reason string. Un-commenting the define below selects
                        * variants that log the reason, call site and old/new count; the default
                        * variants discard the reason. In both variants tcp_unref calls tcp_free
                        * when gpr_unref reports that the last reference was dropped. */
     120             : /*#define GRPC_TCP_REFCOUNT_DEBUG*/
     121             : #ifdef GRPC_TCP_REFCOUNT_DEBUG
     122             : #define TCP_UNREF(cl, tcp, reason) \
     123             :   tcp_unref((cl), (tcp), (reason), __FILE__, __LINE__)
     124             : #define TCP_REF(tcp, reason) tcp_ref((tcp), (reason), __FILE__, __LINE__)
     125             : static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp,
     126             :                       const char *reason, const char *file, int line) {
     127             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP unref %p : %s %d -> %d", tcp,
     128             :           reason, tcp->refcount.count, tcp->refcount.count - 1);
     129             :   if (gpr_unref(&tcp->refcount)) {
     130             :     tcp_free(exec_ctx, tcp);
     131             :   }
     132             : }
     133             : 
     134             : static void tcp_ref(grpc_tcp *tcp, const char *reason, const char *file,
     135             :                     int line) {
     136             :   gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "TCP   ref %p : %s %d -> %d", tcp,
     137             :           reason, tcp->refcount.count, tcp->refcount.count + 1);
     138             :   gpr_ref(&tcp->refcount);
     139             : }
     140             : #else
     141             : #define TCP_UNREF(cl, tcp, reason) tcp_unref((cl), (tcp))
     142             : #define TCP_REF(tcp, reason) tcp_ref((tcp))
     143     7474524 : static void tcp_unref(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
     144     7474524 :   if (gpr_unref(&tcp->refcount)) {
     145       10963 :     tcp_free(exec_ctx, tcp);
     146             :   }
     147     7474524 : }
     148             : 
     149     7463558 : static void tcp_ref(grpc_tcp *tcp) { gpr_ref(&tcp->refcount); }
     150             : #endif
     151             : 
                       /* Endpoint vtable entry: drops the reference taken in grpc_tcp_create.
                        * The endpoint is only freed once in-flight reads and writes have dropped
                        * theirs too. */
     152       10961 : static void tcp_destroy(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep) {
     153       10880 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     154       10961 :   TCP_UNREF(exec_ctx, tcp, "destroy");
     155       10962 : }
     156             : 
                       /* Completes the pending read by invoking tcp->read_cb with `success`.
                        * With grpc_tcp_trace set, first logs `success` and a dump of each slice
                        * in the incoming buffer. Callers still have to drop the "read" ref
                        * afterwards. */
     157     7461992 : static void call_read_cb(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp, int success) {
     158     7461992 :   grpc_closure *cb = tcp->read_cb;
     159             : 
     160     7461992 :   if (grpc_tcp_trace) {
     161             :     size_t i;
     162        1294 :     gpr_log(GPR_DEBUG, "read: success=%d", success);
     163        2272 :     for (i = 0; i < tcp->incoming_buffer->count; i++) {
     164         978 :       char *dump = gpr_dump_slice(tcp->incoming_buffer->slices[i],
     165             :                                   GPR_DUMP_HEX | GPR_DUMP_ASCII);
     166         978 :       gpr_log(GPR_DEBUG, "READ %p: %s", tcp, dump);
     167         978 :       gpr_free(dump);
     168             :     }
     169             :   }
     170             : 
                         /* Clear the pending-read state before running the callback: tcp_read
                          * asserts read_cb == NULL, so this is what allows a new read to be
                          * issued once the callback runs. */
     171     7461992 :   tcp->read_cb = NULL;
     172     7461992 :   tcp->incoming_buffer = NULL;
     173     7461992 :   cb->cb(exec_ctx, cb->cb_arg, success);
     174     7461978 : }
     175             : 
                       /* Upper bound on tcp->iov_size and on the number of slices in the
                        * incoming buffer; it sizes the on-stack iovec array below. */
     176             : #define MAX_READ_IOVEC 4
                       /* Performs one recvmsg() into tcp->incoming_buffer and dispatches on the
                        * result:
                        *   - EAGAIN: halve iov_size (not below 1) and wait for the next
                        *     readability notification; the read stays pending.
                        *   - other error, or 0 bytes (end of stream): empty the buffer and
                        *     complete the read with success == 0.
                        *   - data: trim the unfilled tail into last_read_buffer, or grow iov_size
                        *     if the buffer was filled completely, then complete the read with
                        *     success == 1.
                        * Every completing path drops the "read" ref taken in tcp_read. */
     177     9804923 : static void tcp_continue_read(grpc_exec_ctx *exec_ctx, grpc_tcp *tcp) {
     178             :   struct msghdr msg;
     179             :   struct iovec iov[MAX_READ_IOVEC];
     180             :   ssize_t read_bytes;
     181             :   size_t i;
     182             : 
     183     9804923 :   GPR_ASSERT(!tcp->finished_edge);
     184     9804923 :   GPR_ASSERT(tcp->iov_size <= MAX_READ_IOVEC);
     185     9804923 :   GPR_ASSERT(tcp->incoming_buffer->count <= MAX_READ_IOVEC);
     186             :   GPR_TIMER_BEGIN("tcp_continue_read", 0);
     187             : 
                         /* Top the buffer up to iov_size slices of slice_size bytes each; it may
                          * already hold slices swapped in from last_read_buffer by tcp_read. */
     188    39552444 :   while (tcp->incoming_buffer->count < (size_t)tcp->iov_size) {
     189    19943824 :     gpr_slice_buffer_add_indexed(tcp->incoming_buffer,
     190             :                                  gpr_slice_malloc(tcp->slice_size));
     191             :   }
     192    38874101 :   for (i = 0; i < tcp->incoming_buffer->count; i++) {
     193    29070405 :     iov[i].iov_base = GPR_SLICE_START_PTR(tcp->incoming_buffer->slices[i]);
     194    29070405 :     iov[i].iov_len = GPR_SLICE_LENGTH(tcp->incoming_buffer->slices[i]);
     195             :   }
     196             : 
     197     9804939 :   msg.msg_name = NULL;
     198     9804939 :   msg.msg_namelen = 0;
     199     9804939 :   msg.msg_iov = iov;
                         /* Only the first iov_size entries are offered to recvmsg, even when the
                          * buffer holds more slices than that. */
     200     9804939 :   msg.msg_iovlen = tcp->iov_size;
     201     9804939 :   msg.msg_control = NULL;
     202     9804939 :   msg.msg_controllen = 0;
     203     9804939 :   msg.msg_flags = 0;
     204             : 
     205             :   GPR_TIMER_BEGIN("recvmsg", 1);
                         /* retry when interrupted by a signal */
     206             :   do {
     207     9804898 :     read_bytes = recvmsg(tcp->fd, &msg, 0);
     208     9804936 :   } while (read_bytes < 0 && errno == EINTR);
                         /* NOTE(review): errno is inspected after this macro; that relies on
                          * GPR_TIMER_END not modifying errno - confirm against timers.h. */
     209             :   GPR_TIMER_END("recvmsg", 0);
     210             : 
     211     9804982 :   if (read_bytes < 0) {
     212             :     /* NB: After calling call_read_cb a parallel call of the read handler may
     213             :      * be running. */
     214     2348377 :     if (errno == EAGAIN) {
     215     2348314 :       if (tcp->iov_size > 1) {
     216      122029 :         tcp->iov_size /= 2;
     217             :       }
     218             :       /* We've consumed the edge, request a new one */
     219     2348314 :       grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
     220             :     } else {
     221             :       /* TODO(klempner): Log interesting errors */
     222          66 :       gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
     223          66 :       call_read_cb(exec_ctx, tcp, 0);
     224          66 :       TCP_UNREF(exec_ctx, tcp, "read");
     225             :     }
     226     7456605 :   } else if (read_bytes == 0) {
     227             :     /* 0 read size ==> end of stream */
     228        4943 :     gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
     229        4943 :     call_read_cb(exec_ctx, tcp, 0);
     230        4943 :     TCP_UNREF(exec_ctx, tcp, "read");
     231             :   } else {
     232     7451662 :     GPR_ASSERT((size_t)read_bytes <= tcp->incoming_buffer->length);
     233     7451662 :     if ((size_t)read_bytes < tcp->incoming_buffer->length) {
                           /* Short read: move the unfilled tail into last_read_buffer so that
                            * the caller sees exactly read_bytes bytes. */
     234     4912539 :       gpr_slice_buffer_trim_end(
     235             :           tcp->incoming_buffer,
     236     2455967 :           tcp->incoming_buffer->length - (size_t)read_bytes,
     237             :           &tcp->last_read_buffer);
     238     4995090 :     } else if (tcp->iov_size < MAX_READ_IOVEC) {
                           /* Buffer filled completely: offer one more slice next time. */
     239      230062 :       ++tcp->iov_size;
     240             :     }
     241     7451652 :     GPR_ASSERT((size_t)read_bytes == tcp->incoming_buffer->length);
     242     7451652 :     call_read_cb(exec_ctx, tcp, 1);
     243     7451676 :     TCP_UNREF(exec_ctx, tcp, "read");
     244             :   }
     245             : 
     246             :   GPR_TIMER_END("tcp_continue_read", 0);
     247     9805002 : }
     248             : 
                       /* Callback of tcp->read_closure; `arg` is the grpc_tcp. When `success` is
                        * zero the pending read is failed: the buffer is emptied, the read
                        * callback runs with 0 and the "read" ref is dropped. Otherwise the read
                        * is attempted via tcp_continue_read. */
     249     9810258 : static void tcp_handle_read(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     250             :                             int success) {
     251     9808974 :   grpc_tcp *tcp = (grpc_tcp *)arg;
     252     9810258 :   GPR_ASSERT(!tcp->finished_edge);
     253             : 
     254     9810258 :   if (!success) {
     255        5320 :     gpr_slice_buffer_reset_and_unref(tcp->incoming_buffer);
     256        5320 :     call_read_cb(exec_ctx, tcp, 0);
     257        5320 :     TCP_UNREF(exec_ctx, tcp, "read");
     258             :   } else {
     259     9804938 :     tcp_continue_read(exec_ctx, tcp);
     260             :   }
     261     9810246 : }
     262             : 
                       /* Endpoint vtable entry: starts an asynchronous read into
                        * `incoming_buffer`; `cb` is run when it completes. Only one read may be
                        * outstanding at a time (asserted). The buffer's previous contents are
                        * discarded and replaced with the slices left over from the last read.
                        * Takes a "read" ref that is dropped after `cb` has been invoked. */
     263     7462003 : static void tcp_read(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     264             :                      gpr_slice_buffer *incoming_buffer, grpc_closure *cb) {
     265     7461247 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     266     7462003 :   GPR_ASSERT(tcp->read_cb == NULL);
     267     7462003 :   tcp->read_cb = cb;
     268     7462003 :   tcp->incoming_buffer = incoming_buffer;
     269     7462003 :   gpr_slice_buffer_reset_and_unref(incoming_buffer);
     270     7462004 :   gpr_slice_buffer_swap(incoming_buffer, &tcp->last_read_buffer);
     271     7461246 :   TCP_REF(tcp, "read");
     272     7462004 :   if (tcp->finished_edge) {
                           /* Wait for the fd to be reported readable before trying to read. */
     273       10502 :     tcp->finished_edge = 0;
     274       10502 :     grpc_fd_notify_on_read(exec_ctx, tcp->em_fd, &tcp->read_closure);
     275             :   } else {
                           /* Schedule the read handler on the exec_ctx straight away, without
                            * waiting for a notification. */
     276     7451502 :     grpc_exec_ctx_enqueue(exec_ctx, &tcp->read_closure, 1);
     277             :   }
     278     7462003 : }
     279             : 
                       /* Result of tcp_flush: everything was sent, the socket would block, or
                        * sendmsg() failed with some other error. */
     280             : typedef enum { FLUSH_DONE, FLUSH_PENDING, FLUSH_ERROR } flush_result;
     281             : 
                       /* Maximum number of slices handed to a single sendmsg() call. */
     282             : #define MAX_WRITE_IOVEC 16
                       /* Writes as much of tcp->outgoing_buffer as the socket accepts, starting
                        * at the cursor (outgoing_slice_idx, outgoing_byte_idx).
                        * Returns FLUSH_DONE once the cursor reaches the end of the buffer,
                        * FLUSH_PENDING on EAGAIN (the cursor is left at the first unsent byte),
                        * and FLUSH_ERROR on any other sendmsg() failure (the cursor is not
                        * restored in that case). */
     283     5583628 : static flush_result tcp_flush(grpc_tcp *tcp) {
     284             :   struct msghdr msg;
     285             :   struct iovec iov[MAX_WRITE_IOVEC];
     286             :   msg_iovlen_type iov_size;
     287             :   ssize_t sent_length;
     288             :   size_t sending_length;
     289             :   size_t trailing;
     290             :   size_t unwind_slice_idx;
     291             :   size_t unwind_byte_idx;
     292             : 
     293             :   for (;;) {
     294     5582971 :     sending_length = 0;
                           /* Remember the cursor so it can be restored if nothing is sent. */
     295     5583684 :     unwind_slice_idx = tcp->outgoing_slice_idx;
     296     5583684 :     unwind_byte_idx = tcp->outgoing_byte_idx;
                           /* Gather up to MAX_WRITE_IOVEC slices, advancing the cursor as if all
                            * of them were sent. Only the first slice can start at a non-zero
                            * byte offset. */
     297    34364178 :     for (iov_size = 0; tcp->outgoing_slice_idx != tcp->outgoing_buffer->count &&
     298             :                            iov_size != MAX_WRITE_IOVEC;
     299    23196810 :          iov_size++) {
     300    23196810 :       iov[iov_size].iov_base =
     301    46392547 :           GPR_SLICE_START_PTR(
     302    46392547 :               tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) +
     303             :           tcp->outgoing_byte_idx;
     304    23196810 :       iov[iov_size].iov_len =
     305    23196810 :           GPR_SLICE_LENGTH(
     306    23196810 :               tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]) -
     307    23194665 :           tcp->outgoing_byte_idx;
     308    23196810 :       sending_length += iov[iov_size].iov_len;
     309    23196810 :       tcp->outgoing_slice_idx++;
     310    23196810 :       tcp->outgoing_byte_idx = 0;
     311             :     }
     312     5583684 :     GPR_ASSERT(iov_size > 0);
     313             : 
     314     5583684 :     msg.msg_name = NULL;
     315     5583684 :     msg.msg_namelen = 0;
     316     5583684 :     msg.msg_iov = iov;
     317     5583684 :     msg.msg_iovlen = iov_size;
     318     5583684 :     msg.msg_control = NULL;
     319     5583684 :     msg.msg_controllen = 0;
     320     5583684 :     msg.msg_flags = 0;
     321             : 
     322             :     GPR_TIMER_BEGIN("sendmsg", 1);
                           /* retry when interrupted by a signal */
     323             :     do {
     324             :       /* TODO(klempner): Cork if this is a partial write */
     325     5583698 :       sent_length = sendmsg(tcp->fd, &msg, SENDMSG_FLAGS);
     326     5582380 :     } while (sent_length < 0 && errno == EINTR);
     327             :     GPR_TIMER_END("sendmsg", 0);
     328             : 
     329     5582366 :     if (sent_length < 0) {
     330        1718 :       if (errno == EAGAIN) {
                               /* Nothing was sent: put the cursor back where it was. */
     331        1721 :         tcp->outgoing_slice_idx = unwind_slice_idx;
     332        1721 :         tcp->outgoing_byte_idx = unwind_byte_idx;
     333        1721 :         return FLUSH_PENDING;
     334             :       } else {
     335             :         /* TODO(klempner): Log some of these */
     336          81 :         return FLUSH_ERROR;
     337             :       }
     338             :     }
     339             : 
     340     5580648 :     GPR_ASSERT(tcp->outgoing_byte_idx == 0);
                           /* Partial write: `trailing` bytes of what was gathered were not sent.
                            * Step the cursor back over whole unsent slices; if the boundary falls
                            * inside a slice, record the byte offset to resume from. */
     341     5580648 :     trailing = sending_length - (size_t)sent_length;
     342    11162078 :     while (trailing > 0) {
     343             :       size_t slice_length;
     344             : 
     345         958 :       tcp->outgoing_slice_idx--;
     346         958 :       slice_length = GPR_SLICE_LENGTH(
     347             :           tcp->outgoing_buffer->slices[tcp->outgoing_slice_idx]);
     348         958 :       if (slice_length > trailing) {
     349         176 :         tcp->outgoing_byte_idx = slice_length - trailing;
     350         176 :         break;
     351             :       } else {
     352         782 :         trailing -= slice_length;
     353             :       }
     354             :     }
     355             : 
     356     5580648 :     if (tcp->outgoing_slice_idx == tcp->outgoing_buffer->count) {
     357     5024040 :       return FLUSH_DONE;
     358             :     }
                           /* More data remains: go round again with the next batch of slices. */
     359      555895 :   };
     360             : }
     361             : 
                       /* Callback of tcp->write_closure; `arg` is the grpc_tcp. tcp_write
                        * registers it when a flush returned FLUSH_PENDING. When `success` is
                        * zero the pending write is failed. Otherwise the flush is resumed: if
                        * the socket would still block, the closure is re-armed; else write_cb is
                        * invoked with 1 for FLUSH_DONE or 0 for FLUSH_ERROR. Each path that
                        * invokes write_cb also drops the "write" ref taken in tcp_write. */
     362        1637 : static void tcp_handle_write(grpc_exec_ctx *exec_ctx, void *arg /* grpc_tcp */,
     363             :                              int success) {
     364        1637 :   grpc_tcp *tcp = (grpc_tcp *)arg;
     365             :   flush_result status;
     366             :   grpc_closure *cb;
     367             : 
     368        1637 :   if (!success) {
                           /* write_cb is cleared before the callback runs; tcp_write asserts
                            * write_cb == NULL on entry. */
     369           3 :     cb = tcp->write_cb;
     370           3 :     tcp->write_cb = NULL;
     371           3 :     cb->cb(exec_ctx, cb->cb_arg, 0);
     372           3 :     TCP_UNREF(exec_ctx, tcp, "write");
     373        1640 :     return;
     374             :   }
     375             : 
     376        1634 :   status = tcp_flush(tcp);
     377        1634 :   if (status == FLUSH_PENDING) {
     378          81 :     grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
     379             :   } else {
     380        1553 :     cb = tcp->write_cb;
     381        1553 :     tcp->write_cb = NULL;
     382             :     GPR_TIMER_BEGIN("tcp_handle_write.cb", 0);
     383        1553 :     cb->cb(exec_ctx, cb->cb_arg, status == FLUSH_DONE);
     384             :     GPR_TIMER_END("tcp_handle_write.cb", 0);
     385        1553 :     TCP_UNREF(exec_ctx, tcp, "write");
     386             :   }
     387             : }
     388             : 
                       /* Endpoint vtable entry: writes `buf` to the socket and arranges for `cb`
                        * to run with the outcome. No write may already be pending (asserted).
                        *   - Empty buffer: `cb` is enqueued with success == 1 without touching
                        *     the socket.
                        *   - Flush finishes or fails synchronously: `cb` is enqueued on the
                        *     exec_ctx with 1 for FLUSH_DONE or 0 for FLUSH_ERROR.
                        *   - Socket would block: a "write" ref is taken, `cb` is stored in
                        *     write_cb and tcp_handle_write finishes the job once the fd is
                        *     reported writable.
                        * With grpc_tcp_trace set, every slice of `buf` is dumped to the log
                        * first. */
     389     5022810 : static void tcp_write(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     390             :                       gpr_slice_buffer *buf, grpc_closure *cb) {
     391     5022136 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     392             :   flush_result status;
     393             : 
     394     5022810 :   if (grpc_tcp_trace) {
     395             :     size_t i;
     396             : 
     397       10480 :     for (i = 0; i < buf->count; i++) {
     398        8897 :       char *data =
     399        8897 :           gpr_dump_slice(buf->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII);
     400        8897 :       gpr_log(GPR_DEBUG, "WRITE %p: %s", tcp, data);
     401        8897 :       gpr_free(data);
     402             :     }
     403             :   }
     404             : 
     405             :   GPR_TIMER_BEGIN("tcp_write", 0);
     406     5022810 :   GPR_ASSERT(tcp->write_cb == NULL);
     407             : 
     408     5022810 :   if (buf->length == 0) {
     409             :     GPR_TIMER_END("tcp_write", 0);
     410         595 :     grpc_exec_ctx_enqueue(exec_ctx, cb, 1);
     411     5025909 :     return;
     412             :   }
                         /* Point the write cursor at the start of the caller's buffer. */
     413     5022215 :   tcp->outgoing_buffer = buf;
     414     5022215 :   tcp->outgoing_slice_idx = 0;
     415     5022215 :   tcp->outgoing_byte_idx = 0;
     416             : 
     417     5022215 :   status = tcp_flush(tcp);
     418     5024869 :   if (status == FLUSH_PENDING) {
     419        1556 :     TCP_REF(tcp, "write");
     420        1556 :     tcp->write_cb = cb;
     421        1556 :     grpc_fd_notify_on_write(exec_ctx, tcp->em_fd, &tcp->write_closure);
     422             :   } else {
     423     5023313 :     grpc_exec_ctx_enqueue(exec_ctx, cb, status == FLUSH_DONE);
     424             :   }
     425             : 
     426             :   GPR_TIMER_END("tcp_write", 0);
     427             : }
     428             : 
                       /* Endpoint vtable entry: adds the endpoint's grpc_fd to `pollset`. */
     429     4444556 : static void tcp_add_to_pollset(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     430             :                                grpc_pollset *pollset) {
     431     4444297 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     432     4444556 :   grpc_pollset_add_fd(exec_ctx, pollset, tcp->em_fd);
     433     4446806 : }
     434             : 
                       /* Endpoint vtable entry: adds the endpoint's grpc_fd to `pollset_set`. */
     435        2347 : static void tcp_add_to_pollset_set(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     436             :                                    grpc_pollset_set *pollset_set) {
     437        2306 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     438        2347 :   grpc_pollset_set_add_fd(exec_ctx, pollset_set, tcp->em_fd);
     439        2347 : }
     440             : 
                       /* Endpoint vtable entry: returns a gpr_strdup'd copy of the peer string
                        * given to grpc_tcp_create. The copy is separate from the endpoint's own;
                        * presumably the caller releases it with gpr_free - confirm against the
                        * grpc_endpoint contract. */
     441        6038 : static char *tcp_get_peer(grpc_endpoint *ep) {
     442        5956 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     443        6038 :   return gpr_strdup(tcp->peer_string);
     444             : }
     445             : 
                       /* Endpoint vtable shared by all TCP endpoints. Its address also serves as
                        * a type tag: grpc_tcp_destroy_and_release_fd asserts ep->vtable ==
                        * &vtable. The initialiser is positional, so the entries must follow the
                        * member order of grpc_endpoint_vtable (declared outside this file). */
     446             : static const grpc_endpoint_vtable vtable = {
     447             :     tcp_read, tcp_write, tcp_add_to_pollset, tcp_add_to_pollset_set,
     448             :     tcp_shutdown, tcp_destroy, tcp_get_peer};
     449             : 
                       /* Creates a TCP endpoint on top of `em_fd`.
                        *   em_fd       - fd wrapper; the endpoint keeps the pointer and orphans
                        *                 it in tcp_free.
                        *   slice_size  - size in bytes of each slice allocated for reads.
                        *   peer_string - description of the peer; copied with gpr_strdup.
                        * Returns the embedded grpc_endpoint, holding one reference.
                        * outgoing_buffer, outgoing_slice_idx and outgoing_byte_idx are left
                        * uninitialised here; tcp_write sets them before tcp_flush reads them. */
     450       10964 : grpc_endpoint *grpc_tcp_create(grpc_fd *em_fd, size_t slice_size,
     451             :                                const char *peer_string) {
     452       10964 :   grpc_tcp *tcp = (grpc_tcp *)gpr_malloc(sizeof(grpc_tcp));
     453       10964 :   tcp->base.vtable = &vtable;
     454       10964 :   tcp->peer_string = gpr_strdup(peer_string);
     455       10963 :   tcp->fd = em_fd->fd;
     456       10963 :   tcp->read_cb = NULL;
     457       10963 :   tcp->write_cb = NULL;
     458       10963 :   tcp->release_fd_cb = NULL;
     459       10963 :   tcp->release_fd = NULL;
     460       10963 :   tcp->incoming_buffer = NULL;
     461       10963 :   tcp->slice_size = slice_size;
                         /* Start with one slice per read; tcp_continue_read adapts this between
                          * 1 and MAX_READ_IOVEC. */
     462       10963 :   tcp->iov_size = 1;
                         /* Makes the first tcp_read wait for a readability notification. */
     463       10963 :   tcp->finished_edge = 1;
     464             :   /* paired with unref in tcp_destroy / grpc_tcp_destroy_and_release_fd */
     465       10963 :   gpr_ref_init(&tcp->refcount, 1);
     466       10963 :   tcp->em_fd = em_fd;
     467       10963 :   tcp->read_closure.cb = tcp_handle_read;
     468       10963 :   tcp->read_closure.cb_arg = tcp;
     469       10963 :   tcp->write_closure.cb = tcp_handle_write;
     470       10963 :   tcp->write_closure.cb_arg = tcp;
     471       10963 :   gpr_slice_buffer_init(&tcp->last_read_buffer);
     472             : 
     473       10963 :   return &tcp->base;
     474             : }
     475             : 
                       /* Alternative to tcp_destroy. Asserts that `ep` was created by
                        * grpc_tcp_create, records `fd` and `done`, then drops the creation
                        * reference. When the last reference goes away, tcp_free passes `done`
                        * and `fd` to grpc_fd_orphan; judging by the names, the descriptor is
                        * handed back through `*fd` and `done` is run - confirm against
                        * grpc_fd_orphan. */
     476           1 : void grpc_tcp_destroy_and_release_fd(grpc_exec_ctx *exec_ctx, grpc_endpoint *ep,
     477             :                                      int *fd, grpc_closure *done) {
     478           1 :   grpc_tcp *tcp = (grpc_tcp *)ep;
     479           1 :   GPR_ASSERT(ep->vtable == &vtable);
     480           1 :   tcp->release_fd = fd;
     481           1 :   tcp->release_fd_cb = done;
     482           1 :   TCP_UNREF(exec_ctx, tcp, "destroy");
     483           1 : }
     484             : 
     485             : #endif

Generated by: LCOV version 1.11