LCOV - code coverage report
Current view: top level - test/core/surface - completion_queue_test.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 177 179 98.9 %
Date: 2015-10-10 Functions: 15 15 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/surface/completion_queue.h"
      35             : 
      36             : #include "src/core/iomgr/iomgr.h"
      37             : #include <grpc/support/alloc.h>
      38             : #include <grpc/support/log.h>
      39             : #include <grpc/support/thd.h>
      40             : #include <grpc/support/time.h>
      41             : #include <grpc/support/useful.h>
      42             : #include "test/core/util/test_config.h"
      43             : 
      44             : #define LOG_TEST(x) gpr_log(GPR_INFO, "%s", x)
      45             : 
      46         129 : static void *create_test_tag(void) {
      47             :   static gpr_intptr i = 0;
      48         129 :   return (void *)(++i);
      49             : }
      50             : 
      51             : /* helper for tests to shutdown correctly and tersely */
      52           4 : static void shutdown_and_destroy(grpc_completion_queue *cc) {
      53             :   grpc_event ev;
      54           4 :   grpc_completion_queue_shutdown(cc);
      55           4 :   ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
      56           4 :   GPR_ASSERT(ev.type == GRPC_QUEUE_SHUTDOWN);
      57           4 :   grpc_completion_queue_destroy(cc);
      58           4 : }
      59             : 
      60             : /* ensure we can create and destroy a completion channel */
      61           1 : static void test_no_op(void) {
      62           1 :   LOG_TEST("test_no_op");
      63           1 :   shutdown_and_destroy(grpc_completion_queue_create(NULL));
      64           1 : }
      65             : 
      66           1 : static void test_wait_empty(void) {
      67             :   grpc_completion_queue *cc;
      68             :   grpc_event event;
      69             : 
      70           1 :   LOG_TEST("test_wait_empty");
      71             : 
      72           1 :   cc = grpc_completion_queue_create(NULL);
      73           1 :   event = grpc_completion_queue_next(cc, gpr_now(GPR_CLOCK_REALTIME), NULL);
      74           1 :   GPR_ASSERT(event.type == GRPC_QUEUE_TIMEOUT);
      75           1 :   shutdown_and_destroy(cc);
      76           1 : }
      77             : 
      78         257 : static void do_nothing_end_completion(grpc_exec_ctx *exec_ctx, void *arg,
      79         257 :                                       grpc_cq_completion *c) {}
      80             : 
      81           1 : static void test_cq_end_op(void) {
      82             :   grpc_event ev;
      83             :   grpc_completion_queue *cc;
      84             :   grpc_cq_completion completion;
      85           1 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      86           1 :   void *tag = create_test_tag();
      87             : 
      88           1 :   LOG_TEST("test_cq_end_op");
      89             : 
      90           1 :   cc = grpc_completion_queue_create(NULL);
      91             : 
      92           1 :   grpc_cq_begin_op(cc);
      93           1 :   grpc_cq_end_op(&exec_ctx, cc, tag, 1, do_nothing_end_completion, NULL,
      94             :                  &completion);
      95             : 
      96           1 :   ev = grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
      97           1 :   GPR_ASSERT(ev.type == GRPC_OP_COMPLETE);
      98           1 :   GPR_ASSERT(ev.tag == tag);
      99           1 :   GPR_ASSERT(ev.success);
     100             : 
     101           1 :   shutdown_and_destroy(cc);
     102           1 :   grpc_exec_ctx_finish(&exec_ctx);
     103           1 : }
     104             : 
     105           1 : static void test_shutdown_then_next_polling(void) {
     106             :   grpc_completion_queue *cc;
     107             :   grpc_event event;
     108           1 :   LOG_TEST("test_shutdown_then_next_polling");
     109             : 
     110           1 :   cc = grpc_completion_queue_create(NULL);
     111           1 :   grpc_completion_queue_shutdown(cc);
     112           1 :   event =
     113           1 :       grpc_completion_queue_next(cc, gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
     114           1 :   GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
     115           1 :   grpc_completion_queue_destroy(cc);
     116           1 : }
     117             : 
     118           1 : static void test_shutdown_then_next_with_timeout(void) {
     119             :   grpc_completion_queue *cc;
     120             :   grpc_event event;
     121           1 :   LOG_TEST("test_shutdown_then_next_with_timeout");
     122             : 
     123           1 :   cc = grpc_completion_queue_create(NULL);
     124           1 :   grpc_completion_queue_shutdown(cc);
     125           1 :   event =
     126           1 :       grpc_completion_queue_next(cc, gpr_inf_future(GPR_CLOCK_REALTIME), NULL);
     127           1 :   GPR_ASSERT(event.type == GRPC_QUEUE_SHUTDOWN);
     128           1 :   grpc_completion_queue_destroy(cc);
     129           1 : }
     130             : 
     131           1 : static void test_pluck(void) {
     132             :   grpc_event ev;
     133             :   grpc_completion_queue *cc;
     134             :   void *tags[128];
     135             :   grpc_cq_completion completions[GPR_ARRAY_SIZE(tags)];
     136           1 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     137             :   unsigned i, j;
     138             : 
     139           1 :   LOG_TEST("test_pluck");
     140             : 
     141         129 :   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
     142         128 :     tags[i] = create_test_tag();
     143        8256 :     for (j = 0; j < i; j++) {
     144        8128 :       GPR_ASSERT(tags[i] != tags[j]);
     145             :     }
     146             :   }
     147             : 
     148           1 :   cc = grpc_completion_queue_create(NULL);
     149             : 
     150         129 :   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
     151         128 :     grpc_cq_begin_op(cc);
     152         128 :     grpc_cq_end_op(&exec_ctx, cc, tags[i], 1, do_nothing_end_completion, NULL,
     153             :                    &completions[i]);
     154             :   }
     155             : 
     156         129 :   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
     157         128 :     ev = grpc_completion_queue_pluck(cc, tags[i],
     158             :                                      gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
     159         128 :     GPR_ASSERT(ev.tag == tags[i]);
     160             :   }
     161             : 
     162         129 :   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
     163         128 :     grpc_cq_begin_op(cc);
     164         128 :     grpc_cq_end_op(&exec_ctx, cc, tags[i], 1, do_nothing_end_completion, NULL,
     165             :                    &completions[i]);
     166             :   }
     167             : 
     168         129 :   for (i = 0; i < GPR_ARRAY_SIZE(tags); i++) {
     169         128 :     ev = grpc_completion_queue_pluck(cc, tags[GPR_ARRAY_SIZE(tags) - i - 1],
     170             :                                      gpr_inf_past(GPR_CLOCK_REALTIME), NULL);
     171         128 :     GPR_ASSERT(ev.tag == tags[GPR_ARRAY_SIZE(tags) - i - 1]);
     172             :   }
     173             : 
     174           1 :   shutdown_and_destroy(cc);
     175           1 :   grpc_exec_ctx_finish(&exec_ctx);
     176           1 : }
     177             : 
     178             : #define TEST_THREAD_EVENTS 10000
     179             : 
     180             : typedef struct test_thread_options {
     181             :   gpr_event on_started;
     182             :   gpr_event *phase1;
     183             :   gpr_event on_phase1_done;
     184             :   gpr_event *phase2;
     185             :   gpr_event on_finished;
     186             :   size_t events_triggered;
     187             :   int id;
     188             :   grpc_completion_queue *cc;
     189             : } test_thread_options;
     190             : 
     191      220242 : gpr_timespec ten_seconds_time(void) {
     192      220242 :   return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
     193             : }
     194             : 
     195      220000 : static void free_completion(grpc_exec_ctx *exec_ctx, void *arg,
     196             :                             grpc_cq_completion *completion) {
     197      220000 :   gpr_free(completion);
     198      220000 : }
     199             : 
     200          22 : static void producer_thread(void *arg) {
     201          22 :   test_thread_options *opt = arg;
     202             :   int i;
     203          22 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     204             : 
     205          22 :   gpr_log(GPR_INFO, "producer %d started", opt->id);
     206          22 :   gpr_event_set(&opt->on_started, (void *)(gpr_intptr)1);
     207          22 :   GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
     208             : 
     209          22 :   gpr_log(GPR_INFO, "producer %d phase 1", opt->id);
     210      220022 :   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
     211      220000 :     grpc_cq_begin_op(opt->cc);
     212             :   }
     213             : 
     214          22 :   gpr_log(GPR_INFO, "producer %d phase 1 done", opt->id);
     215          22 :   gpr_event_set(&opt->on_phase1_done, (void *)(gpr_intptr)1);
     216          22 :   GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
     217             : 
     218          22 :   gpr_log(GPR_INFO, "producer %d phase 2", opt->id);
     219      220022 :   for (i = 0; i < TEST_THREAD_EVENTS; i++) {
     220      220000 :     grpc_cq_end_op(&exec_ctx, opt->cc, (void *)(gpr_intptr)1, 1,
     221             :                    free_completion, NULL,
     222      220000 :                    gpr_malloc(sizeof(grpc_cq_completion)));
     223      220000 :     opt->events_triggered++;
     224      220000 :     grpc_exec_ctx_finish(&exec_ctx);
     225             :   }
     226             : 
     227          22 :   gpr_log(GPR_INFO, "producer %d phase 2 done", opt->id);
     228          22 :   gpr_event_set(&opt->on_finished, (void *)(gpr_intptr)1);
     229          22 :   grpc_exec_ctx_finish(&exec_ctx);
     230          22 : }
     231             : 
     232          22 : static void consumer_thread(void *arg) {
     233          22 :   test_thread_options *opt = arg;
     234             :   grpc_event ev;
     235             : 
     236          22 :   gpr_log(GPR_INFO, "consumer %d started", opt->id);
     237          22 :   gpr_event_set(&opt->on_started, (void *)(gpr_intptr)1);
     238          22 :   GPR_ASSERT(gpr_event_wait(opt->phase1, ten_seconds_time()));
     239             : 
     240          22 :   gpr_log(GPR_INFO, "consumer %d phase 1", opt->id);
     241             : 
     242          22 :   gpr_log(GPR_INFO, "consumer %d phase 1 done", opt->id);
     243          22 :   gpr_event_set(&opt->on_phase1_done, (void *)(gpr_intptr)1);
     244          22 :   GPR_ASSERT(gpr_event_wait(opt->phase2, ten_seconds_time()));
     245             : 
     246          22 :   gpr_log(GPR_INFO, "consumer %d phase 2", opt->id);
     247             :   for (;;) {
     248      220022 :     ev = grpc_completion_queue_next(opt->cc, ten_seconds_time(), NULL);
     249      220022 :     switch (ev.type) {
     250             :       case GRPC_OP_COMPLETE:
     251      220000 :         GPR_ASSERT(ev.success);
     252      220000 :         opt->events_triggered++;
     253      220000 :         break;
     254             :       case GRPC_QUEUE_SHUTDOWN:
     255          22 :         gpr_log(GPR_INFO, "consumer %d phase 2 done", opt->id);
     256          22 :         gpr_event_set(&opt->on_finished, (void *)(gpr_intptr)1);
     257          22 :         return;
     258             :       case GRPC_QUEUE_TIMEOUT:
     259           0 :         gpr_log(GPR_ERROR, "Invalid timeout received");
     260           0 :         abort();
     261             :     }
     262      220000 :   }
     263             : }
     264             : 
     265           4 : static void test_threading(size_t producers, size_t consumers) {
     266           4 :   test_thread_options *options =
     267           4 :       gpr_malloc((producers + consumers) * sizeof(test_thread_options));
     268           4 :   gpr_event phase1 = GPR_EVENT_INIT;
     269           4 :   gpr_event phase2 = GPR_EVENT_INIT;
     270           4 :   grpc_completion_queue *cc = grpc_completion_queue_create(NULL);
     271             :   size_t i;
     272           4 :   size_t total_consumed = 0;
     273             :   static int optid = 101;
     274             : 
     275           4 :   gpr_log(GPR_INFO, "%s: %d producers, %d consumers", "test_threading",
     276             :           producers, consumers);
     277             : 
     278             :   /* start all threads: they will wait for phase1 */
     279          48 :   for (i = 0; i < producers + consumers; i++) {
     280             :     gpr_thd_id id;
     281          44 :     gpr_event_init(&options[i].on_started);
     282          44 :     gpr_event_init(&options[i].on_phase1_done);
     283          44 :     gpr_event_init(&options[i].on_finished);
     284          44 :     options[i].phase1 = &phase1;
     285          44 :     options[i].phase2 = &phase2;
     286          44 :     options[i].events_triggered = 0;
     287          44 :     options[i].cc = cc;
     288          44 :     options[i].id = optid++;
     289          44 :     GPR_ASSERT(gpr_thd_new(&id,
     290             :                            i < producers ? producer_thread : consumer_thread,
     291             :                            options + i, NULL));
     292          44 :     gpr_event_wait(&options[i].on_started, ten_seconds_time());
     293             :   }
     294             : 
     295             :   /* start phase1: producers will pre-declare all operations they will
     296             :      complete */
     297           4 :   gpr_log(GPR_INFO, "start phase 1");
     298           4 :   gpr_event_set(&phase1, (void *)(gpr_intptr)1);
     299             : 
     300           4 :   gpr_log(GPR_INFO, "wait phase 1");
     301          48 :   for (i = 0; i < producers + consumers; i++) {
     302          44 :     GPR_ASSERT(gpr_event_wait(&options[i].on_phase1_done, ten_seconds_time()));
     303             :   }
     304           4 :   gpr_log(GPR_INFO, "done phase 1");
     305             : 
     306             :   /* start phase2: operations will complete, and consumers will consume them */
     307           4 :   gpr_log(GPR_INFO, "start phase 2");
     308           4 :   gpr_event_set(&phase2, (void *)(gpr_intptr)1);
     309             : 
     310             :   /* in parallel, we shutdown the completion channel - all events should still
     311             :      be consumed */
     312           4 :   grpc_completion_queue_shutdown(cc);
     313             : 
     314             :   /* join all threads */
     315           4 :   gpr_log(GPR_INFO, "wait phase 2");
     316          48 :   for (i = 0; i < producers + consumers; i++) {
     317          44 :     GPR_ASSERT(gpr_event_wait(&options[i].on_finished, ten_seconds_time()));
     318             :   }
     319           4 :   gpr_log(GPR_INFO, "done phase 2");
     320             : 
     321             :   /* destroy the completion channel */
     322           4 :   grpc_completion_queue_destroy(cc);
     323             : 
     324             :   /* verify that everything was produced and consumed */
     325          48 :   for (i = 0; i < producers + consumers; i++) {
     326          44 :     if (i < producers) {
     327          22 :       GPR_ASSERT(options[i].events_triggered == TEST_THREAD_EVENTS);
     328             :     } else {
     329          22 :       total_consumed += options[i].events_triggered;
     330             :     }
     331             :   }
     332           4 :   GPR_ASSERT(total_consumed == producers * TEST_THREAD_EVENTS);
     333             : 
     334           4 :   gpr_free(options);
     335           4 : }
     336             : 
     337           1 : int main(int argc, char **argv) {
     338           1 :   grpc_test_init(argc, argv);
     339           1 :   grpc_init();
     340           1 :   test_no_op();
     341           1 :   test_wait_empty();
     342           1 :   test_shutdown_then_next_polling();
     343           1 :   test_shutdown_then_next_with_timeout();
     344           1 :   test_cq_end_op();
     345           1 :   test_pluck();
     346           1 :   test_threading(1, 1);
     347           1 :   test_threading(1, 10);
     348           1 :   test_threading(10, 1);
     349           1 :   test_threading(10, 10);
     350           1 :   grpc_shutdown();
     351           1 :   return 0;
     352             : }

Generated by: LCOV version 1.10