Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc/support/port_platform.h>
35 :
36 : #ifdef GPR_POSIX_SOCKET
37 :
38 : #include "src/core/iomgr/pollset_posix.h"
39 :
40 : #include <errno.h>
41 : #include <poll.h>
42 : #include <stdlib.h>
43 : #include <string.h>
44 :
45 : #include "src/core/iomgr/fd_posix.h"
46 : #include "src/core/iomgr/iomgr_internal.h"
47 : #include "src/core/support/block_annotate.h"
48 : #include <grpc/support/alloc.h>
49 : #include <grpc/support/log.h>
50 : #include <grpc/support/useful.h>
51 :
52 : typedef struct {
53 : /* all polled fds */
54 : size_t fd_count;
55 : size_t fd_capacity;
56 : grpc_fd **fds;
57 : /* fds that have been removed from the pollset explicitly */
58 : size_t del_count;
59 : size_t del_capacity;
60 : grpc_fd **dels;
61 : } pollset_hdr;
62 :
63 1986 : static void multipoll_with_poll_pollset_add_fd(grpc_exec_ctx *exec_ctx,
64 : grpc_pollset *pollset,
65 : grpc_fd *fd,
66 : int and_unlock_pollset) {
67 : size_t i;
68 1986 : pollset_hdr *h = pollset->data.ptr;
69 : /* TODO(ctiller): this is O(num_fds^2); maybe switch to a hash set here */
70 5484 : for (i = 0; i < h->fd_count; i++) {
71 5074 : if (h->fds[i] == fd) goto exit;
72 : }
73 410 : if (h->fd_count == h->fd_capacity) {
74 292 : h->fd_capacity = GPR_MAX(h->fd_capacity + 8, h->fd_count * 3 / 2);
75 292 : h->fds = gpr_realloc(h->fds, sizeof(grpc_fd *) * h->fd_capacity);
76 : }
77 410 : h->fds[h->fd_count++] = fd;
78 410 : GRPC_FD_REF(fd, "multipoller");
79 : exit:
80 1986 : if (and_unlock_pollset) {
81 1741 : gpr_mu_unlock(&pollset->mu);
82 : }
83 1986 : }
84 :
85 0 : static void multipoll_with_poll_pollset_del_fd(grpc_exec_ctx *exec_ctx,
86 : grpc_pollset *pollset,
87 : grpc_fd *fd,
88 : int and_unlock_pollset) {
89 : /* will get removed next poll cycle */
90 0 : pollset_hdr *h = pollset->data.ptr;
91 0 : if (h->del_count == h->del_capacity) {
92 0 : h->del_capacity = GPR_MAX(h->del_capacity + 8, h->del_count * 3 / 2);
93 0 : h->dels = gpr_realloc(h->dels, sizeof(grpc_fd *) * h->del_capacity);
94 : }
95 0 : h->dels[h->del_count++] = fd;
96 0 : GRPC_FD_REF(fd, "multipoller_del");
97 0 : if (and_unlock_pollset) {
98 0 : gpr_mu_unlock(&pollset->mu);
99 : }
100 0 : }
101 :
102 2465 : static void multipoll_with_poll_pollset_maybe_work_and_unlock(
103 : grpc_exec_ctx *exec_ctx, grpc_pollset *pollset, grpc_pollset_worker *worker,
104 : gpr_timespec deadline, gpr_timespec now) {
105 : #define POLLOUT_CHECK (POLLOUT | POLLHUP | POLLERR)
106 : #define POLLIN_CHECK (POLLIN | POLLHUP | POLLERR)
107 :
108 : int timeout;
109 : int r;
110 : size_t i, j, fd_count;
111 : nfds_t pfd_count;
112 : pollset_hdr *h;
113 : /* TODO(ctiller): inline some elements to avoid an allocation */
114 : grpc_fd_watcher *watchers;
115 : struct pollfd *pfds;
116 :
117 2465 : h = pollset->data.ptr;
118 2465 : timeout = grpc_poll_deadline_to_millis_timeout(deadline, now);
119 : /* TODO(ctiller): perform just one malloc here if we exceed the inline case */
120 2465 : pfds = gpr_malloc(sizeof(*pfds) * (h->fd_count + 2));
121 2465 : watchers = gpr_malloc(sizeof(*watchers) * (h->fd_count + 2));
122 2465 : fd_count = 0;
123 2465 : pfd_count = 2;
124 2465 : pfds[0].fd = GRPC_WAKEUP_FD_GET_READ_FD(&grpc_global_wakeup_fd);
125 2465 : pfds[0].events = POLLIN;
126 2465 : pfds[0].revents = 0;
127 2465 : pfds[1].fd = GRPC_WAKEUP_FD_GET_READ_FD(&worker->wakeup_fd);
128 2465 : pfds[1].events = POLLIN;
129 2465 : pfds[1].revents = 0;
130 9809 : for (i = 0; i < h->fd_count; i++) {
131 7344 : int remove = grpc_fd_is_orphaned(h->fds[i]);
132 7344 : for (j = 0; !remove && j < h->del_count; j++) {
133 0 : if (h->fds[i] == h->dels[j]) remove = 1;
134 : }
135 7344 : if (remove) {
136 148 : GRPC_FD_UNREF(h->fds[i], "multipoller");
137 : } else {
138 7196 : h->fds[fd_count++] = h->fds[i];
139 7196 : watchers[pfd_count].fd = h->fds[i];
140 7196 : pfds[pfd_count].fd = h->fds[i]->fd;
141 7196 : pfds[pfd_count].revents = 0;
142 7196 : pfd_count++;
143 : }
144 : }
145 2465 : for (j = 0; j < h->del_count; j++) {
146 0 : GRPC_FD_UNREF(h->dels[j], "multipoller_del");
147 : }
148 2465 : h->del_count = 0;
149 2465 : h->fd_count = fd_count;
150 2465 : gpr_mu_unlock(&pollset->mu);
151 :
152 9661 : for (i = 2; i < pfd_count; i++) {
153 14392 : pfds[i].events = (short)grpc_fd_begin_poll(watchers[i].fd, pollset, worker,
154 7196 : POLLIN, POLLOUT, &watchers[i]);
155 : }
156 :
157 : /* TODO(vpai): Consider first doing a 0 timeout poll here to avoid
158 : even going into the blocking annotation if possible */
159 : GRPC_SCHEDULING_START_BLOCKING_REGION;
160 2465 : r = grpc_poll_function(pfds, pfd_count, timeout);
161 : GRPC_SCHEDULING_END_BLOCKING_REGION;
162 :
163 2465 : if (r < 0) {
164 0 : gpr_log(GPR_ERROR, "poll() failed: %s", strerror(errno));
165 0 : for (i = 2; i < pfd_count; i++) {
166 0 : grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
167 : }
168 2465 : } else if (r == 0) {
169 831 : for (i = 2; i < pfd_count; i++) {
170 606 : grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
171 : }
172 : } else {
173 2240 : if (pfds[0].revents & POLLIN_CHECK) {
174 4 : grpc_wakeup_fd_consume_wakeup(&grpc_global_wakeup_fd);
175 : }
176 2240 : if (pfds[1].revents & POLLIN_CHECK) {
177 0 : grpc_wakeup_fd_consume_wakeup(&worker->wakeup_fd);
178 : }
179 8830 : for (i = 2; i < pfd_count; i++) {
180 6590 : if (watchers[i].fd == NULL) {
181 0 : grpc_fd_end_poll(exec_ctx, &watchers[i], 0, 0);
182 0 : continue;
183 : }
184 6590 : grpc_fd_end_poll(exec_ctx, &watchers[i], pfds[i].revents & POLLIN_CHECK,
185 6590 : pfds[i].revents & POLLOUT_CHECK);
186 : }
187 : }
188 :
189 2465 : gpr_free(pfds);
190 2465 : gpr_free(watchers);
191 2465 : }
192 :
193 604 : static void multipoll_with_poll_pollset_finish_shutdown(grpc_pollset *pollset) {
194 : size_t i;
195 604 : pollset_hdr *h = pollset->data.ptr;
196 1470 : for (i = 0; i < h->fd_count; i++) {
197 866 : GRPC_FD_UNREF(h->fds[i], "multipoller");
198 : }
199 604 : for (i = 0; i < h->del_count; i++) {
200 0 : GRPC_FD_UNREF(h->dels[i], "multipoller_del");
201 : }
202 604 : h->fd_count = 0;
203 604 : h->del_count = 0;
204 604 : }
205 :
206 302 : static void multipoll_with_poll_pollset_destroy(grpc_pollset *pollset) {
207 302 : pollset_hdr *h = pollset->data.ptr;
208 302 : multipoll_with_poll_pollset_finish_shutdown(pollset);
209 302 : gpr_free(h->fds);
210 302 : gpr_free(h->dels);
211 302 : gpr_free(h);
212 302 : }
213 :
214 : static const grpc_pollset_vtable multipoll_with_poll_pollset = {
215 : multipoll_with_poll_pollset_add_fd, multipoll_with_poll_pollset_del_fd,
216 : multipoll_with_poll_pollset_maybe_work_and_unlock,
217 : multipoll_with_poll_pollset_finish_shutdown,
218 : multipoll_with_poll_pollset_destroy};
219 :
220 302 : void grpc_poll_become_multipoller(grpc_exec_ctx *exec_ctx,
221 : grpc_pollset *pollset, grpc_fd **fds,
222 : size_t nfds) {
223 : size_t i;
224 302 : pollset_hdr *h = gpr_malloc(sizeof(pollset_hdr));
225 302 : pollset->vtable = &multipoll_with_poll_pollset;
226 302 : pollset->data.ptr = h;
227 302 : h->fd_count = nfds;
228 302 : h->fd_capacity = nfds;
229 302 : h->fds = gpr_malloc(nfds * sizeof(grpc_fd *));
230 302 : h->del_count = 0;
231 302 : h->del_capacity = 0;
232 302 : h->dels = NULL;
233 906 : for (i = 0; i < nfds; i++) {
234 604 : h->fds[i] = fds[i];
235 604 : GRPC_FD_REF(fds[i], "multipoller");
236 : }
237 302 : }
238 :
239 : #endif /* GPR_POSIX_SOCKET */
240 :
#ifdef GPR_POSIX_MULTIPOLL_WITH_POLL
/* Point grpc_platform_become_multipoller at the poll()-based implementation
   defined in this file. */
grpc_platform_become_multipoller_type grpc_platform_become_multipoller =
    grpc_poll_become_multipoller;
#endif /* GPR_POSIX_MULTIPOLL_WITH_POLL */
|