Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/channel/subchannel_call_holder.h"
35 :
36 : #include <grpc/support/alloc.h>
37 :
38 : #include "src/core/profiling/timers.h"
39 :
/* Acquire-load the holder's subchannel_call slot and view it as a
 * grpc_subchannel_call pointer. The slot is stored as 0 by
 * grpc_subchannel_call_holder_init, so the result can be NULL (no call yet),
 * the CANCELLED_CALL sentinel, or a real call pointer. */
40 : #define GET_CALL(holder) \
41 : ((grpc_subchannel_call *)(gpr_atm_acq_load(&(holder)->subchannel_call)))
42 :
/* Sentinel (pointer value 1) placed in the subchannel_call slot when a
 * cancellation op arrives before a call exists (see the 0 -> 1 compare-and-swap
 * in grpc_subchannel_call_holder_perform_op). It is not a real call:
 * grpc_subchannel_call_holder_destroy skips the unref for it. */
43 : #define CANCELLED_CALL ((grpc_subchannel_call *)1)
44 :
/* Closure callbacks. subchannel_ready and call_ready are installed on
 * holder->next_step with the holder as their argument; retry_ops is created
 * by retry_waiting_locked and receives a heap-allocated retry_ops_args. */
45 : static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *holder,
46 : int success);
47 : static void call_ready(grpc_exec_ctx *exec_ctx, void *holder, int success);
48 : static void retry_ops(grpc_exec_ctx *exec_ctx, void *retry_ops_args,
49 : int success);
50 :
/* Helpers operating on the holder's buffered (waiting) ops. Every call site
 * in this file invokes them while holder->mu is held. */
51 : static void add_waiting_locked(grpc_subchannel_call_holder *holder,
52 : grpc_transport_stream_op *op);
53 : static void fail_locked(grpc_exec_ctx *exec_ctx,
54 : grpc_subchannel_call_holder *holder);
55 : static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
56 : grpc_subchannel_call_holder *holder);
57 :
58 2166623 : void grpc_subchannel_call_holder_init(
59 : grpc_subchannel_call_holder *holder,
60 : grpc_subchannel_call_holder_pick_subchannel pick_subchannel,
61 : void *pick_subchannel_arg) {
62 2166623 : gpr_atm_rel_store(&holder->subchannel_call, 0);
63 2166623 : holder->pick_subchannel = pick_subchannel;
64 2166623 : holder->pick_subchannel_arg = pick_subchannel_arg;
65 2166623 : gpr_mu_init(&holder->mu);
66 2167689 : holder->subchannel = NULL;
67 2167689 : holder->waiting_ops = NULL;
68 2167689 : holder->waiting_ops_count = 0;
69 2167689 : holder->waiting_ops_capacity = 0;
70 2167689 : holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
71 2167689 : }
72 :
73 2167547 : void grpc_subchannel_call_holder_destroy(grpc_exec_ctx *exec_ctx,
74 : grpc_subchannel_call_holder *holder) {
75 2167547 : grpc_subchannel_call *call = GET_CALL(holder);
76 2167547 : if (call != NULL && call != CANCELLED_CALL) {
77 2167052 : GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, call, "holder");
78 : }
79 2167724 : GPR_ASSERT(holder->creation_phase ==
80 : GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING);
81 2167724 : gpr_mu_destroy(&holder->mu);
82 2167719 : GPR_ASSERT(holder->waiting_ops_count == 0);
83 2167719 : gpr_free(holder->waiting_ops);
84 2167720 : }
85 :
86 4173230 : void grpc_subchannel_call_holder_perform_op(grpc_exec_ctx *exec_ctx,
87 : grpc_subchannel_call_holder *holder,
88 : grpc_transport_stream_op *op) {
89 : /* try to (atomically) get the call */
90 4173230 : grpc_subchannel_call *call = GET_CALL(holder);
91 : GPR_TIMER_BEGIN("grpc_subchannel_call_holder_perform_op", 0);
92 4173230 : if (call == CANCELLED_CALL) {
93 156 : grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
94 : GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
95 156 : return;
96 : }
97 4173074 : if (call != NULL) {
98 1997030 : grpc_subchannel_call_process_op(exec_ctx, call, op);
99 : GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
100 1997311 : return;
101 : }
102 : /* we failed; lock and figure out what to do */
103 2176044 : gpr_mu_lock(&holder->mu);
104 : retry:
105 : /* need to recheck that another thread hasn't set the call */
106 4333500 : call = GET_CALL(holder);
107 4333500 : if (call == CANCELLED_CALL) {
108 0 : gpr_mu_unlock(&holder->mu);
109 0 : grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
110 : GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
111 0 : return;
112 : }
113 4333500 : if (call != NULL) {
114 2156930 : gpr_mu_unlock(&holder->mu);
115 2156950 : grpc_subchannel_call_process_op(exec_ctx, call, op);
116 : GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
117 2156944 : return;
118 : }
119 : /* if this is a cancellation, then we can raise our cancelled flag */
120 2176570 : if (op->cancel_with_status != GRPC_STATUS_OK) {
121 546 : if (!gpr_atm_rel_cas(&holder->subchannel_call, 0, 1)) {
122 0 : goto retry;
123 : } else {
124 546 : switch (holder->creation_phase) {
125 : case GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING:
126 359 : fail_locked(exec_ctx, holder);
127 359 : break;
128 : case GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL:
129 22 : grpc_subchannel_cancel_create_call(exec_ctx, holder->subchannel,
130 : &holder->subchannel_call);
131 22 : break;
132 : case GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL:
133 165 : holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg, NULL,
134 : &holder->subchannel, NULL);
135 165 : break;
136 : }
137 546 : gpr_mu_unlock(&holder->mu);
138 546 : grpc_transport_stream_op_finish_with_failure(exec_ctx, op);
139 : GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
140 546 : return;
141 : }
142 : }
143 : /* if we don't have a subchannel, try to get one */
144 4343843 : if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
145 4335236 : holder->subchannel == NULL && op->send_initial_metadata != NULL) {
146 2167517 : holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL;
147 2167517 : grpc_closure_init(&holder->next_step, subchannel_ready, holder);
148 2167520 : if (holder->pick_subchannel(exec_ctx, holder->pick_subchannel_arg,
149 : op->send_initial_metadata, &holder->subchannel,
150 : &holder->next_step)) {
151 2157279 : holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
152 : }
153 : }
154 : /* if we've got a subchannel, then let's ask it to create a call */
155 4333634 : if (holder->creation_phase == GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING &&
156 2157629 : holder->subchannel != NULL) {
157 2157265 : holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL;
158 2157265 : grpc_closure_init(&holder->next_step, call_ready, holder);
159 2157263 : if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
160 : holder->pollset, &holder->subchannel_call,
161 : &holder->next_step)) {
162 : /* got one immediately - continue the op (and any waiting ops) */
163 2156955 : holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
164 2156955 : retry_waiting_locked(exec_ctx, holder);
165 2156932 : goto retry;
166 : }
167 : }
168 : /* nothing to be done but wait */
169 19080 : add_waiting_locked(holder, op);
170 19361 : gpr_mu_unlock(&holder->mu);
171 : GPR_TIMER_END("grpc_subchannel_call_holder_perform_op", 0);
172 : }
173 :
174 10214 : static void subchannel_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
175 10172 : grpc_subchannel_call_holder *holder = arg;
176 : grpc_subchannel_call *call;
177 10214 : gpr_mu_lock(&holder->mu);
178 10219 : GPR_ASSERT(holder->creation_phase ==
179 : GRPC_SUBCHANNEL_CALL_HOLDER_PICKING_SUBCHANNEL);
180 10219 : call = GET_CALL(holder);
181 10219 : GPR_ASSERT(call == NULL || call == CANCELLED_CALL);
182 10219 : if (holder->subchannel == NULL) {
183 164 : holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
184 164 : fail_locked(exec_ctx, holder);
185 : } else {
186 10055 : grpc_closure_init(&holder->next_step, call_ready, holder);
187 10052 : if (grpc_subchannel_create_call(exec_ctx, holder->subchannel,
188 : holder->pollset, &holder->subchannel_call,
189 : &holder->next_step)) {
190 10059 : holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
191 : /* got one immediately - continue the op (and any waiting ops) */
192 10059 : retry_waiting_locked(exec_ctx, holder);
193 : }
194 : }
195 10200 : gpr_mu_unlock(&holder->mu);
196 10218 : }
197 :
198 348 : static void call_ready(grpc_exec_ctx *exec_ctx, void *arg, int success) {
199 348 : grpc_subchannel_call_holder *holder = arg;
200 : GPR_TIMER_BEGIN("call_ready", 0);
201 348 : gpr_mu_lock(&holder->mu);
202 348 : GPR_ASSERT(holder->creation_phase ==
203 : GRPC_SUBCHANNEL_CALL_HOLDER_CREATING_CALL);
204 348 : holder->creation_phase = GRPC_SUBCHANNEL_CALL_HOLDER_NOT_CREATING;
205 348 : if (GET_CALL(holder) != NULL) {
206 348 : retry_waiting_locked(exec_ctx, holder);
207 : } else {
208 0 : fail_locked(exec_ctx, holder);
209 : }
210 348 : gpr_mu_unlock(&holder->mu);
211 : GPR_TIMER_END("call_ready", 0);
212 348 : }
213 :
/* Argument bundle for the retry_ops closure. It is filled in by
 * retry_waiting_locked and released by retry_ops. */
214 : typedef struct {
/* buffered ops taken over from holder->waiting_ops; freed by retry_ops */
215 : grpc_transport_stream_op *ops;
/* number of valid entries in ops */
216 : size_t nops;
/* call the ops are replayed on; a "retry_ops" ref is held while queued */
217 : grpc_subchannel_call *call;
218 : } retry_ops_args;
219 :
220 2167187 : static void retry_waiting_locked(grpc_exec_ctx *exec_ctx,
221 : grpc_subchannel_call_holder *holder) {
222 2167187 : retry_ops_args *a = gpr_malloc(sizeof(*a));
223 2167307 : a->ops = holder->waiting_ops;
224 2167307 : a->nops = holder->waiting_ops_count;
225 2167307 : a->call = GET_CALL(holder);
226 2167307 : if (a->call == CANCELLED_CALL) {
227 23 : gpr_free(a);
228 23 : fail_locked(exec_ctx, holder);
229 2167290 : return;
230 : }
231 2167284 : holder->waiting_ops = NULL;
232 2167284 : holder->waiting_ops_count = 0;
233 2167284 : holder->waiting_ops_capacity = 0;
234 2167284 : GRPC_SUBCHANNEL_CALL_REF(a->call, "retry_ops");
235 2167304 : grpc_exec_ctx_enqueue(exec_ctx, grpc_closure_create(retry_ops, a), 1);
236 : }
237 :
238 2167243 : static void retry_ops(grpc_exec_ctx *exec_ctx, void *args, int success) {
239 2167154 : retry_ops_args *a = args;
240 : size_t i;
241 2186315 : for (i = 0; i < a->nops; i++) {
242 19016 : grpc_subchannel_call_process_op(exec_ctx, a->call, &a->ops[i]);
243 : }
244 2167299 : GRPC_SUBCHANNEL_CALL_UNREF(exec_ctx, a->call, "retry_ops");
245 2167336 : gpr_free(a->ops);
246 2167212 : gpr_free(a);
247 2167326 : }
248 :
249 19361 : static void add_waiting_locked(grpc_subchannel_call_holder *holder,
250 : grpc_transport_stream_op *op) {
251 : GPR_TIMER_BEGIN("add_waiting_locked", 0);
252 19361 : if (holder->waiting_ops_count == holder->waiting_ops_capacity) {
253 10924 : holder->waiting_ops_capacity = GPR_MAX(3, 2 * holder->waiting_ops_capacity);
254 10924 : holder->waiting_ops =
255 10924 : gpr_realloc(holder->waiting_ops, holder->waiting_ops_capacity *
256 : sizeof(*holder->waiting_ops));
257 : }
258 19361 : holder->waiting_ops[holder->waiting_ops_count++] = *op;
259 : GPR_TIMER_END("add_waiting_locked", 0);
260 19361 : }
261 :
262 546 : static void fail_locked(grpc_exec_ctx *exec_ctx,
263 : grpc_subchannel_call_holder *holder) {
264 : size_t i;
265 795 : for (i = 0; i < holder->waiting_ops_count; i++) {
266 271 : grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].on_complete, 0);
267 271 : grpc_exec_ctx_enqueue(exec_ctx, holder->waiting_ops[i].recv_message_ready,
268 : 0);
269 : }
270 546 : holder->waiting_ops_count = 0;
271 546 : }
272 :
273 816 : char *grpc_subchannel_call_holder_get_peer(grpc_exec_ctx *exec_ctx,
274 : grpc_subchannel_call_holder *holder,
275 : grpc_channel *master) {
276 816 : grpc_subchannel_call *subchannel_call = GET_CALL(holder);
277 :
278 816 : if (subchannel_call) {
279 434 : return grpc_subchannel_call_get_peer(exec_ctx, subchannel_call);
280 : } else {
281 382 : return grpc_channel_get_target(master);
282 : }
283 : }
|