LCOV - code coverage report
Current view: top level - src/core/iomgr - udp_server.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 131 179 73.2 %
Date: 2015-10-10 Functions: 11 13 84.6 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
      35             : #ifndef _GNU_SOURCE
      36             : #define _GNU_SOURCE
      37             : #endif
      38             : 
      39             : #include <grpc/support/port_platform.h>
      40             : 
      41             : #ifdef GPR_POSIX_SOCKET
      42             : 
      43             : #include "src/core/iomgr/udp_server.h"
      44             : 
      45             : #include <errno.h>
      46             : #include <fcntl.h>
      47             : #include <limits.h>
      48             : #include <netinet/in.h>
      49             : #include <netinet/tcp.h>
      50             : #include <string.h>
      51             : #include <sys/socket.h>
      52             : #include <sys/stat.h>
      53             : #include <sys/types.h>
      54             : #include <sys/un.h>
      55             : #include <unistd.h>
      56             : 
      57             : #include "src/core/iomgr/fd_posix.h"
      58             : #include "src/core/iomgr/pollset_posix.h"
      59             : #include "src/core/iomgr/resolve_address.h"
      60             : #include "src/core/iomgr/sockaddr_utils.h"
      61             : #include "src/core/iomgr/socket_utils_posix.h"
      62             : #include "src/core/support/string.h"
      63             : #include <grpc/support/alloc.h>
      64             : #include <grpc/support/log.h>
      65             : #include <grpc/support/sync.h>
      66             : #include <grpc/support/string_util.h>
      67             : #include <grpc/support/time.h>
      68             : 
      69             : #define INIT_PORT_CAP 2
      70             : 
      71             : /* one listening port: a bound socket plus the state used to poll it */
      72             : typedef struct {
      73             :   int fd; /* raw socket descriptor, stored by add_socket_to_server */
      74             :   grpc_fd *emfd; /* wrapper obtained from grpc_fd_create(fd, name) */
      75             :   grpc_udp_server *server; /* server this port was added to */
      76             :   union {
      77             :     gpr_uint8 untyped[GRPC_MAX_SOCKADDR_SIZE];
      78             :     struct sockaddr sockaddr;
      79             :     struct sockaddr_un un;
      80             :   } addr; /* copy of the address the port was added with */
      81             :   size_t addr_len; /* number of bytes copied into addr */
      82             :   grpc_closure read_closure; /* pointed at on_read by grpc_udp_server_start */
      83             :   grpc_closure destroyed_closure; /* handed to grpc_fd_orphan during shutdown */
      84             :   grpc_udp_server_read_cb read_cb; /* caller-supplied callback invoked from on_read */
      85             : } server_port;
      86             : /* Unlinks un->sun_path, but only when stat() succeeds and reports a socket. */
      87           0 : static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
      88             :   struct stat st;
      89             : 
      90           0 :   if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
      91           0 :     unlink(un->sun_path); /* result is not checked */
      92             :   }
      93           0 : }
      94             : 
      95             : /* the overall server */
      96             : struct grpc_udp_server {
      97             :   gpr_mu mu; /* held while the port list, counters and shutdown flag are updated */
      98             :   gpr_cv cv; /* NOTE(review): only init/destroy are visible in this file; no wait or signal */
      99             : 
     100             :   /* active port count: how many ports are actually still listening */
     101             :   size_t active_ports;
     102             :   /* destroyed port count: how many ports are completely destroyed */
     103             :   size_t destroyed_ports;
     104             : 
     105             :   /* is this server shutting down? (boolean) */
     106             :   int shutdown;
     107             : 
     108             :   /* all listening ports */
     109             :   server_port *ports;
     110             :   size_t nports; /* entries of ports currently in use */
     111             :   size_t port_capacity; /* entries ports has room for */
     112             : 
     113             :   /* shutdown callback, recorded by grpc_udp_server_destroy */
     114             :   grpc_closure *shutdown_complete;
     115             : 
     116             :   /* pollsets passed to grpc_udp_server_start (the pointer is stored, not copied) */
     117             :   grpc_pollset **pollsets;
     118             :   /* number of pollsets in the pollsets array; NOTE(review): never assigned in this file */
     119             :   size_t pollset_count;
     120             :   /* The parent grpc server */
     121             :   grpc_server *grpc_server;
     122             : };
     123             : /* Allocates a server with no ports and room for INIT_PORT_CAP of them. */
     124           6 : grpc_udp_server *grpc_udp_server_create(void) {
     125           6 :   grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server));
     126           6 :   gpr_mu_init(&s->mu);
     127           6 :   gpr_cv_init(&s->cv);
     128           6 :   s->active_ports = 0;
     129           6 :   s->destroyed_ports = 0;
     130           6 :   s->shutdown = 0;
     131           6 :   s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
     132           6 :   s->nports = 0;
     133           6 :   s->port_capacity = INIT_PORT_CAP;
     134             :   /* shutdown_complete, pollsets, pollset_count and grpc_server are not set here */
     135           6 :   return s;
     136             : }
     137             : /* Enqueues s->shutdown_complete on exec_ctx, then tears down and frees s. */
     138           6 : static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
     139           6 :   grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
     140             :   /* the closure pointer was read above; everything owned by s is released below */
     141           6 :   gpr_mu_destroy(&s->mu);
     142           6 :   gpr_cv_destroy(&s->cv);
     143             : 
     144           6 :   gpr_free(s->ports);
     145           6 :   gpr_free(s);
     146           6 : }
     147             : /* Closure installed per port by deactivated_all_ports; `server` is the grpc_udp_server and `success` is unused. */
     148           4 : static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) {
     149           4 :   grpc_udp_server *s = server;
     150           4 :   gpr_mu_lock(&s->mu);
     151           4 :   s->destroyed_ports++;
     152           4 :   if (s->destroyed_ports == s->nports) { /* this was the last port */
     153           4 :     gpr_mu_unlock(&s->mu); /* unlock first: finish_shutdown destroys the mutex */
     154           4 :     finish_shutdown(exec_ctx, s);
     155             :   } else {
     156           0 :     gpr_mu_unlock(&s->mu);
     157             :   }
     158           4 : }
     159             : 
     160             : /* called when all listening endpoints have been shutdown, so no further
     161             :    events will be received on them - at this point it's safe to destroy
     162             :    things. Returns without doing anything unless s->shutdown is set. */
     163           6 : static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
     164             :   size_t i;
     165             : 
     166             :   /* delete ALL the things */
     167           6 :   gpr_mu_lock(&s->mu);
     168             : 
     169           6 :   if (!s->shutdown) {
     170           0 :     gpr_mu_unlock(&s->mu);
     171           6 :     return;
     172             :   }
     173             : 
     174           6 :   if (s->nports) {
     175           8 :     for (i = 0; i < s->nports; i++) {
     176           4 :       server_port *sp = &s->ports[i];
     177           4 :       if (sp->addr.sockaddr.sa_family == AF_UNIX) {
     178           0 :         unlink_if_unix_domain_socket(&sp->addr.un);
     179             :       }
     180           4 :       sp->destroyed_closure.cb = destroyed_port; /* destroyed_port counts ports and finishes shutdown on the last */
     181           4 :       sp->destroyed_closure.cb_arg = s;
     182           4 :       grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure,
     183             :                      "udp_listener_shutdown");
     184             :     }
     185           4 :     gpr_mu_unlock(&s->mu);
     186             :   } else {
     187           2 :     gpr_mu_unlock(&s->mu); /* no ports, so no destroyed_port callback will run */
     188           2 :     finish_shutdown(exec_ctx, s);
     189             :   }
     190             : }
     191             : /* Starts shutdown of s and records on_done, which finish_shutdown later enqueues. */
     192           6 : void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
     193             :                              grpc_closure *on_done) {
     194             :   size_t i;
     195           6 :   gpr_mu_lock(&s->mu);
     196             : 
     197           6 :   GPR_ASSERT(!s->shutdown); /* a second destroy request trips this assert */
     198           6 :   s->shutdown = 1;
     199             : 
     200           6 :   s->shutdown_complete = on_done;
     201             : 
     202             :   /* shutdown all fd's; presumably this makes on_read run with success == 0 - confirm against grpc_fd_shutdown */
     203           6 :   if (s->active_ports) {
     204           6 :     for (i = 0; i < s->nports; i++) {
     205           3 :       grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
     206             :     }
     207           3 :     gpr_mu_unlock(&s->mu);
     208             :   } else {
     209           3 :     gpr_mu_unlock(&s->mu);
     210           3 :     deactivated_all_ports(exec_ctx, s); /* no active ports: go straight to teardown */
     211             :   }
     212           6 : }
     213             : 
     214             : /* Prepare a recently-created socket for listening: configure it, bind it to addr, and return the bound port. On the error path fd is closed and -1 is returned. */
     215           4 : static int prepare_socket(int fd, const struct sockaddr *addr,
     216             :                           size_t addr_len) {
     217             :   struct sockaddr_storage sockname_temp;
     218             :   socklen_t sockname_len;
     219             :   int get_local_ip;
     220             :   int rc;
     221             : 
     222           4 :   if (fd < 0) {
     223           0 :     goto error;
     224             :   }
     225             : 
     226           4 :   if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) {
     227           0 :     gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
     228           0 :             strerror(errno));
     229             :   } /* NOTE(review): the failure is only logged; setup carries on - confirm this is intended */
     230             : 
     231           4 :   get_local_ip = 1;
     232           4 :   rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
     233             :                   sizeof(get_local_ip));
     234           4 :   if (rc == 0 && addr->sa_family == AF_INET6) {
     235             : #if !defined(__APPLE__)
     236           4 :     rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
     237             :                     sizeof(get_local_ip));
     238             : #endif
     239             :   } /* NOTE(review): rc is not examined after this point */
     240             : 
     241           4 :   GPR_ASSERT(addr_len < ~(socklen_t)0); /* guards the narrowing cast passed to bind() */
     242           4 :   if (bind(fd, addr, (socklen_t)addr_len) < 0) {
     243             :     char *addr_str;
     244           0 :     grpc_sockaddr_to_string(&addr_str, addr, 0);
     245           0 :     gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
     246           0 :     gpr_free(addr_str);
     247           0 :     goto error;
     248             :   }
     249             : 
     250           4 :   sockname_len = sizeof(sockname_temp);
     251           4 :   if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) {
     252           0 :     goto error;
     253             :   }
     254             :   /* report the port from getsockname(), i.e. the one actually bound */
     255           4 :   return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
     256             : 
     257             : error:
     258           0 :   if (fd >= 0) {
     259           0 :     close(fd);
     260             :   }
     261           0 :   return -1;
     262             : }
     263             : 
     264             : /* event manager callback when reads are ready; arg is the server_port the closure was armed for */
     265          14 : static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
     266          14 :   server_port *sp = arg;
     267             : 
     268          14 :   if (success == 0) { /* failure: count this port as inactive and do not re-arm */
     269           3 :     gpr_mu_lock(&sp->server->mu);
     270           3 :     if (0 == --sp->server->active_ports) {
     271           3 :       gpr_mu_unlock(&sp->server->mu);
     272           3 :       deactivated_all_ports(exec_ctx, sp->server); /* last active port is gone */
     273             :     } else {
     274           0 :       gpr_mu_unlock(&sp->server->mu);
     275             :     }
     276          17 :     return;
     277             :   }
     278             : 
     279             :   /* Tell the registered callback that data is available to read. */
     280          11 :   GPR_ASSERT(sp->read_cb);
     281          11 :   sp->read_cb(exec_ctx, sp->emfd, sp->server->grpc_server);
     282             : 
     283             :   /* Re-arm the notification event so we get another chance to read. */
     284          11 :   grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
     285             : }
     286             : /* Binds fd to addr via prepare_socket and, on success, appends a server_port for it to s. Returns the bound port, or -1 if prepare_socket failed. */
     287           4 : static int add_socket_to_server(grpc_udp_server *s, int fd,
     288             :                                 const struct sockaddr *addr, size_t addr_len,
     289             :                                 grpc_udp_server_read_cb read_cb) {
     290             :   server_port *sp;
     291             :   int port;
     292             :   char *addr_str;
     293             :   char *name;
     294             : 
     295           4 :   port = prepare_socket(fd, addr, addr_len);
     296           4 :   if (port >= 0) {
     297           4 :     grpc_sockaddr_to_string(&addr_str, addr, 1); /* pass the address itself; &addr would format the bytes of the local pointer variable */
     298           4 :     gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
     299           4 :     gpr_free(addr_str);
     300           4 :     gpr_mu_lock(&s->mu);
     301             :     /* append it to the list under a lock */
     302           4 :     if (s->nports == s->port_capacity) {
     303           0 :       s->port_capacity *= 2;
     304           0 :       s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
     305             :     }
     306           4 :     sp = &s->ports[s->nports++];
     307           4 :     sp->server = s;
     308           4 :     sp->fd = fd;
     309           4 :     sp->emfd = grpc_fd_create(fd, name);
     310           4 :     memcpy(sp->addr.untyped, addr, addr_len);
     311           4 :     sp->addr_len = addr_len;
     312           4 :     sp->read_cb = read_cb;
     313           4 :     GPR_ASSERT(sp->emfd);
     314           4 :     gpr_mu_unlock(&s->mu);
     315           4 :     gpr_free(name); /* name is released here, after grpc_fd_create has been called with it */
     316             :   }
     317             : 
     318           4 :   return port;
     319             : }
     320             : /* Creates and binds one or two UDP sockets for addr and registers them with s. Returns the bound port, or -1 if no socket could be added. */
     321           4 : int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
     322             :                              size_t addr_len, grpc_udp_server_read_cb read_cb) {
     323           4 :   int allocated_port1 = -1;
     324           4 :   int allocated_port2 = -1;
     325             :   unsigned i;
     326             :   int fd;
     327             :   grpc_dualstack_mode dsmode;
     328             :   struct sockaddr_in6 addr6_v4mapped;
     329             :   struct sockaddr_in wild4;
     330             :   struct sockaddr_in6 wild6;
     331             :   struct sockaddr_in addr4_copy;
     332           4 :   struct sockaddr *allocated_addr = NULL;
     333             :   struct sockaddr_storage sockname_temp;
     334             :   socklen_t sockname_len;
     335             :   int port;
     336             : 
     337           4 :   if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
     338           0 :     unlink_if_unix_domain_socket(addr);
     339             :   }
     340             : 
     341             :   /* Check if this is a wildcard port, and if so, try to keep the port the same
     342             :      as some previously created listener. */
     343           4 :   if (grpc_sockaddr_get_port(addr) == 0) {
     344           4 :     for (i = 0; i < s->nports; i++) {
     345           0 :       sockname_len = sizeof(sockname_temp);
     346           0 :       if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp,
     347             :                            &sockname_len)) {
     348           0 :         port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
     349           0 :         if (port > 0) {
     350           0 :           allocated_addr = gpr_malloc(addr_len); /* released with gpr_free at done:, so allocate with the matching gpr allocator */
     351           0 :           memcpy(allocated_addr, addr, addr_len);
     352           0 :           grpc_sockaddr_set_port(allocated_addr, port);
     353           0 :           addr = allocated_addr;
     354           0 :           break;
     355             :         }
     356             :       }
     357             :     }
     358             :   }
     359             : 
     360           4 :   if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
     361           4 :     addr = (const struct sockaddr *)&addr6_v4mapped;
     362           4 :     addr_len = sizeof(addr6_v4mapped);
     363             :   }
     364             : 
     365             :   /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
     366           4 :   if (grpc_sockaddr_is_wildcard(addr, &port)) {
     367           4 :     grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
     368             : 
     369             :     /* Try listening on IPv6 first. */
     370           4 :     addr = (struct sockaddr *)&wild6;
     371           4 :     addr_len = sizeof(wild6);
     372           4 :     fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
     373           4 :     allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
     374           4 :     if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
     375           4 :       goto done;
     376             :     }
     377             : 
     378             :     /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
     379           0 :     if (port == 0 && allocated_port1 > 0) {
     380           0 :       grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1);
     381             :     }
     382           0 :     addr = (struct sockaddr *)&wild4;
     383           0 :     addr_len = sizeof(wild4);
     384             :   }
     385             : 
     386           0 :   fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
     387           0 :   if (fd < 0) {
     388           0 :     gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
     389             :   } /* a negative fd is still passed on; prepare_socket turns it into -1 */
     390           0 :   if (dsmode == GRPC_DSMODE_IPV4 &&
     391           0 :       grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
     392           0 :     addr = (struct sockaddr *)&addr4_copy;
     393           0 :     addr_len = sizeof(addr4_copy);
     394             :   }
     395           0 :   allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
     396             : 
     397             : done:
     398           4 :   gpr_free(allocated_addr);
     399           4 :   return allocated_port1 >= 0 ? allocated_port1 : allocated_port2; /* prefer the first socket's port */
     400             : }
     401             : /* Returns the fd of the port at port_index, or -1 when the index is out of range. */
     402           2 : int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
     403           2 :   return (port_index < s->nports) ? s->ports[port_index].fd : -1;
     404             : }
     405             : /* Adds every port's fd to each pollset and arms on_read for it; asserts that no port is active yet. */
     406           4 : void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
     407             :                            grpc_pollset **pollsets, size_t pollset_count,
     408             :                            grpc_server *server) {
     409             :   size_t i, j;
     410           4 :   gpr_mu_lock(&s->mu);
     411           4 :   GPR_ASSERT(s->active_ports == 0);
     412           4 :   s->pollsets = pollsets; /* NOTE(review): pollset_count is not stored in s->pollset_count */
     413           4 :   s->grpc_server = server;
     414           7 :   for (i = 0; i < s->nports; i++) {
     415           5 :     for (j = 0; j < pollset_count; j++) {
     416           2 :       grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
     417             :     }
     418           3 :     s->ports[i].read_closure.cb = on_read;
     419           3 :     s->ports[i].read_closure.cb_arg = &s->ports[i]; /* on_read receives this server_port as arg */
     420           3 :     grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd,
     421           3 :                            &s->ports[i].read_closure);
     422           3 :     s->active_ports++;
     423             :   }
     424           4 :   gpr_mu_unlock(&s->mu);
     425           4 : }
     426             : 
     427             : /* TODO(rjshade): Add a test for this method. Sends buf_len bytes of buffer from sp->fd to peer_address; failures are logged, nothing is returned. */
     428           0 : void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len,
     429             :                            const struct sockaddr *peer_address) {
     430             :   ssize_t rc;
                     :   socklen_t peer_len;
                     : 
                     :   /* sendto() needs the size of the address structure itself;
                     :      sizeof(peer_address) is only the size of the pointer. */
                     :   switch (peer_address->sa_family) {
                     :     case AF_INET:
                     :       peer_len = sizeof(struct sockaddr_in);
                     :       break;
                     :     case AF_INET6:
                     :       peer_len = sizeof(struct sockaddr_in6);
                     :       break;
                     :     case AF_UNIX:
                     :       peer_len = sizeof(struct sockaddr_un);
                     :       break;
                     :     default:
                     :       /* the structure size is unknown, so no safe length can be passed */
                     :       gpr_log(GPR_ERROR, "Unable to send data: unsupported address family %d",
                     :               (int)peer_address->sa_family);
                     :       return;
                     :   }
                     : 
     431           0 :   rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, peer_len);
     432           0 :   if (rc < 0) {
     433           0 :     gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno));
     434             :   }
     435           0 : }
     436             : 
     437             : #endif

Generated by: LCOV version 1.10