Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/iomgr/executor.h"
35 :
36 : #include <string.h>
37 :
38 : #include <grpc/support/alloc.h>
39 : #include <grpc/support/log.h>
40 : #include <grpc/support/sync.h>
41 : #include <grpc/support/thd.h>
42 : #include "src/core/iomgr/exec_ctx.h"
43 :
44 : typedef struct grpc_executor_data {
45 : int busy; /**< is the thread currently running? */
46 : int shutting_down; /**< has \a grpc_shutdown() been invoked? */
47 : int pending_join; /**< has the thread finished but not been joined? */
48 : grpc_closure_list closures; /**< collection of pending work */
49 : gpr_thd_id tid; /**< thread id of the thread, only valid if \a busy or \a
50 : pending_join are true */
51 : gpr_thd_options options;
52 : gpr_mu mu;
53 : } grpc_executor;
54 :
55 : static grpc_executor g_executor;
56 :
57 3453 : void grpc_executor_init() {
58 3453 : memset(&g_executor, 0, sizeof(grpc_executor));
59 3453 : gpr_mu_init(&g_executor.mu);
60 3453 : g_executor.options = gpr_thd_options_default();
61 3453 : gpr_thd_options_set_joinable(&g_executor.options);
62 3453 : }
63 :
64 : /* thread body */
65 5325 : static void closure_exec_thread_func(void *ignored) {
66 5325 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
67 : while (1) {
68 11255 : gpr_mu_lock(&g_executor.mu);
69 11255 : if (g_executor.shutting_down != 0) {
70 783 : gpr_mu_unlock(&g_executor.mu);
71 783 : break;
72 : }
73 10472 : if (grpc_closure_list_empty(g_executor.closures)) {
74 : /* no more work, time to die */
75 4542 : GPR_ASSERT(g_executor.busy == 1);
76 4542 : g_executor.busy = 0;
77 4542 : gpr_mu_unlock(&g_executor.mu);
78 4542 : break;
79 : } else {
80 5930 : grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
81 : }
82 5930 : gpr_mu_unlock(&g_executor.mu);
83 5930 : grpc_exec_ctx_flush(&exec_ctx);
84 5930 : }
85 5325 : grpc_exec_ctx_finish(&exec_ctx);
86 5325 : }
87 :
88 : /* Spawn the thread if new work has arrived a no thread is up */
89 6016 : static void maybe_spawn_locked() {
90 6016 : if (grpc_closure_list_empty(g_executor.closures) == 1) {
91 0 : return;
92 : }
93 6016 : if (g_executor.shutting_down == 1) {
94 0 : return;
95 : }
96 :
97 6016 : if (g_executor.busy != 0) {
98 : /* Thread still working. New work will be picked up by already running
99 : * thread. Not spawning anything. */
100 691 : return;
101 5325 : } else if (g_executor.pending_join != 0) {
102 : /* Pickup the remains of the previous incarnations of the thread. */
103 2394 : gpr_thd_join(g_executor.tid);
104 2394 : g_executor.pending_join = 0;
105 : }
106 :
107 : /* All previous instances of the thread should have been joined at this point.
108 : * Spawn time! */
109 5325 : g_executor.busy = 1;
110 5325 : gpr_thd_new(&g_executor.tid, closure_exec_thread_func, NULL,
111 : &g_executor.options);
112 5325 : g_executor.pending_join = 1;
113 : }
114 :
115 6016 : void grpc_executor_enqueue(grpc_closure *closure, int success) {
116 6016 : gpr_mu_lock(&g_executor.mu);
117 6016 : if (g_executor.shutting_down == 0) {
118 6016 : grpc_closure_list_add(&g_executor.closures, closure, success);
119 6016 : maybe_spawn_locked();
120 : }
121 6016 : gpr_mu_unlock(&g_executor.mu);
122 6016 : }
123 :
124 3451 : void grpc_executor_shutdown() {
125 : int pending_join;
126 3451 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
127 :
128 3451 : gpr_mu_lock(&g_executor.mu);
129 3451 : pending_join = g_executor.pending_join;
130 3451 : g_executor.shutting_down = 1;
131 3451 : gpr_mu_unlock(&g_executor.mu);
132 : /* we can release the lock at this point despite the access to the closure
133 : * list below because we aren't accepting new work */
134 :
135 : /* Execute pending callbacks, some may be performing cleanups */
136 3451 : grpc_exec_ctx_enqueue_list(&exec_ctx, &g_executor.closures);
137 3451 : grpc_exec_ctx_finish(&exec_ctx);
138 3451 : GPR_ASSERT(grpc_closure_list_empty(g_executor.closures));
139 3451 : if (pending_join) {
140 2929 : gpr_thd_join(g_executor.tid);
141 : }
142 3451 : gpr_mu_destroy(&g_executor.mu);
143 3451 : }
|