Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : /* Test of gpr synchronization support. */
35 :
36 : #include <stdio.h>
37 : #include <stdlib.h>
38 : #include <grpc/support/alloc.h>
39 : #include <grpc/support/log.h>
40 : #include <grpc/support/sync.h>
41 : #include <grpc/support/thd.h>
42 : #include <grpc/support/time.h>
43 : #include "test/core/util/test_config.h"
44 :
45 : /* ==================Example use of interface===================
46 :
47 : A producer-consumer queue of up to N integers,
48 : illustrating the use of the calls in this interface. */
49 :
50 : #define N 4 /* Capacity of the example queue; also the size of queue.elem[]. */
51 :
52 : typedef struct queue {
53 : gpr_cv non_empty; /* Broadcast by the append functions when length is about to leave zero. */
54 : gpr_cv non_full; /* Broadcast by queue_remove() when length is about to leave N. */
55 : gpr_mu mu; /* Protects all fields below.
56 : (That is, except during initialization or
57 : destruction, the fields below should be accessed
58 : only by a thread that holds mu.) */
59 : int head; /* Index of head of queue 0..N-1. */
60 : int length; /* Number of valid elements in queue 0..N. */
61 : int elem[N]; /* elem[head .. head+length-1], indices taken modulo N, are the queue elements. */
62 : } queue;
63 :
64 : /* Initialize *q as an empty queue: set up its mutex and both condition variables and zero head and length. elem[] is not touched. */
65 50 : void queue_init(queue *q) {
66 50 : gpr_mu_init(&q->mu);
67 50 : gpr_cv_init(&q->non_empty);
68 50 : gpr_cv_init(&q->non_full);
69 50 : q->head = 0;
70 50 : q->length = 0;
71 50 : }
72 :
73 : /* Destroy the mutex and both condition variables owned by *q. The memory of *q itself is not freed here. */
74 50 : void queue_destroy(queue *q) {
75 50 : gpr_mu_destroy(&q->mu);
76 50 : gpr_cv_destroy(&q->non_empty);
77 50 : gpr_cv_destroy(&q->non_full);
78 50 : }
79 :
80 : /* Wait (without a deadline) until there is room in *q, then append x to *q. */
81 645073 : void queue_append(queue *q, int x) {
82 645073 : gpr_mu_lock(&q->mu);
83 : /* To wait for a predicate without a deadline, loop on the negation of the
84 : predicate, and use gpr_cv_wait(..., gpr_inf_future(GPR_CLOCK_REALTIME))
85 : inside the loop
86 : to release the lock, wait, and reacquire on each iteration. Code that
87 : makes the condition true should use gpr_cv_broadcast() on the
88 : corresponding condition variable. The predicate must be on state
89 : protected by the lock. */
90 2975353 : while (q->length == N) {
91 1685113 : gpr_cv_wait(&q->non_full, &q->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
92 : }
93 645120 : if (q->length == 0) { /* Wake threads blocked in queue_remove(). */
94 : /* It's normal to use gpr_cv_broadcast() or gpr_cv_signal() while
95 : holding the lock. */
96 104471 : gpr_cv_broadcast(&q->non_empty);
97 : }
98 645120 : q->elem[(q->head + q->length) % N] = x; /* First free slot, wrapping past the end of elem[]. */
99 645120 : q->length++;
100 645120 : gpr_mu_unlock(&q->mu);
101 645092 : }
102 :
103 : /* If it can be done without blocking, append x to *q and return non-zero.
104 : Otherwise (the mutex could not be taken by trylock, or the queue is full) return 0. */
105 150853538 : int queue_try_append(queue *q, int x) {
106 150853538 : int result = 0;
107 150853538 : if (gpr_mu_trylock(&q->mu)) { /* Never waits for the mutex; a failed trylock yields result 0. */
108 51986552 : if (q->length != N) {
109 645120 : if (q->length == 0) { /* Wake threads blocked in queue_remove(). */
110 31564 : gpr_cv_broadcast(&q->non_empty);
111 : }
112 645120 : q->elem[(q->head + q->length) % N] = x; /* First free slot, wrapping past the end of elem[]. */
113 645120 : q->length++;
114 645120 : result = 1;
115 : }
116 51986552 : gpr_mu_unlock(&q->mu);
117 : }
118 149670282 : return result;
119 : }
120 :
121 : /* Wait until *q is non-empty or the deadline abs_deadline passes. If the
122 : queue is non-empty, remove its head entry, place it in *head, and return
123 : non-zero. Otherwise return 0 and leave *head unmodified. */
124 1290246 : int queue_remove(queue *q, int *head, gpr_timespec abs_deadline) {
125 1290246 : int result = 0;
126 1290246 : gpr_mu_lock(&q->mu);
127 : /* To wait for a predicate with a deadline, loop on the negation of the
128 : predicate or until gpr_cv_wait() returns true. Code that makes
129 : the condition true should use gpr_cv_broadcast() on the corresponding
130 : condition variable. The predicate must be on state protected by the
131 : lock. */
132 1290246 : while (q->length == 0 && !gpr_cv_wait(&q->non_empty, &q->mu, abs_deadline)) {
133 : }
134 1290246 : if (q->length != 0) { /* Queue is non-empty. */
135 1290240 : result = 1;
136 1290240 : if (q->length == N) { /* Wake threads blocked in queue_append(). */
137 778601 : gpr_cv_broadcast(&q->non_full);
138 : }
139 1290240 : *head = q->elem[q->head];
140 1290240 : q->head = (q->head + 1) % N; /* Advance the head index, wrapping past the end of elem[]. */
141 1290240 : q->length--;
142 : } /* else deadline exceeded */
143 1290246 : gpr_mu_unlock(&q->mu);
144 1290246 : return result;
145 : }
146 :
147 : /* ------------------------------------------------- */
148 : /* Tests for gpr_mu and gpr_cv, and the queue example. */
149 : struct test { /* State shared by the main thread and all worker threads of one test round. */
150 : int threads; /* number of threads */
151 :
152 : gpr_int64 iterations; /* number of iterations per thread */
153 : gpr_int64 counter; /* compared against threads * iterations at the end of each round */
154 : int thread_count; /* used to allocate thread ids */
155 : int done; /* threads not yet completed */
156 :
157 : gpr_mu mu; /* protects iterations, counter, thread_count, done */
158 :
159 : gpr_cv cv; /* signalling depends on test */
160 :
161 : gpr_cv done_cv; /* signalled when done == 0 */
162 :
163 : queue q; /* used by many_producers() and consumer() */
164 :
165 : gpr_stats_counter stats_counter; /* incremented by statsinc() */
166 :
167 : gpr_refcount refcount; /* starts at 0; incremented by refinc(), decremented by refcheck() */
168 : gpr_refcount thread_refcount; /* starts at threads; each refinc() thread drops one */
169 : gpr_event event; /* set to (void *)1 by refinc() when thread_refcount reaches zero */
170 : };
171 :
172 : /* Allocate and initialize a struct test for the given thread count and per-thread iteration count, and return it. Release it with test_destroy(). */
173 50 : static struct test *test_new(int threads, gpr_int64 iterations) {
174 50 : struct test *m = gpr_malloc(sizeof(*m));
175 50 : m->threads = threads;
176 50 : m->iterations = iterations;
177 50 : m->counter = 0;
178 50 : m->thread_count = 0;
179 50 : m->done = threads; /* test() adds one more when it starts an extra thread. */
180 50 : gpr_mu_init(&m->mu);
181 50 : gpr_cv_init(&m->cv);
182 50 : gpr_cv_init(&m->done_cv);
183 50 : queue_init(&m->q);
184 50 : gpr_stats_init(&m->stats_counter, 0);
185 50 : gpr_ref_init(&m->refcount, 0); /* Raised by refinc(), lowered again by refcheck(). */
186 50 : gpr_ref_init(&m->thread_refcount, threads); /* One reference per body thread; see refinc(). */
187 50 : gpr_event_init(&m->event);
188 50 : return m;
189 : }
190 :
191 : /* Destroy the mutex, condition variables and queue of *m, then free m. m must not be used afterwards. */
192 50 : static void test_destroy(struct test *m) {
193 50 : gpr_mu_destroy(&m->mu);
194 50 : gpr_cv_destroy(&m->cv);
195 50 : gpr_cv_destroy(&m->done_cv);
196 50 : queue_destroy(&m->q);
197 50 : gpr_free(m);
198 50 : }
199 :
200 : /* Create m->threads threads, each running (*body)(m). Aborts via GPR_ASSERT if a thread cannot be created. */
201 50 : static void test_create_threads(struct test *m, void (*body)(void *arg)) {
202 : gpr_thd_id id; /* Overwritten for every thread; the ids are not kept. */
203 : int i;
204 550 : for (i = 0; i != m->threads; i++) {
205 500 : GPR_ASSERT(gpr_thd_new(&id, body, m, NULL));
206 : }
207 50 : }
208 :
209 : /* Block, without a deadline, until m->done reaches zero, i.e. until all threads report done via mark_thread_done(). */
210 50 : static void test_wait(struct test *m) {
211 50 : gpr_mu_lock(&m->mu);
212 150 : while (m->done != 0) {
213 50 : gpr_cv_wait(&m->done_cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
214 : }
215 50 : gpr_mu_unlock(&m->mu);
216 50 : }
217 :
218 : /* Get an integer thread id in the range 0..threads-1. Each call hands out the next value of m->thread_count under m->mu. */
219 90 : static int thread_id(struct test *m) {
220 : int id;
221 90 : gpr_mu_lock(&m->mu);
222 90 : id = m->thread_count++;
223 90 : gpr_mu_unlock(&m->mu);
224 90 : return id;
225 : }
226 :
227 : /* Indicate that a thread is done, by decrementing m->done under m->mu
228 : and signalling done_cv if m->done==0. Asserts that m->done was non-zero on entry. */
229 517 : static void mark_thread_done(struct test *m) {
230 517 : gpr_mu_lock(&m->mu);
231 517 : GPR_ASSERT(m->done != 0);
232 517 : m->done--;
233 517 : if (m->done == 0) {
234 50 : gpr_cv_signal(&m->done_cv); /* test_wait() is the only waiter on done_cv in this file. */
235 : }
236 517 : gpr_mu_unlock(&m->mu);
237 517 : }
238 :
239 : /* Test several threads running (*body)(struct test *m) for increasing settings
240 : of m->iterations, until about timeout_s to 2*timeout_s seconds have elapsed.
241 : If extra!=NULL, run (*extra)(m) in an additional thread. name labels the progress output on stderr. */
242 8 : static void test(const char *name, void (*body)(void *m),
243 : void (*extra)(void *m), int timeout_s) {
244 8 : gpr_int64 iterations = 1024; /* Doubled before each round, so the first round uses 2048. */
245 : struct test *m;
246 8 : gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
247 : gpr_timespec time_taken;
248 8 : gpr_timespec deadline = gpr_time_add(
249 8 : start, gpr_time_from_micros(timeout_s * 1000000, GPR_TIMESPAN));
250 8 : fprintf(stderr, "%s:", name);
251 66 : while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0) { /* Start a new round only while the deadline has not passed. */
252 50 : iterations <<= 1;
253 50 : fprintf(stderr, " %ld", (long)iterations);
254 50 : m = test_new(10, iterations); /* Every round uses 10 body threads. */
255 50 : if (extra != NULL) {
256 : gpr_thd_id id;
257 17 : GPR_ASSERT(gpr_thd_new(&id, extra, m, NULL));
258 17 : m->done++; /* one more thread to wait for. NOTE(review): done is bumped after the extra thread has started and without holding m->mu; this relies on extra not reaching mark_thread_done() before this line - confirm. */
259 : }
260 50 : test_create_threads(m, body);
261 50 : test_wait(m);
262 50 : if (m->counter != m->threads * m->iterations) { /* Each body/extra pair must leave counter at exactly this value. */
263 0 : fprintf(stderr, "counter %ld threads %d iterations %ld\n",
264 : (long)m->counter, m->threads, (long)m->iterations);
265 0 : GPR_ASSERT(0);
266 : }
267 50 : test_destroy(m);
268 : }
269 8 : time_taken = gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start);
270 8 : fprintf(stderr, " done %ld.%09d s\n", (long)time_taken.tv_sec,
271 : (int)time_taken.tv_nsec);
272 8 : }
273 :
274 : /* Increment m->counter once per iteration, m->iterations times, taking m->mu around each increment; then mark thread as done. */
275 19890 : static void inc(void *v /*=m*/) {
276 19890 : struct test *m = v;
277 : gpr_int64 i;
278 5074320 : for (i = 0; i != m->iterations; i++) {
279 5074240 : gpr_mu_lock(&m->mu);
280 5222400 : m->counter++;
281 5222400 : gpr_mu_unlock(&m->mu);
282 : }
283 80 : mark_thread_done(m);
284 80 : }
285 :
286 : /* Increment m->counter under lock acquired with trylock, m->iterations times;
287 : then mark thread as done. A failed trylock is retried immediately and does not count as an iteration. */
288 3993238 : static void inctry(void *v /*=m*/) {
289 3993238 : struct test *m = v;
290 : gpr_int64 i;
291 28757281 : for (i = 0; i != m->iterations;) { /* i advances only after a successful trylock. */
292 24763963 : if (gpr_mu_trylock(&m->mu)) {
293 5222400 : m->counter++;
294 5222400 : gpr_mu_unlock(&m->mu);
295 4872161 : i++;
296 : }
297 : }
298 80 : mark_thread_done(m);
299 80 : }
300 :
301 : /* Increment counter only when (m->counter%m->threads) equals this thread's id
302 : (obtained from thread_id()), m->iterations times; then mark thread as done. */
303 30 : static void inc_by_turns(void *v /*=m*/) {
304 30 : struct test *m = v;
305 : gpr_int64 i;
306 30 : int id = thread_id(m);
307 143390 : for (i = 0; i != m->iterations; i++) {
308 143360 : gpr_mu_lock(&m->mu);
309 970012 : while ((m->counter % m->threads) != id) { /* Not this thread's turn yet. */
310 683292 : gpr_cv_wait(&m->cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
311 : }
312 143360 : m->counter++;
313 143360 : gpr_cv_broadcast(&m->cv); /* Wake all waiters so the thread whose turn is next can proceed. */
314 143360 : gpr_mu_unlock(&m->mu);
315 : }
316 30 : mark_thread_done(m);
317 30 : }
318 :
319 : /* Wait a millisecond (via a timed gpr_cv_wait on m->cv) and increment counter on each iteration;
320 : then mark thread as done. */
321 10 : static void inc_with_1ms_delay(void *v /*=m*/) {
322 10 : struct test *m = v;
323 : gpr_int64 i;
324 20490 : for (i = 0; i != m->iterations; i++) {
325 : gpr_timespec deadline;
326 20480 : gpr_mu_lock(&m->mu);
327 20480 : deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
328 : gpr_time_from_micros(1000, GPR_TIMESPAN));
329 20480 : while (!gpr_cv_wait(&m->cv, &m->mu, deadline)) { /* Keep waiting until the wait reports that the deadline passed. */
330 : }
331 20480 : m->counter++;
332 20480 : gpr_mu_unlock(&m->mu);
333 : }
334 10 : mark_thread_done(m);
335 10 : }
336 :
337 : /* Wait a millisecond and increment counter on each iteration, using an event
338 : for timing; then mark thread as done. m->event is expected to stay unset, so every wait must return NULL. */
339 10 : static void inc_with_1ms_delay_event(void *v /*=m*/) {
340 10 : struct test *m = v;
341 : gpr_int64 i;
342 20490 : for (i = 0; i != m->iterations; i++) {
343 : gpr_timespec deadline;
344 20480 : deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
345 : gpr_time_from_micros(1000, GPR_TIMESPAN));
346 20478 : GPR_ASSERT(gpr_event_wait(&m->event, deadline) == NULL); /* presumably NULL means the deadline passed with the event unset - confirm against gpr_event_wait docs */
347 20472 : gpr_mu_lock(&m->mu);
348 20480 : m->counter++;
349 20480 : gpr_mu_unlock(&m->mu);
350 : }
351 10 : mark_thread_done(m);
352 10 : }
353 :
354 : /* Produce m->iterations elements (each the value 1) on queue m->q, then mark thread as done.
355 : Threads with an even id use queue_append(), and threads with an odd id use queue_try_append()
356 : until it succeeds. */
357 60 : static void many_producers(void *v /*=m*/) {
358 60 : struct test *m = v;
359 : gpr_int64 i;
360 60 : int x = thread_id(m);
361 60 : if ((x & 1) == 0) { /* Even thread id: blocking append. */
362 645108 : for (i = 0; i != m->iterations; i++) {
363 645081 : queue_append(&m->q, 1);
364 : }
365 : } else { /* Odd thread id: spin on the non-blocking append. */
366 645150 : for (i = 0; i != m->iterations; i++) {
367 645120 : while (!queue_try_append(&m->q, 1)) {
368 : }
369 : }
370 : }
371 57 : mark_thread_done(m);
372 60 : }
373 :
374 : /* Consume elements from m->q until m->threads*m->iterations are seen, set m->counter to that total,
375 : wait an extra second to confirm that no more elements are arriving,
376 : then mark thread as done. */
377 6 : static void consumer(void *v /*=m*/) {
378 6 : struct test *m = v;
379 6 : gpr_int64 n = m->iterations * m->threads;
380 : gpr_int64 i;
381 : int value; /* Receives each removed element; the value itself is not checked. */
382 1290246 : for (i = 0; i != n; i++) {
383 1290240 : queue_remove(&m->q, &value, gpr_inf_future(GPR_CLOCK_REALTIME));
384 : }
385 6 : gpr_mu_lock(&m->mu);
386 6 : m->counter = n; /* This is the value test() checks after the round. */
387 6 : gpr_mu_unlock(&m->mu);
388 6 : GPR_ASSERT(
389 : !queue_remove(&m->q, &value,
390 : gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
391 : gpr_time_from_micros(1000000, GPR_TIMESPAN))));
392 6 : mark_thread_done(m);
393 6 : }
394 :
395 : /* Increment m->stats_counter m->iterations times (without taking m->mu), transfer counter value to
396 : m->counter under m->mu, then mark thread as done. */
397 32635663 : static void statsinc(void *v /*=m*/) {
398 32635663 : struct test *m = v;
399 : gpr_int64 i;
400 72098071 : for (i = 0; i != m->iterations; i++) {
401 72097951 : gpr_stats_inc(&m->stats_counter, 1);
402 : }
403 120 : gpr_mu_lock(&m->mu);
404 120 : m->counter = gpr_stats_read(&m->stats_counter); /* Every thread stores its reading; the last store is what test() checks. */
405 120 : gpr_mu_unlock(&m->mu);
406 120 : mark_thread_done(m);
407 120 : }
408 :
409 : /* Increment m->refcount m->iterations times, decrement m->thread_refcount
410 : once, and if it reaches zero, set m->event to (void*)1; then mark thread as
411 : done. m->counter is not touched here; refcheck() maintains it. */
412 12254538 : static void refinc(void *v /*=m*/) {
413 12254538 : struct test *m = v;
414 : gpr_int64 i;
415 38503679 : for (i = 0; i != m->iterations; i++) {
416 38503569 : gpr_ref(&m->refcount);
417 : }
418 110 : if (gpr_unref(&m->thread_refcount)) { /* True only for the thread that drops the final thread reference. */
419 11 : gpr_event_set(&m->event, (void *)1);
420 : }
421 110 : mark_thread_done(m);
422 110 : }
423 :
424 : /* Wait until m->event is set to (void *)1, then decrement m->refcount
425 : m->threads*m->iterations times, incrementing m->counter once per decrement, and ensure that only the last decrement
426 : caused the refcount to reach zero, then mark thread as done. */
427 11 : static void refcheck(void *v /*=m*/) {
428 11 : struct test *m = v;
429 11 : gpr_int64 n = m->iterations * m->threads;
430 : gpr_int64 i;
431 11 : GPR_ASSERT(gpr_event_wait(&m->event, gpr_inf_future(GPR_CLOCK_REALTIME)) ==
432 : (void *)1);
433 11 : GPR_ASSERT(gpr_event_get(&m->event) == (void *)1);
434 41922560 : for (i = 1; i != n; i++) { /* The first n-1 decrements must not report zero. */
435 41922549 : GPR_ASSERT(!gpr_unref(&m->refcount));
436 41922549 : m->counter++; /* Updated without holding m->mu; in this test only this thread writes counter (refinc() does not). */
437 : }
438 11 : GPR_ASSERT(gpr_unref(&m->refcount)); /* The n-th decrement must report that the count reached zero. */
439 11 : m->counter++;
440 11 : mark_thread_done(m);
441 11 : }
442 :
443 : /* ------------------------------------------------- */
444 :
445 1 : int main(int argc, char *argv[]) { /* Runs each synchronization test in turn; the last argument of test() is timeout_s. */
446 1 : grpc_test_init(argc, argv);
447 1 : test("mutex", &inc, NULL, 1);
448 1 : test("mutex try", &inctry, NULL, 1);
449 1 : test("cv", &inc_by_turns, NULL, 1);
450 1 : test("timedcv", &inc_with_1ms_delay, NULL, 1);
451 1 : test("queue", &many_producers, &consumer, 10); /* consumer runs as the extra thread. */
452 1 : test("stats_counter", &statsinc, NULL, 1);
453 1 : test("refcount", &refinc, &refcheck, 1); /* refcheck runs as the extra thread. */
454 1 : test("timedevent", &inc_with_1ms_delay_event, NULL, 1);
455 1 : return 0;
456 : }
|