LCOV - code coverage report
Current view: top level - test/core/end2end/fixtures - proxy.c (source / functions) Hit Total Coverage
Test: tmp.zDYK9MVh93 Lines: 241 243 99.2 %
Date: 2015-10-10 Functions: 22 22 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  * Copyright 2015, Google Inc.
       4             :  * All rights reserved.
       5             :  *
       6             :  * Redistribution and use in source and binary forms, with or without
       7             :  * modification, are permitted provided that the following conditions are
       8             :  * met:
       9             :  *
      10             :  *     * Redistributions of source code must retain the above copyright
      11             :  * notice, this list of conditions and the following disclaimer.
      12             :  *     * Redistributions in binary form must reproduce the above
      13             :  * copyright notice, this list of conditions and the following disclaimer
      14             :  * in the documentation and/or other materials provided with the
      15             :  * distribution.
      16             :  *     * Neither the name of Google Inc. nor the names of its
      17             :  * contributors may be used to endorse or promote products derived from
      18             :  * this software without specific prior written permission.
      19             :  *
      20             :  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
      21             :  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
      22             :  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
      23             :  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
      24             :  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
      25             :  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
      26             :  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
      27             :  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
      28             :  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
      29             :  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
      30             :  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
      31             :  *
      32             :  */
      33             : 
      34             : #include "test/core/end2end/fixtures/proxy.h"
      35             : 
      36             : #include <string.h>
      37             : 
      38             : #include <grpc/support/alloc.h>
      39             : #include <grpc/support/host_port.h>
      40             : #include <grpc/support/log.h>
      41             : #include <grpc/support/sync.h>
      42             : #include <grpc/support/thd.h>
      43             : #include <grpc/support/useful.h>
      44             : 
      45             : #include "test/core/util/port.h"
      46             : 
      47             : struct grpc_end2end_proxy {
      48             :   gpr_thd_id thd;
      49             :   char *proxy_port;
      50             :   char *server_port;
      51             :   grpc_completion_queue *cq;
      52             :   grpc_server *server;
      53             :   grpc_channel *client;
      54             : 
      55             :   int shutdown;
      56             : 
      57             :   /* requested call */
      58             :   grpc_call *new_call;
      59             :   grpc_call_details new_call_details;
      60             :   grpc_metadata_array new_call_metadata;
      61             : };
      62             : 
      63             : typedef struct {
      64             :   void (*func)(void *arg, int success);
      65             :   void *arg;
      66             : } closure;
      67             : 
      68             : typedef struct {
      69             :   gpr_refcount refs;
      70             :   grpc_end2end_proxy *proxy;
      71             : 
      72             :   grpc_call *c2p;
      73             :   grpc_call *p2s;
      74             : 
      75             :   grpc_metadata_array c2p_initial_metadata;
      76             :   grpc_metadata_array p2s_initial_metadata;
      77             : 
      78             :   grpc_byte_buffer *c2p_msg;
      79             :   grpc_byte_buffer *p2s_msg;
      80             : 
      81             :   grpc_metadata_array p2s_trailing_metadata;
      82             :   grpc_status_code p2s_status;
      83             :   char *p2s_status_details;
      84             :   size_t p2s_status_details_capacity;
      85             : 
      86             :   int c2p_server_cancelled;
      87             : } proxy_call;
      88             : 
      89             : static void thread_main(void *arg);
      90             : static void request_call(grpc_end2end_proxy *proxy);
      91             : 
      92         194 : grpc_end2end_proxy *grpc_end2end_proxy_create(
      93             :     const grpc_end2end_proxy_def *def) {
      94         194 :   gpr_thd_options opt = gpr_thd_options_default();
      95         194 :   int proxy_port = grpc_pick_unused_port_or_die();
      96         194 :   int server_port = grpc_pick_unused_port_or_die();
      97             : 
      98         194 :   grpc_end2end_proxy *proxy = gpr_malloc(sizeof(*proxy));
      99         194 :   memset(proxy, 0, sizeof(*proxy));
     100             : 
     101         194 :   gpr_join_host_port(&proxy->proxy_port, "localhost", proxy_port);
     102         194 :   gpr_join_host_port(&proxy->server_port, "localhost", server_port);
     103             : 
     104         194 :   gpr_log(GPR_DEBUG, "PROXY ADDR:%s BACKEND:%s", proxy->proxy_port,
     105             :           proxy->server_port);
     106             : 
     107         194 :   proxy->cq = grpc_completion_queue_create(NULL);
     108         194 :   proxy->server = def->create_server(proxy->proxy_port);
     109         194 :   proxy->client = def->create_client(proxy->server_port);
     110             : 
     111         194 :   grpc_server_register_completion_queue(proxy->server, proxy->cq, NULL);
     112         194 :   grpc_server_start(proxy->server);
     113             : 
     114         194 :   gpr_thd_options_set_joinable(&opt);
     115         194 :   GPR_ASSERT(gpr_thd_new(&proxy->thd, thread_main, proxy, &opt));
     116             : 
     117         194 :   request_call(proxy);
     118             : 
     119         194 :   return proxy;
     120             : }
     121             : 
     122        4056 : static closure *new_closure(void (*func)(void *arg, int success), void *arg) {
     123        4056 :   closure *cl = gpr_malloc(sizeof(*cl));
     124        4056 :   cl->func = func;
     125        4056 :   cl->arg = arg;
     126        4056 :   return cl;
     127             : }
     128             : 
     129         194 : static void shutdown_complete(void *arg, int success) {
     130         194 :   grpc_end2end_proxy *proxy = arg;
     131         194 :   proxy->shutdown = 1;
     132         194 :   grpc_completion_queue_shutdown(proxy->cq);
     133         194 : }
     134             : 
     135         194 : void grpc_end2end_proxy_destroy(grpc_end2end_proxy *proxy) {
     136         194 :   grpc_server_shutdown_and_notify(proxy->server, proxy->cq,
     137         194 :                                   new_closure(shutdown_complete, proxy));
     138         194 :   gpr_thd_join(proxy->thd);
     139         194 :   gpr_free(proxy->proxy_port);
     140         194 :   gpr_free(proxy->server_port);
     141         194 :   grpc_server_destroy(proxy->server);
     142         194 :   grpc_channel_destroy(proxy->client);
     143         194 :   grpc_completion_queue_destroy(proxy->cq);
     144         194 :   grpc_call_details_destroy(&proxy->new_call_details);
     145         194 :   gpr_free(proxy);
     146         194 : }
     147             : 
     148        3668 : static void unrefpc(proxy_call *pc, const char *reason) {
     149        3668 :   if (gpr_unref(&pc->refs)) {
     150         286 :     grpc_call_destroy(pc->c2p);
     151         286 :     grpc_call_destroy(pc->p2s);
     152         286 :     grpc_metadata_array_destroy(&pc->c2p_initial_metadata);
     153         286 :     grpc_metadata_array_destroy(&pc->p2s_initial_metadata);
     154         286 :     grpc_metadata_array_destroy(&pc->p2s_trailing_metadata);
     155         286 :     gpr_free(pc->p2s_status_details);
     156         286 :     gpr_free(pc);
     157             :   }
     158        3668 : }
     159             : 
     160        3382 : static void refpc(proxy_call *pc, const char *reason) { gpr_ref(&pc->refs); }
     161             : 
     162         286 : static void on_c2p_sent_initial_metadata(void *arg, int success) {
     163         286 :   proxy_call *pc = arg;
     164         286 :   unrefpc(pc, "on_c2p_sent_initial_metadata");
     165         286 : }
     166             : 
     167         286 : static void on_p2s_recv_initial_metadata(void *arg, int success) {
     168         286 :   proxy_call *pc = arg;
     169             :   grpc_op op;
     170             :   grpc_call_error err;
     171             : 
     172         286 :   if (!pc->proxy->shutdown) {
     173         286 :     op.op = GRPC_OP_SEND_INITIAL_METADATA;
     174         286 :     op.flags = 0;
     175         286 :     op.reserved = NULL;
     176         286 :     op.data.send_initial_metadata.count = pc->p2s_initial_metadata.count;
     177         286 :     op.data.send_initial_metadata.metadata = pc->p2s_initial_metadata.metadata;
     178         286 :     refpc(pc, "on_c2p_sent_initial_metadata");
     179         286 :     err = grpc_call_start_batch(
     180         286 :         pc->c2p, &op, 1, new_closure(on_c2p_sent_initial_metadata, pc), NULL);
     181         286 :     GPR_ASSERT(err == GRPC_CALL_OK);
     182             :   }
     183             : 
     184         286 :   unrefpc(pc, "on_p2s_recv_initial_metadata");
     185         286 : }
     186             : 
     187         286 : static void on_p2s_sent_initial_metadata(void *arg, int success) {
     188         286 :   proxy_call *pc = arg;
     189         286 :   unrefpc(pc, "on_p2s_sent_initial_metadata");
     190         286 : }
     191             : 
     192             : static void on_c2p_recv_msg(void *arg, int success);
     193             : 
     194         214 : static void on_p2s_sent_message(void *arg, int success) {
     195         214 :   proxy_call *pc = arg;
     196             :   grpc_op op;
     197             :   grpc_call_error err;
     198             : 
     199         214 :   grpc_byte_buffer_destroy(pc->c2p_msg);
     200         214 :   if (!pc->proxy->shutdown && success) {
     201         214 :     op.op = GRPC_OP_RECV_MESSAGE;
     202         214 :     op.flags = 0;
     203         214 :     op.reserved = NULL;
     204         214 :     op.data.recv_message = &pc->c2p_msg;
     205         214 :     refpc(pc, "on_c2p_recv_msg");
     206         214 :     err = grpc_call_start_batch(pc->c2p, &op, 1,
     207         214 :                                 new_closure(on_c2p_recv_msg, pc), NULL);
     208         214 :     GPR_ASSERT(err == GRPC_CALL_OK);
     209             :   }
     210             : 
     211         214 :   unrefpc(pc, "on_p2s_sent_message");
     212         214 : }
     213             : 
     214         284 : static void on_p2s_sent_close(void *arg, int success) {
     215         284 :   proxy_call *pc = arg;
     216         284 :   unrefpc(pc, "on_p2s_sent_close");
     217         284 : }
     218             : 
     219         500 : static void on_c2p_recv_msg(void *arg, int success) {
     220         500 :   proxy_call *pc = arg;
     221             :   grpc_op op;
     222             :   grpc_call_error err;
     223             : 
     224         500 :   if (!pc->proxy->shutdown && success) {
     225         498 :     if (pc->c2p_msg != NULL) {
     226         214 :       op.op = GRPC_OP_SEND_MESSAGE;
     227         214 :       op.flags = 0;
     228         214 :       op.reserved = NULL;
     229         214 :       op.data.send_message = pc->c2p_msg;
     230         214 :       refpc(pc, "on_p2s_sent_message");
     231         214 :       err = grpc_call_start_batch(pc->p2s, &op, 1,
     232         214 :                                   new_closure(on_p2s_sent_message, pc), NULL);
     233         214 :       GPR_ASSERT(err == GRPC_CALL_OK);
     234             :     } else {
     235         284 :       op.op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
     236         284 :       op.flags = 0;
     237         284 :       op.reserved = NULL;
     238         284 :       refpc(pc, "on_p2s_sent_close");
     239         284 :       err = grpc_call_start_batch(pc->p2s, &op, 1,
     240         284 :                                   new_closure(on_p2s_sent_close, pc), NULL);
     241         284 :       GPR_ASSERT(err == GRPC_CALL_OK);
     242             :     }
     243             :   }
     244             : 
     245         500 :   unrefpc(pc, "on_c2p_recv_msg");
     246         500 : }
     247             : 
     248             : static void on_p2s_recv_msg(void *arg, int success);
     249             : 
     250         192 : static void on_c2p_sent_message(void *arg, int success) {
     251         192 :   proxy_call *pc = arg;
     252             :   grpc_op op;
     253             :   grpc_call_error err;
     254             : 
     255         192 :   grpc_byte_buffer_destroy(pc->p2s_msg);
     256         192 :   if (!pc->proxy->shutdown && success) {
     257         190 :     op.op = GRPC_OP_RECV_MESSAGE;
     258         190 :     op.flags = 0;
     259         190 :     op.reserved = NULL;
     260         190 :     op.data.recv_message = &pc->p2s_msg;
     261         190 :     refpc(pc, "on_p2s_recv_msg");
     262         190 :     err = grpc_call_start_batch(pc->p2s, &op, 1,
     263         190 :                                 new_closure(on_p2s_recv_msg, pc), NULL);
     264         190 :     GPR_ASSERT(err == GRPC_CALL_OK);
     265             :   }
     266             : 
     267         192 :   unrefpc(pc, "on_c2p_sent_message");
     268         192 : }
     269             : 
     270         476 : static void on_p2s_recv_msg(void *arg, int success) {
     271         476 :   proxy_call *pc = arg;
     272             :   grpc_op op;
     273             :   grpc_call_error err;
     274             : 
     275         476 :   if (!pc->proxy->shutdown && success && pc->p2s_msg) {
     276         192 :     op.op = GRPC_OP_SEND_MESSAGE;
     277         192 :     op.flags = 0;
     278         192 :     op.reserved = NULL;
     279         192 :     op.data.send_message = pc->p2s_msg;
     280         192 :     refpc(pc, "on_c2p_sent_message");
     281         192 :     err = grpc_call_start_batch(pc->c2p, &op, 1,
     282         192 :                                 new_closure(on_c2p_sent_message, pc), NULL);
     283         192 :     GPR_ASSERT(err == GRPC_CALL_OK);
     284             :   }
     285         476 :   unrefpc(pc, "on_p2s_recv_msg");
     286         476 : }
     287             : 
     288         286 : static void on_c2p_sent_status(void *arg, int success) {
     289         286 :   proxy_call *pc = arg;
     290         286 :   unrefpc(pc, "on_c2p_sent_status");
     291         286 : }
     292             : 
     293         286 : static void on_p2s_status(void *arg, int success) {
     294         286 :   proxy_call *pc = arg;
     295             :   grpc_op op;
     296             :   grpc_call_error err;
     297             : 
     298         286 :   if (!pc->proxy->shutdown) {
     299         286 :     GPR_ASSERT(success);
     300         286 :     op.op = GRPC_OP_SEND_STATUS_FROM_SERVER;
     301         286 :     op.flags = 0;
     302         286 :     op.reserved = NULL;
     303         286 :     op.data.send_status_from_server.trailing_metadata_count =
     304         286 :         pc->p2s_trailing_metadata.count;
     305         286 :     op.data.send_status_from_server.trailing_metadata =
     306         286 :         pc->p2s_trailing_metadata.metadata;
     307         286 :     op.data.send_status_from_server.status = pc->p2s_status;
     308         286 :     op.data.send_status_from_server.status_details = pc->p2s_status_details;
     309         286 :     refpc(pc, "on_c2p_sent_status");
     310         286 :     err = grpc_call_start_batch(pc->c2p, &op, 1,
     311         286 :                                 new_closure(on_c2p_sent_status, pc), NULL);
     312         286 :     GPR_ASSERT(err == GRPC_CALL_OK);
     313             :   }
     314             : 
     315         286 :   unrefpc(pc, "on_p2s_status");
     316         286 : }
     317             : 
     318         286 : static void on_c2p_closed(void *arg, int success) {
     319         286 :   proxy_call *pc = arg;
     320         286 :   unrefpc(pc, "on_c2p_closed");
     321         286 : }
     322             : 
     323         480 : static void on_new_call(void *arg, int success) {
     324         480 :   grpc_end2end_proxy *proxy = arg;
     325             :   grpc_call_error err;
     326             : 
     327         480 :   if (success) {
     328             :     grpc_op op;
     329         286 :     proxy_call *pc = gpr_malloc(sizeof(*pc));
     330         286 :     memset(pc, 0, sizeof(*pc));
     331         286 :     pc->proxy = proxy;
     332         286 :     GPR_SWAP(grpc_metadata_array, pc->c2p_initial_metadata,
     333             :              proxy->new_call_metadata);
     334         286 :     pc->c2p = proxy->new_call;
     335         286 :     pc->p2s = grpc_channel_create_call(
     336             :         proxy->client, pc->c2p, GRPC_PROPAGATE_DEFAULTS, proxy->cq,
     337         286 :         proxy->new_call_details.method, proxy->new_call_details.host,
     338             :         proxy->new_call_details.deadline, NULL);
     339         286 :     gpr_ref_init(&pc->refs, 1);
     340             : 
     341         286 :     op.flags = 0;
     342         286 :     op.reserved = NULL;
     343             : 
     344         286 :     op.op = GRPC_OP_RECV_INITIAL_METADATA;
     345         286 :     op.data.recv_initial_metadata = &pc->p2s_initial_metadata;
     346         286 :     refpc(pc, "on_p2s_recv_initial_metadata");
     347         286 :     err = grpc_call_start_batch(
     348         286 :         pc->p2s, &op, 1, new_closure(on_p2s_recv_initial_metadata, pc), NULL);
     349         286 :     GPR_ASSERT(err == GRPC_CALL_OK);
     350             : 
     351         286 :     op.op = GRPC_OP_SEND_INITIAL_METADATA;
     352         286 :     op.data.send_initial_metadata.count = pc->c2p_initial_metadata.count;
     353         286 :     op.data.send_initial_metadata.metadata = pc->c2p_initial_metadata.metadata;
     354         286 :     refpc(pc, "on_p2s_sent_initial_metadata");
     355         286 :     err = grpc_call_start_batch(
     356         286 :         pc->p2s, &op, 1, new_closure(on_p2s_sent_initial_metadata, pc), NULL);
     357         286 :     GPR_ASSERT(err == GRPC_CALL_OK);
     358             : 
     359         286 :     op.op = GRPC_OP_RECV_MESSAGE;
     360         286 :     op.data.recv_message = &pc->c2p_msg;
     361         286 :     refpc(pc, "on_c2p_recv_msg");
     362         286 :     err = grpc_call_start_batch(pc->c2p, &op, 1,
     363         286 :                                 new_closure(on_c2p_recv_msg, pc), NULL);
     364         286 :     GPR_ASSERT(err == GRPC_CALL_OK);
     365             : 
     366         286 :     op.op = GRPC_OP_RECV_MESSAGE;
     367         286 :     op.data.recv_message = &pc->p2s_msg;
     368         286 :     refpc(pc, "on_p2s_recv_msg");
     369         286 :     err = grpc_call_start_batch(pc->p2s, &op, 1,
     370         286 :                                 new_closure(on_p2s_recv_msg, pc), NULL);
     371         286 :     GPR_ASSERT(err == GRPC_CALL_OK);
     372             : 
     373         286 :     op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
     374         286 :     op.data.recv_status_on_client.trailing_metadata =
     375         286 :         &pc->p2s_trailing_metadata;
     376         286 :     op.data.recv_status_on_client.status = &pc->p2s_status;
     377         286 :     op.data.recv_status_on_client.status_details = &pc->p2s_status_details;
     378         286 :     op.data.recv_status_on_client.status_details_capacity =
     379         286 :         &pc->p2s_status_details_capacity;
     380         286 :     refpc(pc, "on_p2s_status");
     381         286 :     err = grpc_call_start_batch(pc->p2s, &op, 1, new_closure(on_p2s_status, pc),
     382             :                                 NULL);
     383         286 :     GPR_ASSERT(err == GRPC_CALL_OK);
     384             : 
     385         286 :     op.op = GRPC_OP_RECV_CLOSE_ON_SERVER;
     386         286 :     op.data.recv_close_on_server.cancelled = &pc->c2p_server_cancelled;
     387         286 :     refpc(pc, "on_c2p_closed");
     388         286 :     err = grpc_call_start_batch(pc->c2p, &op, 1, new_closure(on_c2p_closed, pc),
     389             :                                 NULL);
     390         286 :     GPR_ASSERT(err == GRPC_CALL_OK);
     391             : 
     392         286 :     request_call(proxy);
     393             : 
     394         286 :     unrefpc(pc, "init");
     395             :   } else {
     396         194 :     GPR_ASSERT(proxy->new_call == NULL);
     397             :   }
     398         480 : }
     399             : 
     400         480 : static void request_call(grpc_end2end_proxy *proxy) {
     401         480 :   proxy->new_call = NULL;
     402         480 :   GPR_ASSERT(GRPC_CALL_OK == grpc_server_request_call(
     403             :                                  proxy->server, &proxy->new_call,
     404             :                                  &proxy->new_call_details,
     405             :                                  &proxy->new_call_metadata, proxy->cq,
     406             :                                  proxy->cq, new_closure(on_new_call, proxy)));
     407         480 : }
     408             : 
     409         194 : static void thread_main(void *arg) {
     410         194 :   grpc_end2end_proxy *proxy = arg;
     411             :   closure *cl;
     412             :   for (;;) {
     413        4250 :     grpc_event ev = grpc_completion_queue_next(
     414             :         proxy->cq, gpr_inf_future(GPR_CLOCK_MONOTONIC), NULL);
     415        4250 :     switch (ev.type) {
     416             :       case GRPC_QUEUE_TIMEOUT:
     417           0 :         gpr_log(GPR_ERROR, "Should never reach here");
     418           0 :         abort();
     419             :       case GRPC_QUEUE_SHUTDOWN:
     420         388 :         return;
     421             :       case GRPC_OP_COMPLETE:
     422        4056 :         cl = ev.tag;
     423        4056 :         cl->func(cl->arg, ev.success);
     424        4056 :         gpr_free(cl);
     425        4056 :         break;
     426             :     }
     427        4056 :   }
     428             : }
     429             : 
     430         194 : const char *grpc_end2end_proxy_get_client_target(grpc_end2end_proxy *proxy) {
     431         194 :   return proxy->proxy_port;
     432             : }
     433             : 
     434         197 : const char *grpc_end2end_proxy_get_server_port(grpc_end2end_proxy *proxy) {
     435         197 :   return proxy->server_port;
     436             : }

Generated by: LCOV version 1.10