Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : /* FIXME: "posix" files shouldn't be depending on _GNU_SOURCE */
35 : #ifndef _GNU_SOURCE
36 : #define _GNU_SOURCE
37 : #endif
38 :
39 : #include <grpc/support/port_platform.h>
40 :
41 : #ifdef GPR_POSIX_SOCKET
42 :
43 : #include "src/core/iomgr/udp_server.h"
44 :
45 : #include <errno.h>
46 : #include <fcntl.h>
47 : #include <limits.h>
48 : #include <netinet/in.h>
49 : #include <netinet/tcp.h>
50 : #include <string.h>
51 : #include <sys/socket.h>
52 : #include <sys/stat.h>
53 : #include <sys/types.h>
54 : #include <sys/un.h>
55 : #include <unistd.h>
56 :
57 : #include "src/core/iomgr/fd_posix.h"
58 : #include "src/core/iomgr/pollset_posix.h"
59 : #include "src/core/iomgr/resolve_address.h"
60 : #include "src/core/iomgr/sockaddr_utils.h"
61 : #include "src/core/iomgr/socket_utils_posix.h"
62 : #include "src/core/support/string.h"
63 : #include <grpc/support/alloc.h>
64 : #include <grpc/support/log.h>
65 : #include <grpc/support/sync.h>
66 : #include <grpc/support/string_util.h>
67 : #include <grpc/support/time.h>
68 :
69 : #define INIT_PORT_CAP 2
70 :
71 : /* one listening port */
72 : typedef struct {
73 : int fd;
74 : grpc_fd *emfd;
75 : grpc_udp_server *server;
76 : union {
77 : gpr_uint8 untyped[GRPC_MAX_SOCKADDR_SIZE];
78 : struct sockaddr sockaddr;
79 : struct sockaddr_un un;
80 : } addr;
81 : size_t addr_len;
82 : grpc_closure read_closure;
83 : grpc_closure destroyed_closure;
84 : grpc_udp_server_read_cb read_cb;
85 : } server_port;
86 :
87 0 : static void unlink_if_unix_domain_socket(const struct sockaddr_un *un) {
88 : struct stat st;
89 :
90 0 : if (stat(un->sun_path, &st) == 0 && (st.st_mode & S_IFMT) == S_IFSOCK) {
91 0 : unlink(un->sun_path);
92 : }
93 0 : }
94 :
95 : /* the overall server */
96 : struct grpc_udp_server {
97 : gpr_mu mu;
98 : gpr_cv cv;
99 :
100 : /* active port count: how many ports are actually still listening */
101 : size_t active_ports;
102 : /* destroyed port count: how many ports are completely destroyed */
103 : size_t destroyed_ports;
104 :
105 : /* is this server shutting down? (boolean) */
106 : int shutdown;
107 :
108 : /* all listening ports */
109 : server_port *ports;
110 : size_t nports;
111 : size_t port_capacity;
112 :
113 : /* shutdown callback */
114 : grpc_closure *shutdown_complete;
115 :
116 : /* all pollsets interested in new connections */
117 : grpc_pollset **pollsets;
118 : /* number of pollsets in the pollsets array */
119 : size_t pollset_count;
120 : /* The parent grpc server */
121 : grpc_server *grpc_server;
122 : };
123 :
124 6 : grpc_udp_server *grpc_udp_server_create(void) {
125 6 : grpc_udp_server *s = gpr_malloc(sizeof(grpc_udp_server));
126 6 : gpr_mu_init(&s->mu);
127 6 : gpr_cv_init(&s->cv);
128 6 : s->active_ports = 0;
129 6 : s->destroyed_ports = 0;
130 6 : s->shutdown = 0;
131 6 : s->ports = gpr_malloc(sizeof(server_port) * INIT_PORT_CAP);
132 6 : s->nports = 0;
133 6 : s->port_capacity = INIT_PORT_CAP;
134 :
135 6 : return s;
136 : }
137 :
138 6 : static void finish_shutdown(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
139 6 : grpc_exec_ctx_enqueue(exec_ctx, s->shutdown_complete, 1);
140 :
141 6 : gpr_mu_destroy(&s->mu);
142 6 : gpr_cv_destroy(&s->cv);
143 :
144 6 : gpr_free(s->ports);
145 6 : gpr_free(s);
146 6 : }
147 :
148 4 : static void destroyed_port(grpc_exec_ctx *exec_ctx, void *server, int success) {
149 4 : grpc_udp_server *s = server;
150 4 : gpr_mu_lock(&s->mu);
151 4 : s->destroyed_ports++;
152 4 : if (s->destroyed_ports == s->nports) {
153 4 : gpr_mu_unlock(&s->mu);
154 4 : finish_shutdown(exec_ctx, s);
155 : } else {
156 0 : gpr_mu_unlock(&s->mu);
157 : }
158 4 : }
159 :
160 : /* called when all listening endpoints have been shutdown, so no further
161 : events will be received on them - at this point it's safe to destroy
162 : things */
163 6 : static void deactivated_all_ports(grpc_exec_ctx *exec_ctx, grpc_udp_server *s) {
164 : size_t i;
165 :
166 : /* delete ALL the things */
167 6 : gpr_mu_lock(&s->mu);
168 :
169 6 : if (!s->shutdown) {
170 0 : gpr_mu_unlock(&s->mu);
171 6 : return;
172 : }
173 :
174 6 : if (s->nports) {
175 8 : for (i = 0; i < s->nports; i++) {
176 4 : server_port *sp = &s->ports[i];
177 4 : if (sp->addr.sockaddr.sa_family == AF_UNIX) {
178 0 : unlink_if_unix_domain_socket(&sp->addr.un);
179 : }
180 4 : sp->destroyed_closure.cb = destroyed_port;
181 4 : sp->destroyed_closure.cb_arg = s;
182 4 : grpc_fd_orphan(exec_ctx, sp->emfd, &sp->destroyed_closure, NULL,
183 : "udp_listener_shutdown");
184 : }
185 4 : gpr_mu_unlock(&s->mu);
186 : } else {
187 2 : gpr_mu_unlock(&s->mu);
188 2 : finish_shutdown(exec_ctx, s);
189 : }
190 : }
191 :
192 6 : void grpc_udp_server_destroy(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
193 : grpc_closure *on_done) {
194 : size_t i;
195 6 : gpr_mu_lock(&s->mu);
196 :
197 6 : GPR_ASSERT(!s->shutdown);
198 6 : s->shutdown = 1;
199 :
200 6 : s->shutdown_complete = on_done;
201 :
202 : /* shutdown all fd's */
203 6 : if (s->active_ports) {
204 6 : for (i = 0; i < s->nports; i++) {
205 3 : grpc_fd_shutdown(exec_ctx, s->ports[i].emfd);
206 : }
207 3 : gpr_mu_unlock(&s->mu);
208 : } else {
209 3 : gpr_mu_unlock(&s->mu);
210 3 : deactivated_all_ports(exec_ctx, s);
211 : }
212 6 : }
213 :
214 : /* Prepare a recently-created socket for listening. */
215 4 : static int prepare_socket(int fd, const struct sockaddr *addr,
216 : size_t addr_len) {
217 : struct sockaddr_storage sockname_temp;
218 : socklen_t sockname_len;
219 : int get_local_ip;
220 : int rc;
221 :
222 4 : if (fd < 0) {
223 0 : goto error;
224 : }
225 :
226 4 : if (!grpc_set_socket_nonblocking(fd, 1) || !grpc_set_socket_cloexec(fd, 1)) {
227 0 : gpr_log(GPR_ERROR, "Unable to configure socket %d: %s", fd,
228 0 : strerror(errno));
229 : }
230 :
231 4 : get_local_ip = 1;
232 4 : rc = setsockopt(fd, IPPROTO_IP, IP_PKTINFO, &get_local_ip,
233 : sizeof(get_local_ip));
234 4 : if (rc == 0 && addr->sa_family == AF_INET6) {
235 : #if !defined(__APPLE__)
236 4 : rc = setsockopt(fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &get_local_ip,
237 : sizeof(get_local_ip));
238 : #endif
239 : }
240 :
241 4 : GPR_ASSERT(addr_len < ~(socklen_t)0);
242 4 : if (bind(fd, addr, (socklen_t)addr_len) < 0) {
243 : char *addr_str;
244 0 : grpc_sockaddr_to_string(&addr_str, addr, 0);
245 0 : gpr_log(GPR_ERROR, "bind addr=%s: %s", addr_str, strerror(errno));
246 0 : gpr_free(addr_str);
247 0 : goto error;
248 : }
249 :
250 4 : sockname_len = sizeof(sockname_temp);
251 4 : if (getsockname(fd, (struct sockaddr *)&sockname_temp, &sockname_len) < 0) {
252 0 : goto error;
253 : }
254 :
255 4 : return grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
256 :
257 : error:
258 0 : if (fd >= 0) {
259 0 : close(fd);
260 : }
261 0 : return -1;
262 : }
263 :
264 : /* event manager callback when reads are ready */
265 14 : static void on_read(grpc_exec_ctx *exec_ctx, void *arg, int success) {
266 14 : server_port *sp = arg;
267 :
268 14 : if (success == 0) {
269 3 : gpr_mu_lock(&sp->server->mu);
270 3 : if (0 == --sp->server->active_ports) {
271 3 : gpr_mu_unlock(&sp->server->mu);
272 3 : deactivated_all_ports(exec_ctx, sp->server);
273 : } else {
274 0 : gpr_mu_unlock(&sp->server->mu);
275 : }
276 17 : return;
277 : }
278 :
279 : /* Tell the registered callback that data is available to read. */
280 11 : GPR_ASSERT(sp->read_cb);
281 11 : sp->read_cb(exec_ctx, sp->emfd, sp->server->grpc_server);
282 :
283 : /* Re-arm the notification event so we get another chance to read. */
284 11 : grpc_fd_notify_on_read(exec_ctx, sp->emfd, &sp->read_closure);
285 : }
286 :
287 4 : static int add_socket_to_server(grpc_udp_server *s, int fd,
288 : const struct sockaddr *addr, size_t addr_len,
289 : grpc_udp_server_read_cb read_cb) {
290 : server_port *sp;
291 : int port;
292 : char *addr_str;
293 : char *name;
294 :
295 4 : port = prepare_socket(fd, addr, addr_len);
296 4 : if (port >= 0) {
297 4 : grpc_sockaddr_to_string(&addr_str, (struct sockaddr *)&addr, 1);
298 4 : gpr_asprintf(&name, "udp-server-listener:%s", addr_str);
299 4 : gpr_free(addr_str);
300 4 : gpr_mu_lock(&s->mu);
301 : /* append it to the list under a lock */
302 4 : if (s->nports == s->port_capacity) {
303 0 : s->port_capacity *= 2;
304 0 : s->ports = gpr_realloc(s->ports, sizeof(server_port) * s->port_capacity);
305 : }
306 4 : sp = &s->ports[s->nports++];
307 4 : sp->server = s;
308 4 : sp->fd = fd;
309 4 : sp->emfd = grpc_fd_create(fd, name);
310 4 : memcpy(sp->addr.untyped, addr, addr_len);
311 4 : sp->addr_len = addr_len;
312 4 : sp->read_cb = read_cb;
313 4 : GPR_ASSERT(sp->emfd);
314 4 : gpr_mu_unlock(&s->mu);
315 4 : gpr_free(name);
316 : }
317 :
318 4 : return port;
319 : }
320 :
321 4 : int grpc_udp_server_add_port(grpc_udp_server *s, const void *addr,
322 : size_t addr_len, grpc_udp_server_read_cb read_cb) {
323 4 : int allocated_port1 = -1;
324 4 : int allocated_port2 = -1;
325 : unsigned i;
326 : int fd;
327 : grpc_dualstack_mode dsmode;
328 : struct sockaddr_in6 addr6_v4mapped;
329 : struct sockaddr_in wild4;
330 : struct sockaddr_in6 wild6;
331 : struct sockaddr_in addr4_copy;
332 4 : struct sockaddr *allocated_addr = NULL;
333 : struct sockaddr_storage sockname_temp;
334 : socklen_t sockname_len;
335 : int port;
336 :
337 4 : if (((struct sockaddr *)addr)->sa_family == AF_UNIX) {
338 0 : unlink_if_unix_domain_socket(addr);
339 : }
340 :
341 : /* Check if this is a wildcard port, and if so, try to keep the port the same
342 : as some previously created listener. */
343 4 : if (grpc_sockaddr_get_port(addr) == 0) {
344 4 : for (i = 0; i < s->nports; i++) {
345 0 : sockname_len = sizeof(sockname_temp);
346 0 : if (0 == getsockname(s->ports[i].fd, (struct sockaddr *)&sockname_temp,
347 : &sockname_len)) {
348 0 : port = grpc_sockaddr_get_port((struct sockaddr *)&sockname_temp);
349 0 : if (port > 0) {
350 0 : allocated_addr = malloc(addr_len);
351 0 : memcpy(allocated_addr, addr, addr_len);
352 0 : grpc_sockaddr_set_port(allocated_addr, port);
353 0 : addr = allocated_addr;
354 0 : break;
355 : }
356 : }
357 : }
358 : }
359 :
360 4 : if (grpc_sockaddr_to_v4mapped(addr, &addr6_v4mapped)) {
361 4 : addr = (const struct sockaddr *)&addr6_v4mapped;
362 4 : addr_len = sizeof(addr6_v4mapped);
363 : }
364 :
365 : /* Treat :: or 0.0.0.0 as a family-agnostic wildcard. */
366 4 : if (grpc_sockaddr_is_wildcard(addr, &port)) {
367 4 : grpc_sockaddr_make_wildcards(port, &wild4, &wild6);
368 :
369 : /* Try listening on IPv6 first. */
370 4 : addr = (struct sockaddr *)&wild6;
371 4 : addr_len = sizeof(wild6);
372 4 : fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
373 4 : allocated_port1 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
374 4 : if (fd >= 0 && dsmode == GRPC_DSMODE_DUALSTACK) {
375 4 : goto done;
376 : }
377 :
378 : /* If we didn't get a dualstack socket, also listen on 0.0.0.0. */
379 0 : if (port == 0 && allocated_port1 > 0) {
380 0 : grpc_sockaddr_set_port((struct sockaddr *)&wild4, allocated_port1);
381 : }
382 0 : addr = (struct sockaddr *)&wild4;
383 0 : addr_len = sizeof(wild4);
384 : }
385 :
386 0 : fd = grpc_create_dualstack_socket(addr, SOCK_DGRAM, IPPROTO_UDP, &dsmode);
387 0 : if (fd < 0) {
388 0 : gpr_log(GPR_ERROR, "Unable to create socket: %s", strerror(errno));
389 : }
390 0 : if (dsmode == GRPC_DSMODE_IPV4 &&
391 0 : grpc_sockaddr_is_v4mapped(addr, &addr4_copy)) {
392 0 : addr = (struct sockaddr *)&addr4_copy;
393 0 : addr_len = sizeof(addr4_copy);
394 : }
395 0 : allocated_port2 = add_socket_to_server(s, fd, addr, addr_len, read_cb);
396 :
397 : done:
398 4 : gpr_free(allocated_addr);
399 4 : return allocated_port1 >= 0 ? allocated_port1 : allocated_port2;
400 : }
401 :
402 2 : int grpc_udp_server_get_fd(grpc_udp_server *s, unsigned port_index) {
403 2 : return (port_index < s->nports) ? s->ports[port_index].fd : -1;
404 : }
405 :
406 4 : void grpc_udp_server_start(grpc_exec_ctx *exec_ctx, grpc_udp_server *s,
407 : grpc_pollset **pollsets, size_t pollset_count,
408 : grpc_server *server) {
409 : size_t i, j;
410 4 : gpr_mu_lock(&s->mu);
411 4 : GPR_ASSERT(s->active_ports == 0);
412 4 : s->pollsets = pollsets;
413 4 : s->grpc_server = server;
414 7 : for (i = 0; i < s->nports; i++) {
415 5 : for (j = 0; j < pollset_count; j++) {
416 2 : grpc_pollset_add_fd(exec_ctx, pollsets[j], s->ports[i].emfd);
417 : }
418 3 : s->ports[i].read_closure.cb = on_read;
419 3 : s->ports[i].read_closure.cb_arg = &s->ports[i];
420 3 : grpc_fd_notify_on_read(exec_ctx, s->ports[i].emfd,
421 3 : &s->ports[i].read_closure);
422 3 : s->active_ports++;
423 : }
424 4 : gpr_mu_unlock(&s->mu);
425 4 : }
426 :
427 : /* TODO(rjshade): Add a test for this method. */
428 0 : void grpc_udp_server_write(server_port *sp, const char *buffer, size_t buf_len,
429 : const struct sockaddr *peer_address) {
430 : ssize_t rc;
431 0 : rc = sendto(sp->fd, buffer, buf_len, 0, peer_address, sizeof(peer_address));
432 0 : if (rc < 0) {
433 0 : gpr_log(GPR_ERROR, "Unable to send data: %s", strerror(errno));
434 : }
435 0 : }
436 :
437 : #endif
|