LCOV - code coverage report
Current view: top level - core/iomgr - executor.c (source / functions) Hit Total Coverage
Test: tmp.CaZ6RjdVn2 Lines: 54 56 96.4 %
Date: 2015-12-10 22:15:08 Functions: 5 5 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "src/core/iomgr/executor.h"
      35             : 
      36             : #include <string.h>
      37             : 
      38             : #include <grpc/support/alloc.h>
      39             : #include <grpc/support/log.h>
      40             : #include <grpc/support/sync.h>
      41             : #include <grpc/support/thd.h>
      42             : #include "src/core/iomgr/exec_ctx.h"
      43             : 
      44             : typedef struct grpc_executor_data {
      45             :   int busy;          /**< is the thread currently running? */
      46             :   int shutting_down; /**< has \a grpc_shutdown() been invoked? */
      47             :   int pending_join;  /**< has the thread finished but not been joined? */
      48             :   grpc_closure_list closures; /**< collection of pending work */
      49             :   gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a
      50             :                      pending_join are true */
      51             :   gpr_thd_options options;
      52             :   gpr_mu mu;
      53             : } grpc_executor;
      54             : 
      55             : static grpc_executor g_executor;
      56             : 
      57        3453 : void grpc_executor_init() {
      58        3453 :   memset(&g_executor, 0, sizeof(grpc_executor));
      59        3453 :   gpr_mu_init(&g_executor.mu);
      60        3453 :   g_executor.options = gpr_thd_options_default();
      61        3453 :   gpr_thd_options_set_joinable(&g_executor.options);
      62        3453 : }
      63             : 
      64             : /* thread body */
      65        5325 : static void closure_exec_thread_func(void *ignored) {
      66        5325 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
      67             :   while (1) {
      68       11255 :     gpr_mu_lock(&g_executor.mu);
      69       11255 :     if (g_executor.shutting_down != 0) {
      70         783 :       gpr_mu_unlock(&g_executor.mu);
      71         783 :       break;
      72             :     }
      73       10472 :     if (grpc_closure_list_empty(g_executor.closures)) {
      74             :       /* no more work, time to die */
      75        4542 :       GPR_ASSERT(g_executor.busy == 1);
      76        4542 :       g_executor.busy = 0;
      77        4542 :       gpr_mu_unlock(&g_executor.mu);
      78        4542 :       break;
      79             :     } else {
      80        5930 :       grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
      81             :     }
      82        5930 :     gpr_mu_unlock(&g_executor.mu);
      83        5930 :     grpc_exec_ctx_flush(&exec_ctx);
      84        5930 :   }
      85        5325 :   grpc_exec_ctx_finish(&exec_ctx);
      86        5325 : }
      87             : 
      88             : /* Spawn the thread if new work has arrived and no thread is up */
      89        6016 : static void maybe_spawn_locked() {
      90        6016 :   if (grpc_closure_list_empty(g_executor.closures) == 1) {
      91           0 :     return;
      92             :   }
      93        6016 :   if (g_executor.shutting_down == 1) {
      94           0 :     return;
      95             :   }
      96             : 
      97        6016 :   if (g_executor.busy != 0) {
      98             :     /* Thread still working. New work will be picked up by already running
      99             :      * thread. Not spawning anything. */
     100         691 :     return;
     101        5325 :   } else if (g_executor.pending_join != 0) {
     102             :     /* Pick up the remains of the previous incarnation of the thread. */
     103        2394 :     gpr_thd_join(g_executor.tid);
     104        2394 :     g_executor.pending_join = 0;
     105             :   }
     106             : 
     107             :   /* All previous instances of the thread should have been joined at this point.
     108             :    * Spawn time! */
     109        5325 :   g_executor.busy = 1;
     110        5325 :   gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
     111             :               &g_executor.options);
     112        5325 :   g_executor.pending_join = 1;
     113             : }
     114             : 
     115        6016 : void grpc_executor_enqueue(grpc_closure *closure, int success) {
     116        6016 :   gpr_mu_lock(&g_executor.mu);
     117        6016 :   if (g_executor.shutting_down == 0) {
     118        6016 :     grpc_closure_list_add(&g_executor.closures, closure, success);
     119        6016 :     maybe_spawn_locked();
     120             :   }
     121        6016 :   gpr_mu_unlock(&g_executor.mu);
     122        6016 : }
     123             : 
     124        3451 : void grpc_executor_shutdown() {
     125             :   int pending_join;
     126        3451 :   grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
     127             : 
     128        3451 :   gpr_mu_lock(&g_executor.mu);
     129        3451 :   pending_join = g_executor.pending_join;
     130        3451 :   g_executor.shutting_down = 1;
     131        3451 :   gpr_mu_unlock(&g_executor.mu);
     132             :   /* we can release the lock at this point despite the access to the closure
     133             :    * list below because we aren't accepting new work */
     134             : 
     135             :   /* Execute pending callbacks, some may be performing cleanups */
     136        3451 :   grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
     137        3451 :   grpc_exec_ctx_finish(&exec_ctx);
     138        3451 :   GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
     139        3451 :   if (pending_join) {
     140        2929 :     gpr_thd_join(g_executor.tid);
     141             :   }
     142        3451 :   gpr_mu_destroy(&g_executor.mu);
     143        3451 : }

Generated by: LCOV version 1.11