Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/surface/channel.h"
35 :
36 : #include <grpc/support/alloc.h>
37 : #include <grpc/support/log.h>
38 :
39 : #include "src/core/channel/client_channel.h"
40 : #include "src/core/channel/client_uchannel.h"
41 : #include "src/core/iomgr/timer.h"
42 : #include "src/core/surface/api_trace.h"
43 : #include "src/core/surface/completion_queue.h"
44 :
45 438 : grpc_connectivity_state grpc_channel_check_connectivity_state(
46 : grpc_channel *channel, int try_to_connect) {
47 : /* forward through to the underlying client channel */
48 438 : grpc_channel_element *client_channel_elem =
49 438 : grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
50 438 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
51 : grpc_connectivity_state state;
52 438 : GRPC_API_TRACE(
53 : "grpc_channel_check_connectivity_state(channel=%p, try_to_connect=%d)", 2,
54 : (channel, try_to_connect));
55 438 : if (client_channel_elem->filter == &grpc_client_channel_filter) {
56 422 : state = grpc_client_channel_check_connectivity_state(
57 : &exec_ctx, client_channel_elem, try_to_connect);
58 422 : grpc_exec_ctx_finish(&exec_ctx);
59 422 : return state;
60 : }
61 16 : if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
62 16 : state = grpc_client_uchannel_check_connectivity_state(
63 : &exec_ctx, client_channel_elem, try_to_connect);
64 16 : grpc_exec_ctx_finish(&exec_ctx);
65 16 : return state;
66 : }
67 0 : gpr_log(GPR_ERROR,
68 : "grpc_channel_check_connectivity_state called on something that is "
69 : "not a (u)client channel, but '%s'",
70 0 : client_channel_elem->filter->name);
71 0 : grpc_exec_ctx_finish(&exec_ctx);
72 0 : return GRPC_CHANNEL_FATAL_FAILURE;
73 : }
74 :
75 : typedef enum {
76 : WAITING,
77 : CALLING_BACK,
78 : CALLING_BACK_AND_FINISHED,
79 : CALLED_BACK
80 : } callback_phase;
81 :
82 : typedef struct {
83 : gpr_mu mu;
84 : callback_phase phase;
85 : int success;
86 : int removed;
87 : grpc_closure on_complete;
88 : grpc_timer alarm;
89 : grpc_connectivity_state state;
90 : grpc_completion_queue *cq;
91 : grpc_cq_completion completion_storage;
92 : grpc_channel *channel;
93 : void *tag;
94 : } state_watcher;
95 :
96 140 : static void delete_state_watcher(grpc_exec_ctx *exec_ctx, state_watcher *w) {
97 140 : grpc_channel_element *client_channel_elem = grpc_channel_stack_last_element(
98 : grpc_channel_get_channel_stack(w->channel));
99 140 : if (client_channel_elem->filter == &grpc_client_channel_filter) {
100 128 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
101 : "watch_channel_connectivity");
102 12 : } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
103 12 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, w->channel,
104 : "watch_uchannel_connectivity");
105 : } else {
106 0 : abort();
107 : }
108 140 : gpr_mu_destroy(&w->mu);
109 140 : gpr_free(w);
110 140 : }
111 :
112 140 : static void finished_completion(grpc_exec_ctx *exec_ctx, void *pw,
113 : grpc_cq_completion *ignored) {
114 128 : int delete = 0;
115 128 : state_watcher *w = pw;
116 140 : gpr_mu_lock(&w->mu);
117 140 : switch (w->phase) {
118 : case WAITING:
119 : case CALLED_BACK:
120 0 : GPR_UNREACHABLE_CODE(return );
121 : case CALLING_BACK:
122 58 : w->phase = CALLED_BACK;
123 58 : break;
124 : case CALLING_BACK_AND_FINISHED:
125 76 : delete = 1;
126 82 : break;
127 : }
128 140 : gpr_mu_unlock(&w->mu);
129 :
130 140 : if (delete) {
131 82 : delete_state_watcher(exec_ctx, w);
132 : }
133 140 : }
134 :
135 280 : static void partly_done(grpc_exec_ctx *exec_ctx, state_watcher *w,
136 : int due_to_completion) {
137 256 : int delete = 0;
138 256 : grpc_channel_element *client_channel_elem = NULL;
139 :
140 280 : gpr_mu_lock(&w->mu);
141 280 : if (w->removed == 0) {
142 140 : w->removed = 1;
143 140 : client_channel_elem = grpc_channel_stack_last_element(
144 : grpc_channel_get_channel_stack(w->channel));
145 140 : if (client_channel_elem->filter == &grpc_client_channel_filter) {
146 128 : grpc_client_channel_del_interested_party(exec_ctx, client_channel_elem,
147 : grpc_cq_pollset(w->cq));
148 : } else {
149 12 : grpc_client_uchannel_del_interested_party(exec_ctx, client_channel_elem,
150 : grpc_cq_pollset(w->cq));
151 : }
152 : }
153 280 : gpr_mu_unlock(&w->mu);
154 280 : if (due_to_completion) {
155 140 : gpr_mu_lock(&w->mu);
156 140 : w->success = 1;
157 140 : gpr_mu_unlock(&w->mu);
158 140 : grpc_timer_cancel(exec_ctx, &w->alarm);
159 : }
160 :
161 280 : gpr_mu_lock(&w->mu);
162 280 : switch (w->phase) {
163 : case WAITING:
164 140 : w->phase = CALLING_BACK;
165 140 : grpc_cq_end_op(exec_ctx, w->cq, w->tag, w->success, finished_completion,
166 : w, &w->completion_storage);
167 140 : break;
168 : case CALLING_BACK:
169 82 : w->phase = CALLING_BACK_AND_FINISHED;
170 82 : break;
171 : case CALLING_BACK_AND_FINISHED:
172 0 : GPR_UNREACHABLE_CODE(return );
173 : case CALLED_BACK:
174 52 : delete = 1;
175 58 : break;
176 : }
177 280 : gpr_mu_unlock(&w->mu);
178 :
179 280 : if (delete) {
180 58 : delete_state_watcher(exec_ctx, w);
181 : }
182 280 : }
183 :
184 140 : static void watch_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
185 140 : partly_done(exec_ctx, pw, 1);
186 140 : }
187 :
188 140 : static void timeout_complete(grpc_exec_ctx *exec_ctx, void *pw, int success) {
189 140 : partly_done(exec_ctx, pw, 0);
190 140 : }
191 :
192 140 : void grpc_channel_watch_connectivity_state(
193 : grpc_channel *channel, grpc_connectivity_state last_observed_state,
194 : gpr_timespec deadline, grpc_completion_queue *cq, void *tag) {
195 140 : grpc_channel_element *client_channel_elem =
196 140 : grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel));
197 140 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
198 140 : state_watcher *w = gpr_malloc(sizeof(*w));
199 :
200 140 : GRPC_API_TRACE(
201 : "grpc_channel_watch_connectivity_state("
202 : "channel=%p, last_observed_state=%d, "
203 : "deadline=gpr_timespec { tv_sec: %ld, tv_nsec: %d, clock_type: %d }, "
204 : "cq=%p, tag=%p)",
205 : 7, (channel, (int)last_observed_state, (long)deadline.tv_sec,
206 : deadline.tv_nsec, (int)deadline.clock_type, cq, tag));
207 :
208 140 : grpc_cq_begin_op(cq);
209 :
210 140 : gpr_mu_init(&w->mu);
211 140 : grpc_closure_init(&w->on_complete, watch_complete, w);
212 140 : w->phase = WAITING;
213 140 : w->state = last_observed_state;
214 140 : w->success = 0;
215 140 : w->removed = 0;
216 140 : w->cq = cq;
217 140 : w->tag = tag;
218 140 : w->channel = channel;
219 :
220 140 : grpc_timer_init(&exec_ctx, &w->alarm,
221 : gpr_convert_clock_type(deadline, GPR_CLOCK_MONOTONIC),
222 : timeout_complete, w, gpr_now(GPR_CLOCK_MONOTONIC));
223 :
224 140 : if (client_channel_elem->filter == &grpc_client_channel_filter) {
225 128 : GRPC_CHANNEL_INTERNAL_REF(channel, "watch_channel_connectivity");
226 128 : grpc_client_channel_add_interested_party(&exec_ctx, client_channel_elem,
227 : grpc_cq_pollset(cq));
228 128 : grpc_client_channel_watch_connectivity_state(&exec_ctx, client_channel_elem,
229 : &w->state, &w->on_complete);
230 12 : } else if (client_channel_elem->filter == &grpc_client_uchannel_filter) {
231 12 : GRPC_CHANNEL_INTERNAL_REF(channel, "watch_uchannel_connectivity");
232 12 : grpc_client_uchannel_add_interested_party(&exec_ctx, client_channel_elem,
233 : grpc_cq_pollset(cq));
234 12 : grpc_client_uchannel_watch_connectivity_state(
235 : &exec_ctx, client_channel_elem, &w->state, &w->on_complete);
236 : }
237 :
238 140 : grpc_exec_ctx_finish(&exec_ctx);
239 140 : }
|