Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "test/core/end2end/fixtures/proxy.h"
35 :
36 : #include <string.h>
37 :
38 : #include <grpc/support/alloc.h>
39 : #include <grpc/support/host_port.h>
40 : #include <grpc/support/log.h>
41 : #include <grpc/support/sync.h>
42 : #include <grpc/support/thd.h>
43 : #include <grpc/support/useful.h>
44 :
45 : #include "test/core/util/port.h"
46 :
47 : struct grpc_end2end_proxy {
48 : gpr_thd_id thd;
49 : char *proxy_port;
50 : char *server_port;
51 : grpc_completion_queue *cq;
52 : grpc_server *server;
53 : grpc_channel *client;
54 :
55 : int shutdown;
56 :
57 : /* requested call */
58 : grpc_call *new_call;
59 : grpc_call_details new_call_details;
60 : grpc_metadata_array new_call_metadata;
61 : };
62 :
63 : typedef struct {
64 : void (*func)(void *arg, int success);
65 : void *arg;
66 : } closure;
67 :
68 : typedef struct {
69 : gpr_refcount refs;
70 : grpc_end2end_proxy *proxy;
71 :
72 : grpc_call *c2p;
73 : grpc_call *p2s;
74 :
75 : grpc_metadata_array c2p_initial_metadata;
76 : grpc_metadata_array p2s_initial_metadata;
77 :
78 : grpc_byte_buffer *c2p_msg;
79 : grpc_byte_buffer *p2s_msg;
80 :
81 : grpc_metadata_array p2s_trailing_metadata;
82 : grpc_status_code p2s_status;
83 : char *p2s_status_details;
84 : size_t p2s_status_details_capacity;
85 :
86 : int c2p_server_cancelled;
87 : } proxy_call;
88 :
89 : static void thread_main(void *arg);
90 : static void request_call(grpc_end2end_proxy *proxy);
91 :
92 194 : grpc_end2end_proxy *grpc_end2end_proxy_create(
93 : const grpc_end2end_proxy_def *def) {
94 194 : gpr_thd_options opt = gpr_thd_options_default();
95 194 : int proxy_port = grpc_pick_unused_port_or_die();
96 194 : int server_port = grpc_pick_unused_port_or_die();
97 :
98 194 : grpc_end2end_proxy *proxy = gpr_malloc(sizeof(*proxy));
99 194 : memset(proxy, 0, sizeof(*proxy));
100 :
101 194 : gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port);
102 194 : gpr_join_host_port(&proxy->server_port, "localhost", server_port);
103 :
104 194 : gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port,
105 : proxy->server_port);
106 :
107 194 : proxy->cq = grpc_completion_queue_create(NULL);
108 194 : proxy->server = def->create_server(proxy->proxy_port);
109 194 : proxy->client = def->create_client(proxy->server_port);
110 :
111 194 : grpc_server_register_completion_queue(proxy->server, proxy->cq, NULL);
112 194 : grpc_server_start(proxy->server);
113 :
114 194 : gpr_thd_options_set_joinable(&opt);
115 194 : GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt));
116 :
117 194 : request_call(proxy);
118 :
119 194 : return proxy;
120 : }
121 :
122 4056 : static closure *new_closure(void (*func)(void *arg, int success), void *arg) {
123 4056 : closure *cl = gpr_malloc(sizeof(*cl));
124 4056 : cl->func = func;
125 4056 : cl->arg = arg;
126 4056 : return cl;
127 : }
128 :
129 194 : static void shutdown_complete(void *arg, int success) {
130 194 : grpc_end2end_proxy *proxy = arg;
131 194 : proxy->shutdown = 1;
132 194 : grpc_completion_queue_shutdown(proxy->cq);
133 194 : }
134 :
135 194 : void grpc_end2end_proxy_destroy(grpc_end2end_proxy *proxy) {
136 194 : grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
137 194 : new_closure(shutdown_complete, proxy));
138 194 : gpr_thd_join(proxy->thd);
139 194 : gpr_free(proxy->proxy_port);
140 194 : gpr_free(proxy->server_port);
141 194 : grpc_server_destroy(proxy->server);
142 194 : grpc_channel_destroy(proxy->client);
143 194 : grpc_completion_queue_destroy(proxy->cq);
144 194 : grpc_call_details_destroy(&proxy->new_call_details);
145 194 : gpr_free(proxy);
146 194 : }
147 :
148 3668 : static void unrefpc(proxy_call *pc, const char *reason) {
149 3668 : if (gpr_unref(&pc->refs)) {
150 286 : grpc_call_destroy(pc->c2p);
151 286 : grpc_call_destroy(pc->p2s);
152 286 : grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
153 286 : grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
154 286 : grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
155 286 : gpr_free(pc->p2s_status_details);
156 286 : gpr_free(pc);
157 : }
158 3668 : }
159 :
160 3382 : static void refpc(proxy_call *pc, const char *reason) { gpr_ref(&pc->refs); }
161 :
162 286 : static void on_c2p_sent_initial_metadata(void *arg, int success) {
163 286 : proxy_call *pc = arg;
164 286 : unrefpc(pc, "on_c2p_sent_initial_metadata");
165 286 : }
166 :
167 286 : static void on_p2s_recv_initial_metadata(void *arg, int success) {
168 286 : proxy_call *pc = arg;
169 : grpc_op op;
170 : grpc_call_error err;
171 :
172 286 : if (!pc->proxy->shutdown) {
173 286 : op.op = GRPC_OP_SEND_INITIAL_METADATA;
174 286 : op.flags = 0;
175 286 : op.reserved = NULL;
176 286 : op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
177 286 : op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
178 286 : refpc(pc, "on_c2p_sent_initial_metadata");
179 286 : err = grpc_call_start_batch(
180 286 : pc->c2p, &op, 1, new_closure(on_c2p_sent_initial_metadata, pc), NULL);
181 286 : GPR_ASSERT(err == GRPC_CALL_OK);
182 : }
183 :
184 286 : unrefpc(pc, "on_p2s_recv_initial_metadata");
185 286 : }
186 :
187 286 : static void on_p2s_sent_initial_metadata(void *arg, int success) {
188 286 : proxy_call *pc = arg;
189 286 : unrefpc(pc, "on_p2s_sent_initial_metadata");
190 286 : }
191 :
192 : static void on_c2p_recv_msg(void *arg, int success);
193 :
194 214 : static void on_p2s_sent_message(void *arg, int success) {
195 214 : proxy_call *pc = arg;
196 : grpc_op op;
197 : grpc_call_error err;
198 :
199 214 : grpc_byte_buffer_destroy(pc->c2p_msg);
200 214 : if (!pc->proxy->shutdown && success) {
201 214 : op.op = GRPC_OP_RECV_MESSAGE;
202 214 : op.flags = 0;
203 214 : op.reserved = NULL;
204 214 : op.data.recv_message = &pc->c2p_msg;
205 214 : refpc(pc, "on_c2p_recv_msg");
206 214 : err = grpc_call_start_batch(pc->c2p, &op, 1,
207 214 : new_closure(on_c2p_recv_msg, pc), NULL);
208 214 : GPR_ASSERT(err == GRPC_CALL_OK);
209 : }
210 :
211 214 : unrefpc(pc, "on_p2s_sent_message");
212 214 : }
213 :
214 284 : static void on_p2s_sent_close(void *arg, int success) {
215 284 : proxy_call *pc = arg;
216 284 : unrefpc(pc, "on_p2s_sent_close");
217 284 : }
218 :
219 500 : static void on_c2p_recv_msg(void *arg, int success) {
220 500 : proxy_call *pc = arg;
221 : grpc_op op;
222 : grpc_call_error err;
223 :
224 500 : if (!pc->proxy->shutdown && success) {
225 498 : if (pc->c2p_msg != NULL) {
226 214 : op.op = GRPC_OP_SEND_MESSAGE;
227 214 : op.flags = 0;
228 214 : op.reserved = NULL;
229 214 : op.data.send_message = pc->c2p_msg;
230 214 : refpc(pc, "on_p2s_sent_message");
231 214 : err = grpc_call_start_batch(pc->p2s, &op, 1,
232 214 : new_closure(on_p2s_sent_message, pc), NULL);
233 214 : GPR_ASSERT(err == GRPC_CALL_OK);
234 : } else {
235 284 : op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
236 284 : op.flags = 0;
237 284 : op.reserved = NULL;
238 284 : refpc(pc, "on_p2s_sent_close");
239 284 : err = grpc_call_start_batch(pc->p2s, &op, 1,
240 284 : new_closure(on_p2s_sent_close, pc), NULL);
241 284 : GPR_ASSERT(err == GRPC_CALL_OK);
242 : }
243 : }
244 :
245 500 : unrefpc(pc, "on_c2p_recv_msg");
246 500 : }
247 :
248 : static void on_p2s_recv_msg(void *arg, int success);
249 :
250 192 : static void on_c2p_sent_message(void *arg, int success) {
251 192 : proxy_call *pc = arg;
252 : grpc_op op;
253 : grpc_call_error err;
254 :
255 192 : grpc_byte_buffer_destroy(pc->p2s_msg);
256 192 : if (!pc->proxy->shutdown && success) {
257 190 : op.op = GRPC_OP_RECV_MESSAGE;
258 190 : op.flags = 0;
259 190 : op.reserved = NULL;
260 190 : op.data.recv_message = &pc->p2s_msg;
261 190 : refpc(pc, "on_p2s_recv_msg");
262 190 : err = grpc_call_start_batch(pc->p2s, &op, 1,
263 190 : new_closure(on_p2s_recv_msg, pc), NULL);
264 190 : GPR_ASSERT(err == GRPC_CALL_OK);
265 : }
266 :
267 192 : unrefpc(pc, "on_c2p_sent_message");
268 192 : }
269 :
270 476 : static void on_p2s_recv_msg(void *arg, int success) {
271 476 : proxy_call *pc = arg;
272 : grpc_op op;
273 : grpc_call_error err;
274 :
275 476 : if (!pc->proxy->shutdown && success && pc->p2s_msg) {
276 192 : op.op = GRPC_OP_SEND_MESSAGE;
277 192 : op.flags = 0;
278 192 : op.reserved = NULL;
279 192 : op.data.send_message = pc->p2s_msg;
280 192 : refpc(pc, "on_c2p_sent_message");
281 192 : err = grpc_call_start_batch(pc->c2p, &op, 1,
282 192 : new_closure(on_c2p_sent_message, pc), NULL);
283 192 : GPR_ASSERT(err == GRPC_CALL_OK);
284 : }
285 476 : unrefpc(pc, "on_p2s_recv_msg");
286 476 : }
287 :
288 286 : static void on_c2p_sent_status(void *arg, int success) {
289 286 : proxy_call *pc = arg;
290 286 : unrefpc(pc, "on_c2p_sent_status");
291 286 : }
292 :
293 286 : static void on_p2s_status(void *arg, int success) {
294 286 : proxy_call *pc = arg;
295 : grpc_op op;
296 : grpc_call_error err;
297 :
298 286 : if (!pc->proxy->shutdown) {
299 286 : GPR_ASSERT(success);
300 286 : op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
301 286 : op.flags = 0;
302 286 : op.reserved = NULL;
303 286 : op.data.send_status_from_server.trailing_metadata_count =
304 286 : pc->p2s_trailing_metadata.count;
305 286 : op.data.send_status_from_server.trailing_metadata =
306 286 : pc->p2s_trailing_metadata.metadata;
307 286 : op.data.send_status_from_server.status = pc->p2s_status;
308 286 : op.data.send_status_from_server.status_details = pc->p2s_status_details;
309 286 : refpc(pc, "on_c2p_sent_status");
310 286 : err = grpc_call_start_batch(pc->c2p, &op, 1,
311 286 : new_closure(on_c2p_sent_status, pc), NULL);
312 286 : GPR_ASSERT(err == GRPC_CALL_OK);
313 : }
314 :
315 286 : unrefpc(pc, "on_p2s_status");
316 286 : }
317 :
318 286 : static void on_c2p_closed(void *arg, int success) {
319 286 : proxy_call *pc = arg;
320 286 : unrefpc(pc, "on_c2p_closed");
321 286 : }
322 :
323 480 : static void on_new_call(void *arg, int success) {
324 480 : grpc_end2end_proxy *proxy = arg;
325 : grpc_call_error err;
326 :
327 480 : if (success) {
328 : grpc_op op;
329 286 : proxy_call *pc = gpr_malloc(sizeof(*pc));
330 286 : memset(pc, 0, sizeof(*pc));
331 286 : pc->proxy = proxy;
332 286 : GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata,
333 : proxy->new_call_metadata);
334 286 : pc->c2p = proxy->new_call;
335 286 : pc->p2s = grpc_channel_create_call(
336 : proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
337 286 : proxy->new_call_details.method, proxy->new_call_details.host,
338 : proxy->new_call_details.deadline, NULL);
339 286 : gpr_ref_init(&pc->refs, 1);
340 :
341 286 : op.flags = 0;
342 286 : op.reserved = NULL;
343 :
344 286 : op.op = GRPC_OP_RECV_INITIAL_METADATA;
345 286 : op.data.recv_initial_metadata = &pc->p2s_initial_metadata;
346 286 : refpc(pc, "on_p2s_recv_initial_metadata");
347 286 : err = grpc_call_start_batch(
348 286 : pc->p2s, &op, 1, new_closure(on_p2s_recv_initial_metadata, pc), NULL);
349 286 : GPR_ASSERT(err == GRPC_CALL_OK);
350 :
351 286 : op.op = GRPC_OP_SEND_INITIAL_METADATA;
352 286 : op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
353 286 : op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
354 286 : refpc(pc, "on_p2s_sent_initial_metadata");
355 286 : err = grpc_call_start_batch(
356 286 : pc->p2s, &op, 1, new_closure(on_p2s_sent_initial_metadata, pc), NULL);
357 286 : GPR_ASSERT(err == GRPC_CALL_OK);
358 :
359 286 : op.op = GRPC_OP_RECV_MESSAGE;
360 286 : op.data.recv_message = &pc->c2p_msg;
361 286 : refpc(pc, "on_c2p_recv_msg");
362 286 : err = grpc_call_start_batch(pc->c2p, &op, 1,
363 286 : new_closure(on_c2p_recv_msg, pc), NULL);
364 286 : GPR_ASSERT(err == GRPC_CALL_OK);
365 :
366 286 : op.op = GRPC_OP_RECV_MESSAGE;
367 286 : op.data.recv_message = &pc->p2s_msg;
368 286 : refpc(pc, "on_p2s_recv_msg");
369 286 : err = grpc_call_start_batch(pc->p2s, &op, 1,
370 286 : new_closure(on_p2s_recv_msg, pc), NULL);
371 286 : GPR_ASSERT(err == GRPC_CALL_OK);
372 :
373 286 : op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
374 286 : op.data.recv_status_on_client.trailing_metadata =
375 286 : &pc->p2s_trailing_metadata;
376 286 : op.data.recv_status_on_client.status = &pc->p2s_status;
377 286 : op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
378 286 : op.data.recv_status_on_client.status_details_capacity =
379 286 : &pc->p2s_status_details_capacity;
380 286 : refpc(pc, "on_p2s_status");
381 286 : err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
382 : NULL);
383 286 : GPR_ASSERT(err == GRPC_CALL_OK);
384 :
385 286 : op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
386 286 : op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
387 286 : refpc(pc, "on_c2p_closed");
388 286 : err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
389 : NULL);
390 286 : GPR_ASSERT(err == GRPC_CALL_OK);
391 :
392 286 : request_call(proxy);
393 :
394 286 : unrefpc(pc, "init");
395 : } else {
396 194 : GPR_ASSERT(proxy->new_call == NULL);
397 : }
398 480 : }
399 :
400 480 : static void request_call(grpc_end2end_proxy *proxy) {
401 480 : proxy->new_call = NULL;
402 480 : GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
403 : proxy->server, &proxy->new_call,
404 : &proxy->new_call_details,
405 : &proxy->new_call_metadata, proxy->cq,
406 : proxy->cq, new_closure(on_new_call, proxy)));
407 480 : }
408 :
409 194 : static void thread_main(void *arg) {
410 194 : grpc_end2end_proxy *proxy = arg;
411 : closure *cl;
412 : for (;;) {
413 4250 : grpc_event ev = grpc_completion_queue_next(
414 : proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
415 4250 : switch (ev.type) {
416 : case GRPC_QUEUE_TIMEOUT:
417 0 : gpr_log(GPR_ERROR, "Should never reach here");
418 0 : abort();
419 : case GRPC_QUEUE_SHUTDOWN:
420 388 : return;
421 : case GRPC_OP_COMPLETE:
422 4056 : cl = ev.tag;
423 4056 : cl->func(cl->arg, ev.success);
424 4056 : gpr_free(cl);
425 4056 : break;
426 : }
427 4056 : }
428 : }
429 :
430 194 : const char *grpc_end2end_proxy_get_client_target(grpc_end2end_proxy *proxy) {
431 194 : return proxy->proxy_port;
432 : }
433 :
434 197 : const char *grpc_end2end_proxy_get_server_port(grpc_end2end_proxy *proxy) {
435 197 : return proxy->server_port;
436 : }
|