Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/surface/completion_queue.h"
35 :
36 : #include "src/core/iomgr/iomgr.h"
37 : #include <grpc/support/alloc.h>
38 : #include <grpc/support/log.h>
39 : #include <grpc/support/thd.h>
40 : #include <grpc/support/time.h>
41 : #include <grpc/support/useful.h>
42 : #include "test/core/util/test_config.h"
43 :
/* Logs the name of the test that is about to run at INFO level. The argument
   is parenthesized so that any expression can be passed safely. */
#define LOG_TEST(x) gpr_log(GPR_INFO, "%s", (x))
45 :
46 129 : static void *create_test_tag(void) {
47 : static gpr_intptr i = 0;
48 129 : return (void *)(++i);
49 : }
50 :
51 : /* helper for tests to shutdown correctly and tersely */
52 4 : static void shutdown_and_destroy(grpc_completion_queue *cc) {
53 : grpc_event ev;
54 4 : grpc_completion_queue_shutdown(cc);
55 4 : ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
56 4 : GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
57 4 : grpc_completion_queue_destroy(cc);
58 4 : }
59 :
60 : /* ensure we can create and destroy a completion channel */
61 1 : static void test_no_op(void) {
62 1 : LOG_TEST("test_no_op");
63 1 : shutdown_and_destroy(grpc_completion_queue_create(NULL));
64 1 : }
65 :
66 1 : static void test_wait_empty(void) {
67 : grpc_completion_queue *cc;
68 : grpc_event event;
69 :
70 1 : LOG_TEST("test_wait_empty");
71 :
72 1 : cc = grpc_completion_queue_create(NULL);
73 1 : event = grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), NULL);
74 1 : GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
75 1 : shutdown_and_destroy(cc);
76 1 : }
77 :
78 257 : static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
79 257 : grpc_cq_completion *c) {}
80 :
81 1 : static void test_cq_end_op(void) {
82 : grpc_event ev;
83 : grpc_completion_queue *cc;
84 : grpc_cq_completion completion;
85 1 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
86 1 : void *tag = create_test_tag();
87 :
88 1 : LOG_TEST("test_cq_end_op");
89 :
90 1 : cc = grpc_completion_queue_create(NULL);
91 :
92 1 : grpc_cq_begin_op(cc);
93 1 : grpc_cq_end_op(&exec_ctx, cc, tag, 1, do_nothing_end_completion, NULL,
94 : &completion);
95 :
96 1 : ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
97 1 : GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
98 1 : GPR_ASSERT(ev.tag == tag);
99 1 : GPR_ASSERT(ev.success);
100 :
101 1 : shutdown_and_destroy(cc);
102 1 : grpc_exec_ctx_finish(&exec_ctx);
103 1 : }
104 :
105 1 : static void test_shutdown_then_next_polling(void) {
106 : grpc_completion_queue *cc;
107 : grpc_event event;
108 1 : LOG_TEST("test_shutdown_then_next_polling");
109 :
110 1 : cc = grpc_completion_queue_create(NULL);
111 1 : grpc_completion_queue_shutdown(cc);
112 1 : event =
113 1 : grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
114 1 : GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
115 1 : grpc_completion_queue_destroy(cc);
116 1 : }
117 :
118 1 : static void test_shutdown_then_next_with_timeout(void) {
119 : grpc_completion_queue *cc;
120 : grpc_event event;
121 1 : LOG_TEST("test_shutdown_then_next_with_timeout");
122 :
123 1 : cc = grpc_completion_queue_create(NULL);
124 1 : grpc_completion_queue_shutdown(cc);
125 1 : event =
126 1 : grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
127 1 : GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
128 1 : grpc_completion_queue_destroy(cc);
129 1 : }
130 :
131 1 : static void test_pluck(void) {
132 : grpc_event ev;
133 : grpc_completion_queue *cc;
134 : void *tags[128];
135 : grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
136 1 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
137 : unsigned i, j;
138 :
139 1 : LOG_TEST("test_pluck");
140 :
141 129 : for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
142 128 : tags[i] = create_test_tag();
143 8256 : for (j = 0; j < i; j++) {
144 8128 : GPR_ASSERT(tags[i] != tags[j]);
145 : }
146 : }
147 :
148 1 : cc = grpc_completion_queue_create(NULL);
149 :
150 129 : for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
151 128 : grpc_cq_begin_op(cc);
152 128 : grpc_cq_end_op(&exec_ctx, cc, tags[i], 1, do_nothing_end_completion, NULL,
153 : &completions[i]);
154 : }
155 :
156 129 : for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
157 128 : ev = grpc_completion_queue_pluck(cc, tags[i],
158 : gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
159 128 : GPR_ASSERT(ev.tag == tags[i]);
160 : }
161 :
162 129 : for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
163 128 : grpc_cq_begin_op(cc);
164 128 : grpc_cq_end_op(&exec_ctx, cc, tags[i], 1, do_nothing_end_completion, NULL,
165 : &completions[i]);
166 : }
167 :
168 129 : for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
169 128 : ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
170 : gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
171 128 : GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
172 : }
173 :
174 1 : shutdown_and_destroy(cc);
175 1 : grpc_exec_ctx_finish(&exec_ctx);
176 1 : }
177 :
/* Number of operations each producer thread begins and later completes. */
enum { TEST_THREAD_EVENTS = 10000 };
179 :
180 : typedef struct test_thread_options {
181 : gpr_event on_started;
182 : gpr_event *phase1;
183 : gpr_event on_phase1_done;
184 : gpr_event *phase2;
185 : gpr_event on_finished;
186 : size_t events_triggered;
187 : int id;
188 : grpc_completion_queue *cc;
189 : } test_thread_options;
190 :
191 220242 : gpr_timespec ten_seconds_time(void) {
192 220242 : return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
193 : }
194 :
195 220000 : static void free_completion(grpc_exec_ctx *exec_ctx, void *arg,
196 : grpc_cq_completion *completion) {
197 220000 : gpr_free(completion);
198 220000 : }
199 :
200 22 : static void producer_thread(void *arg) {
201 22 : test_thread_options *opt = arg;
202 : int i;
203 22 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
204 :
205 22 : gpr_log(GPR_INFO, "producer %d started", opt->id);
206 22 : gpr_event_set(&opt->on_started, (void *)(gpr_intptr)1);
207 22 : GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
208 :
209 22 : gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
210 220022 : for (i = 0; i < TEST_THREAD_EVENTS; i++) {
211 220000 : grpc_cq_begin_op(opt->cc);
212 : }
213 :
214 22 : gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
215 22 : gpr_event_set(&opt->on_phase1_done, (void *)(gpr_intptr)1);
216 22 : GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
217 :
218 22 : gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
219 220022 : for (i = 0; i < TEST_THREAD_EVENTS; i++) {
220 220000 : grpc_cq_end_op(&exec_ctx, opt->cc, (void *)(gpr_intptr)1, 1,
221 : free_completion, NULL,
222 220000 : gpr_malloc(sizeof(grpc_cq_completion)));
223 220000 : opt->events_triggered++;
224 220000 : grpc_exec_ctx_finish(&exec_ctx);
225 : }
226 :
227 22 : gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
228 22 : gpr_event_set(&opt->on_finished, (void *)(gpr_intptr)1);
229 22 : grpc_exec_ctx_finish(&exec_ctx);
230 22 : }
231 :
232 22 : static void consumer_thread(void *arg) {
233 22 : test_thread_options *opt = arg;
234 : grpc_event ev;
235 :
236 22 : gpr_log(GPR_INFO, "consumer %d started", opt->id);
237 22 : gpr_event_set(&opt->on_started, (void *)(gpr_intptr)1);
238 22 : GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
239 :
240 22 : gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
241 :
242 22 : gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
243 22 : gpr_event_set(&opt->on_phase1_done, (void *)(gpr_intptr)1);
244 22 : GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
245 :
246 22 : gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
247 : for (;;) {
248 220022 : ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
249 220022 : switch (ev.type) {
250 : case GRPC_OP_COMPLETE:
251 220000 : GPR_ASSERT(ev.success);
252 220000 : opt->events_triggered++;
253 220000 : break;
254 : case GRPC_QUEUE_SHUTDOWN:
255 22 : gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
256 22 : gpr_event_set(&opt->on_finished, (void *)(gpr_intptr)1);
257 22 : return;
258 : case GRPC_QUEUE_TIMEOUT:
259 0 : gpr_log(GPR_ERROR, "Invalid timeout received");
260 0 : abort();
261 : }
262 220000 : }
263 : }
264 :
265 4 : static void test_threading(size_t producers, size_t consumers) {
266 4 : test_thread_options *options =
267 4 : gpr_malloc((producers + consumers) * sizeof(test_thread_options));
268 4 : gpr_event phase1 = GPR_EVENT_INIT;
269 4 : gpr_event phase2 = GPR_EVENT_INIT;
270 4 : grpc_completion_queue *cc = grpc_completion_queue_create(NULL);
271 : size_t i;
272 4 : size_t total_consumed = 0;
273 : static int optid = 101;
274 :
275 4 : gpr_log(GPR_INFO, "%s: %d producers, %d consumers", "test_threading",
276 : producers, consumers);
277 :
278 : /* start all threads: they will wait for phase1 */
279 48 : for (i = 0; i < producers + consumers; i++) {
280 : gpr_thd_id id;
281 44 : gpr_event_init(&options[i].on_started);
282 44 : gpr_event_init(&options[i].on_phase1_done);
283 44 : gpr_event_init(&options[i].on_finished);
284 44 : options[i].phase1 = &phase1;
285 44 : options[i].phase2 = &phase2;
286 44 : options[i].events_triggered = 0;
287 44 : options[i].cc = cc;
288 44 : options[i].id = optid++;
289 44 : GPR_ASSERT(gpr_thd_new(&id,
290 : i < producers ? producer_thread : consumer_thread,
291 : options + i, NULL));
292 44 : gpr_event_wait(&options[i].on_started, ten_seconds_time());
293 : }
294 :
295 : /* start phase1: producers will pre-declare all operations they will
296 : complete */
297 4 : gpr_log(GPR_INFO, "start phase 1");
298 4 : gpr_event_set(&phase1, (void *)(gpr_intptr)1);
299 :
300 4 : gpr_log(GPR_INFO, "wait phase 1");
301 48 : for (i = 0; i < producers + consumers; i++) {
302 44 : GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
303 : }
304 4 : gpr_log(GPR_INFO, "done phase 1");
305 :
306 : /* start phase2: operations will complete, and consumers will consume them */
307 4 : gpr_log(GPR_INFO, "start phase 2");
308 4 : gpr_event_set(&phase2, (void *)(gpr_intptr)1);
309 :
310 : /* in parallel, we shutdown the completion channel - all events should still
311 : be consumed */
312 4 : grpc_completion_queue_shutdown(cc);
313 :
314 : /* join all threads */
315 4 : gpr_log(GPR_INFO, "wait phase 2");
316 48 : for (i = 0; i < producers + consumers; i++) {
317 44 : GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
318 : }
319 4 : gpr_log(GPR_INFO, "done phase 2");
320 :
321 : /* destroy the completion channel */
322 4 : grpc_completion_queue_destroy(cc);
323 :
324 : /* verify that everything was produced and consumed */
325 48 : for (i = 0; i < producers + consumers; i++) {
326 44 : if (i < producers) {
327 22 : GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
328 : } else {
329 22 : total_consumed += options[i].events_triggered;
330 : }
331 : }
332 4 : GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
333 :
334 4 : gpr_free(options);
335 4 : }
336 :
/* Test driver: initializes the test environment and the gRPC library, runs
   the single-threaded tests, then test_threading for each producer/consumer
   combination listed in the table, and finally shuts the library down. */
int main(int argc, char **argv) {
  /* {producers, consumers} for each test_threading run, in execution order. */
  static const size_t thread_mix[][2] = {{1, 1}, {1, 10}, {10, 1}, {10, 10}};
  size_t run;

  grpc_test_init(argc, argv);
  grpc_init();

  test_no_op();
  test_wait_empty();
  test_shutdown_then_next_polling();
  test_shutdown_then_next_with_timeout();
  test_cq_end_op();
  test_pluck();

  for (run = 0; run < sizeof(thread_mix) / sizeof(thread_mix[0]); run++) {
    test_threading(thread_mix[run][0], thread_mix[run][1]);
  }

  grpc_shutdown();
  return 0;
}
|