LCOV - code coverage report
Current view: top level - test/core/support - sync_test.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 222 224 99.1 %
Date: 2015-10-10 Functions: 23 23 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : /* Test of gpr synchronization support. */
      35             : 
      36             : #include <stdio.h>
      37             : #include <stdlib.h>
      38             : #include <grpc/support/alloc.h>
      39             : #include <grpc/support/log.h>
      40             : #include <grpc/support/sync.h>
      41             : #include <grpc/support/thd.h>
      42             : #include <grpc/support/time.h>
      43             : #include "test/core/util/test_config.h"
      44             : 
      45             : /* ==================Example use of interface===================
      46             : 
      47             :    A producer-consumer queue of up to N integers,
      48             :    illustrating the use of the calls in this interface.  */
      49             : 
      50             : #define N 4
      51             : 
      52             : typedef struct queue {
      53             :   gpr_cv non_empty; /* Signalled when length becomes non-zero. */
      54             :   gpr_cv non_full;  /* Signalled when length becomes non-N. */
      55             :   gpr_mu mu;        /* Protects all fields below.
      56             :                        (That is, except during initialization or
      57             :                        destruction, the fields below should be accessed
      58             :                        only by a thread that holds mu.) */
      59             :   int head;         /* Index of head of queue 0..N-1. */
      60             :   int length;       /* Number of valid elements in queue 0..N. */
      61             :   int elem[N];      /* elem[head .. head+length-1] are queue elements. */
      62             : } queue;
      63             : 
      64             : /* Initialize *q. */
      65          50 : void queue_init(queue *q) {
      66          50 :   gpr_mu_init(&q->mu);
      67          50 :   gpr_cv_init(&q->non_empty);
      68          50 :   gpr_cv_init(&q->non_full);
      69          50 :   q->head = 0;
      70          50 :   q->length = 0;
      71          50 : }
      72             : 
      73             : /* Free storage associated with *q. */
      74          50 : void queue_destroy(queue *q) {
      75          50 :   gpr_mu_destroy(&q->mu);
      76          50 :   gpr_cv_destroy(&q->non_empty);
      77          50 :   gpr_cv_destroy(&q->non_full);
      78          50 : }
      79             : 
      80             : /* Wait until there is room in *q, then append x to *q. */
      81      645073 : void queue_append(queue *q, int x) {
      82      645073 :   gpr_mu_lock(&q->mu);
      83             :   /* To wait for a predicate without a deadline, loop on the negation of the
      84             :      predicate, and use gpr_cv_wait(..., gpr_inf_future(GPR_CLOCK_REALTIME))
      85             :      inside the loop
      86             :      to release the lock, wait, and reacquire on each iteration.  Code that
      87             :      makes the condition true should use gpr_cv_broadcast() on the
      88             :      corresponding condition variable.  The predicate must be on state
      89             :      protected by the lock.  */
      90     2975353 :   while (q->length == N) {
      91     1685113 :     gpr_cv_wait(&q->non_full, &q->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
      92             :   }
      93      645120 :   if (q->length == 0) { /* Wake threads blocked in queue_remove(). */
      94             :     /* It's normal to use gpr_cv_broadcast() or gpr_signal() while
      95             :        holding the lock. */
      96      104471 :     gpr_cv_broadcast(&q->non_empty);
      97             :   }
      98      645120 :   q->elem[(q->head + q->length) % N] = x;
      99      645120 :   q->length++;
     100      645120 :   gpr_mu_unlock(&q->mu);
     101      645092 : }
     102             : 
     103             : /* If it can be done without blocking, append x to *q and return non-zero.
     104             :    Otherwise return 0. */
     105   150853538 : int queue_try_append(queue *q, int x) {
     106   150853538 :   int result = 0;
     107   150853538 :   if (gpr_mu_trylock(&q->mu)) {
     108    51986552 :     if (q->length != N) {
     109      645120 :       if (q->length == 0) { /* Wake threads blocked in queue_remove(). */
     110       31564 :         gpr_cv_broadcast(&q->non_empty);
     111             :       }
     112      645120 :       q->elem[(q->head + q->length) % N] = x;
     113      645120 :       q->length++;
     114      645120 :       result = 1;
     115             :     }
     116    51986552 :     gpr_mu_unlock(&q->mu);
     117             :   }
     118   149670282 :   return result;
     119             : }
     120             : 
     121             : /* Wait until the *q is non-empty or deadline abs_deadline passes.  If the
     122             :    queue is non-empty, remove its head entry, place it in *head, and return
     123             :    non-zero.  Otherwise return 0.  */
     124     1290246 : int queue_remove(queue *q, int *head, gpr_timespec abs_deadline) {
     125     1290246 :   int result = 0;
     126     1290246 :   gpr_mu_lock(&q->mu);
     127             :   /* To wait for a predicate with a deadline, loop on the negation of the
     128             :      predicate or until gpr_cv_wait() returns true.  Code that makes
     129             :      the condition true should use gpr_cv_broadcast() on the corresponding
     130             :      condition variable.  The predicate must be on state protected by the
     131             :      lock. */
     132     1290246 :   while (q->length == 0 && !gpr_cv_wait(&q->non_empty, &q->mu, abs_deadline)) {
     133             :   }
     134     1290246 :   if (q->length != 0) { /* Queue is non-empty. */
     135     1290240 :     result = 1;
     136     1290240 :     if (q->length == N) { /* Wake threads blocked in queue_append(). */
     137      778601 :       gpr_cv_broadcast(&q->non_full);
     138             :     }
     139     1290240 :     *head = q->elem[q->head];
     140     1290240 :     q->head = (q->head + 1) % N;
     141     1290240 :     q->length--;
     142             :   } /* else deadline exceeded */
     143     1290246 :   gpr_mu_unlock(&q->mu);
     144     1290246 :   return result;
     145             : }
     146             : 
     147             : /* ------------------------------------------------- */
     148             : /* Tests for gpr_mu and gpr_cv, and the queue example. */
     149             : struct test {
     150             :   int threads; /* number of threads */
     151             : 
     152             :   gpr_int64 iterations; /* number of iterations per thread */
     153             :   gpr_int64 counter;
     154             :   int thread_count; /* used to allocate thread ids */
     155             :   int done;         /* threads not yet completed */
     156             : 
     157             :   gpr_mu mu; /* protects iterations, counter, thread_count, done */
     158             : 
     159             :   gpr_cv cv; /* signalling depends on test */
     160             : 
     161             :   gpr_cv done_cv; /* signalled when done == 0 */
     162             : 
     163             :   queue q;
     164             : 
     165             :   gpr_stats_counter stats_counter;
     166             : 
     167             :   gpr_refcount refcount;
     168             :   gpr_refcount thread_refcount;
     169             :   gpr_event event;
     170             : };
     171             : 
     172             : /* Return pointer to a new struct test. */
     173          50 : static struct test *test_new(int threads, gpr_int64 iterations) {
     174          50 :   struct test *m = gpr_malloc(sizeof(*m));
     175          50 :   m->threads = threads;
     176          50 :   m->iterations = iterations;
     177          50 :   m->counter = 0;
     178          50 :   m->thread_count = 0;
     179          50 :   m->done = threads;
     180          50 :   gpr_mu_init(&m->mu);
     181          50 :   gpr_cv_init(&m->cv);
     182          50 :   gpr_cv_init(&m->done_cv);
     183          50 :   queue_init(&m->q);
     184          50 :   gpr_stats_init(&m->stats_counter, 0);
     185          50 :   gpr_ref_init(&m->refcount, 0);
     186          50 :   gpr_ref_init(&m->thread_refcount, threads);
     187          50 :   gpr_event_init(&m->event);
     188          50 :   return m;
     189             : }
     190             : 
     191             : /* Return pointer to a new struct test. */
     192          50 : static void test_destroy(struct test *m) {
     193          50 :   gpr_mu_destroy(&m->mu);
     194          50 :   gpr_cv_destroy(&m->cv);
     195          50 :   gpr_cv_destroy(&m->done_cv);
     196          50 :   queue_destroy(&m->q);
     197          50 :   gpr_free(m);
     198          50 : }
     199             : 
     200             : /* Create m->threads threads, each running (*body)(m) */
     201          50 : static void test_create_threads(struct test *m, void (*body)(void *arg)) {
     202             :   gpr_thd_id id;
     203             :   int i;
     204         550 :   for (i = 0; i != m->threads; i++) {
     205         500 :     GPR_ASSERT(gpr_thd_new(&id, body, m, NULL));
     206             :   }
     207          50 : }
     208             : 
     209             : /* Wait until all threads report done. */
     210          50 : static void test_wait(struct test *m) {
     211          50 :   gpr_mu_lock(&m->mu);
     212         150 :   while (m->done != 0) {
     213          50 :     gpr_cv_wait(&m->done_cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
     214             :   }
     215          50 :   gpr_mu_unlock(&m->mu);
     216          50 : }
     217             : 
     218             : /* Get an integer thread id in the raneg 0..threads-1 */
     219          90 : static int thread_id(struct test *m) {
     220             :   int id;
     221          90 :   gpr_mu_lock(&m->mu);
     222          90 :   id = m->thread_count++;
     223          90 :   gpr_mu_unlock(&m->mu);
     224          90 :   return id;
     225             : }
     226             : 
     227             : /* Indicate that a thread is done, by decrementing m->done
     228             :    and signalling done_cv if m->done==0. */
     229         517 : static void mark_thread_done(struct test *m) {
     230         517 :   gpr_mu_lock(&m->mu);
     231         517 :   GPR_ASSERT(m->done != 0);
     232         517 :   m->done--;
     233         517 :   if (m->done == 0) {
     234          50 :     gpr_cv_signal(&m->done_cv);
     235             :   }
     236         517 :   gpr_mu_unlock(&m->mu);
     237         517 : }
     238             : 
     239             : /* Test several threads running (*body)(struct test *m) for increasing settings
     240             :    of m->iterations, until about timeout_s to 2*timeout_s seconds have elapsed.
     241             :    If extra!=NULL, run (*extra)(m) in an additional thread.  */
     242           8 : static void test(const char *name, void (*body)(void *m),
     243             :                  void (*extra)(void *m), int timeout_s) {
     244           8 :   gpr_int64 iterations = 1024;
     245             :   struct test *m;
     246           8 :   gpr_timespec start = gpr_now(GPR_CLOCK_REALTIME);
     247             :   gpr_timespec time_taken;
     248           8 :   gpr_timespec deadline = gpr_time_add(
     249           8 :       start, gpr_time_from_micros(timeout_s * 1000000, GPR_TIMESPAN));
     250           8 :   fprintf(stderr, "%s:", name);
     251          66 :   while (gpr_time_cmp(gpr_now(GPR_CLOCK_REALTIME), deadline) < 0) {
     252          50 :     iterations <<= 1;
     253          50 :     fprintf(stderr, " %ld", (long)iterations);
     254          50 :     m = test_new(10, iterations);
     255          50 :     if (extra != NULL) {
     256             :       gpr_thd_id id;
     257          17 :       GPR_ASSERT(gpr_thd_new(&id, extra, m, NULL));
     258          17 :       m->done++; /* one more thread to wait for */
     259             :     }
     260          50 :     test_create_threads(m, body);
     261          50 :     test_wait(m);
     262          50 :     if (m->counter != m->threads * m->iterations) {
     263           0 :       fprintf(stderr, "counter %ld  threads %d  iterations %ld\n",
     264             :               (long)m->counter, m->threads, (long)m->iterations);
     265           0 :       GPR_ASSERT(0);
     266             :     }
     267          50 :     test_destroy(m);
     268             :   }
     269           8 :   time_taken = gpr_time_sub(gpr_now(GPR_CLOCK_REALTIME), start);
     270           8 :   fprintf(stderr, " done %ld.%09d s\n", (long)time_taken.tv_sec,
     271             :           (int)time_taken.tv_nsec);
     272           8 : }
     273             : 
     274             : /* Increment m->counter on each iteration; then mark thread as done.  */
     275       19890 : static void inc(void *v /*=m*/) {
     276       19890 :   struct test *m = v;
     277             :   gpr_int64 i;
     278     5074320 :   for (i = 0; i != m->iterations; i++) {
     279     5074240 :     gpr_mu_lock(&m->mu);
     280     5222400 :     m->counter++;
     281     5222400 :     gpr_mu_unlock(&m->mu);
     282             :   }
     283          80 :   mark_thread_done(m);
     284          80 : }
     285             : 
     286             : /* Increment m->counter under lock acquired with trylock, m->iterations times;
     287             :    then mark thread as done.  */
     288     3993238 : static void inctry(void *v /*=m*/) {
     289     3993238 :   struct test *m = v;
     290             :   gpr_int64 i;
     291    28757281 :   for (i = 0; i != m->iterations;) {
     292    24763963 :     if (gpr_mu_trylock(&m->mu)) {
     293     5222400 :       m->counter++;
     294     5222400 :       gpr_mu_unlock(&m->mu);
     295     4872161 :       i++;
     296             :     }
     297             :   }
     298          80 :   mark_thread_done(m);
     299          80 : }
     300             : 
     301             : /* Increment counter only when (m->counter%m->threads)==m->thread_id; then mark
     302             :    thread as done.  */
     303          30 : static void inc_by_turns(void *v /*=m*/) {
     304          30 :   struct test *m = v;
     305             :   gpr_int64 i;
     306          30 :   int id = thread_id(m);
     307      143390 :   for (i = 0; i != m->iterations; i++) {
     308      143360 :     gpr_mu_lock(&m->mu);
     309      970012 :     while ((m->counter % m->threads) != id) {
     310      683292 :       gpr_cv_wait(&m->cv, &m->mu, gpr_inf_future(GPR_CLOCK_REALTIME));
     311             :     }
     312      143360 :     m->counter++;
     313      143360 :     gpr_cv_broadcast(&m->cv);
     314      143360 :     gpr_mu_unlock(&m->mu);
     315             :   }
     316          30 :   mark_thread_done(m);
     317          30 : }
     318             : 
     319             : /* Wait a millisecond and increment counter on each iteration;
     320             :    then mark thread as done. */
     321          10 : static void inc_with_1ms_delay(void *v /*=m*/) {
     322          10 :   struct test *m = v;
     323             :   gpr_int64 i;
     324       20490 :   for (i = 0; i != m->iterations; i++) {
     325             :     gpr_timespec deadline;
     326       20480 :     gpr_mu_lock(&m->mu);
     327       20480 :     deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
     328             :                             gpr_time_from_micros(1000, GPR_TIMESPAN));
     329       20480 :     while (!gpr_cv_wait(&m->cv, &m->mu, deadline)) {
     330             :     }
     331       20480 :     m->counter++;
     332       20480 :     gpr_mu_unlock(&m->mu);
     333             :   }
     334          10 :   mark_thread_done(m);
     335          10 : }
     336             : 
     337             : /* Wait a millisecond and increment counter on each iteration, using an event
     338             :    for timing; then mark thread as done. */
     339          10 : static void inc_with_1ms_delay_event(void *v /*=m*/) {
     340          10 :   struct test *m = v;
     341             :   gpr_int64 i;
     342       20490 :   for (i = 0; i != m->iterations; i++) {
     343             :     gpr_timespec deadline;
     344       20480 :     deadline = gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
     345             :                             gpr_time_from_micros(1000, GPR_TIMESPAN));
     346       20478 :     GPR_ASSERT(gpr_event_wait(&m->event, deadline) == NULL);
     347       20472 :     gpr_mu_lock(&m->mu);
     348       20480 :     m->counter++;
     349       20480 :     gpr_mu_unlock(&m->mu);
     350             :   }
     351          10 :   mark_thread_done(m);
     352          10 : }
     353             : 
     354             : /* Produce m->iterations elements on queue m->q, then mark thread as done.
     355             :    Even threads use queue_append(), and odd threads use queue_try_append()
     356             :    until it succeeds. */
     357          60 : static void many_producers(void *v /*=m*/) {
     358          60 :   struct test *m = v;
     359             :   gpr_int64 i;
     360          60 :   int x = thread_id(m);
     361          60 :   if ((x & 1) == 0) {
     362      645108 :     for (i = 0; i != m->iterations; i++) {
     363      645081 :       queue_append(&m->q, 1);
     364             :     }
     365             :   } else {
     366      645150 :     for (i = 0; i != m->iterations; i++) {
     367      645120 :       while (!queue_try_append(&m->q, 1)) {
     368             :       }
     369             :     }
     370             :   }
     371          57 :   mark_thread_done(m);
     372          60 : }
     373             : 
     374             : /* Consume elements from m->q until m->threads*m->iterations are seen,
     375             :    wait an extra second to confirm that no more elements are arriving,
     376             :    then mark thread as done. */
     377           6 : static void consumer(void *v /*=m*/) {
     378           6 :   struct test *m = v;
     379           6 :   gpr_int64 n = m->iterations * m->threads;
     380             :   gpr_int64 i;
     381             :   int value;
     382     1290246 :   for (i = 0; i != n; i++) {
     383     1290240 :     queue_remove(&m->q, &value, gpr_inf_future(GPR_CLOCK_REALTIME));
     384             :   }
     385           6 :   gpr_mu_lock(&m->mu);
     386           6 :   m->counter = n;
     387           6 :   gpr_mu_unlock(&m->mu);
     388           6 :   GPR_ASSERT(
     389             :       !queue_remove(&m->q, &value,
     390             :                     gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
     391             :                                  gpr_time_from_micros(1000000, GPR_TIMESPAN))));
     392           6 :   mark_thread_done(m);
     393           6 : }
     394             : 
     395             : /* Increment m->stats_counter m->iterations times, transfer counter value to
     396             :    m->counter, then mark thread as done.  */
     397    32635663 : static void statsinc(void *v /*=m*/) {
     398    32635663 :   struct test *m = v;
     399             :   gpr_int64 i;
     400    72098071 :   for (i = 0; i != m->iterations; i++) {
     401    72097951 :     gpr_stats_inc(&m->stats_counter, 1);
     402             :   }
     403         120 :   gpr_mu_lock(&m->mu);
     404         120 :   m->counter = gpr_stats_read(&m->stats_counter);
     405         120 :   gpr_mu_unlock(&m->mu);
     406         120 :   mark_thread_done(m);
     407         120 : }
     408             : 
     409             : /* Increment m->refcount m->iterations times, decrement m->thread_refcount
     410             :    once, and if it reaches zero, set m->event to (void*)1; then mark thread as
     411             :    done.  */
     412    12254538 : static void refinc(void *v /*=m*/) {
     413    12254538 :   struct test *m = v;
     414             :   gpr_int64 i;
     415    38503679 :   for (i = 0; i != m->iterations; i++) {
     416    38503569 :     gpr_ref(&m->refcount);
     417             :   }
     418         110 :   if (gpr_unref(&m->thread_refcount)) {
     419          11 :     gpr_event_set(&m->event, (void *)1);
     420             :   }
     421         110 :   mark_thread_done(m);
     422         110 : }
     423             : 
     424             : /* Wait until m->event is set to (void *)1, then decrement m->refcount
     425             :    m->stats_counter m->iterations times, and ensure that the last decrement
     426             :    caused the counter to reach zero, then mark thread as done.  */
     427          11 : static void refcheck(void *v /*=m*/) {
     428          11 :   struct test *m = v;
     429          11 :   gpr_int64 n = m->iterations * m->threads;
     430             :   gpr_int64 i;
     431          11 :   GPR_ASSERT(gpr_event_wait(&m->event, gpr_inf_future(GPR_CLOCK_REALTIME)) ==
     432             :              (void *)1);
     433          11 :   GPR_ASSERT(gpr_event_get(&m->event) == (void *)1);
     434    41922560 :   for (i = 1; i != n; i++) {
     435    41922549 :     GPR_ASSERT(!gpr_unref(&m->refcount));
     436    41922549 :     m->counter++;
     437             :   }
     438          11 :   GPR_ASSERT(gpr_unref(&m->refcount));
     439          11 :   m->counter++;
     440          11 :   mark_thread_done(m);
     441          11 : }
     442             : 
     443             : /* ------------------------------------------------- */
     444             : 
     445           1 : int main(int argc, char *argv[]) {
     446           1 :   grpc_test_init(argc, argv);
     447           1 :   test("mutex", &inc, NULL, 1);
     448           1 :   test("mutex try", &inctry, NULL, 1);
     449           1 :   test("cv", &inc_by_turns, NULL, 1);
     450           1 :   test("timedcv", &inc_with_1ms_delay, NULL, 1);
     451           1 :   test("queue", &many_producers, &consumer, 10);
     452           1 :   test("stats_counter", &statsinc, NULL, 1);
     453           1 :   test("refcount", &refinc, &refcheck, 1);
     454           1 :   test("timedevent", &inc_with_1ms_delay_event, NULL, 1);
     455           1 :   return 0;
     456             : }

Generated by: LCOV version 1.10