Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc/grpc.h>
35 :
36 : #include <string.h>
37 :
38 : #include "src/core/channel/channel_stack.h"
39 : #include "src/core/support/string.h"
40 : #include "src/core/surface/api_trace.h"
41 : #include "src/core/surface/channel.h"
42 : #include "src/core/surface/call.h"
43 : #include <grpc/support/alloc.h>
44 : #include <grpc/support/log.h>
45 :
46 : typedef struct {
47 : grpc_linked_mdelem status;
48 : grpc_linked_mdelem details;
49 : } call_data;
50 :
51 : typedef struct {
52 : grpc_mdctx *mdctx;
53 : grpc_channel *master;
54 : grpc_status_code error_code;
55 : const char *error_message;
56 : } channel_data;
57 :
58 4 : static void lame_start_transport_stream_op(grpc_exec_ctx *exec_ctx,
59 : grpc_call_element *elem,
60 : grpc_transport_stream_op *op) {
61 4 : call_data *calld = elem->call_data;
62 4 : channel_data *chand = elem->channel_data;
63 4 : GRPC_CALL_LOG_OP(GPR_INFO, elem, op);
64 4 : if (op->send_ops != NULL) {
65 3 : grpc_stream_ops_unref_owned_objects(op->send_ops->ops, op->send_ops->nops);
66 3 : op->on_done_send->cb(exec_ctx, op->on_done_send->cb_arg, 0);
67 : }
68 4 : if (op->recv_ops != NULL) {
69 : char tmp[GPR_LTOA_MIN_BUFSIZE];
70 : grpc_metadata_batch mdb;
71 3 : gpr_ltoa(chand->error_code, tmp);
72 3 : calld->status.md =
73 3 : grpc_mdelem_from_strings(chand->mdctx, "grpc-status", tmp);
74 3 : calld->details.md = grpc_mdelem_from_strings(chand->mdctx, "grpc-message",
75 : chand->error_message);
76 3 : calld->status.prev = calld->details.next = NULL;
77 3 : calld->status.next = &calld->details;
78 3 : calld->details.prev = &calld->status;
79 3 : mdb.list.head = &calld->status;
80 3 : mdb.list.tail = &calld->details;
81 3 : mdb.garbage.head = mdb.garbage.tail = NULL;
82 3 : mdb.deadline = gpr_inf_future(GPR_CLOCK_REALTIME);
83 3 : grpc_sopb_add_metadata(op->recv_ops, mdb);
84 3 : *op->recv_state = GRPC_STREAM_CLOSED;
85 3 : op->on_done_recv->cb(exec_ctx, op->on_done_recv->cb_arg, 1);
86 : }
87 4 : if (op->on_consumed != NULL) {
88 3 : op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 0);
89 : }
90 4 : }
91 :
92 0 : static char *lame_get_peer(grpc_exec_ctx *exec_ctx, grpc_call_element *elem) {
93 0 : channel_data *chand = elem->channel_data;
94 0 : return grpc_channel_get_target(chand->master);
95 : }
96 :
97 2 : static void lame_start_transport_op(grpc_exec_ctx *exec_ctx,
98 : grpc_channel_element *elem,
99 : grpc_transport_op *op) {
100 2 : if (op->on_connectivity_state_change) {
101 0 : GPR_ASSERT(*op->connectivity_state != GRPC_CHANNEL_FATAL_FAILURE);
102 0 : *op->connectivity_state = GRPC_CHANNEL_FATAL_FAILURE;
103 0 : op->on_connectivity_state_change->cb(
104 0 : exec_ctx, op->on_connectivity_state_change->cb_arg, 1);
105 : }
106 2 : if (op->on_consumed != NULL) {
107 0 : op->on_consumed->cb(exec_ctx, op->on_consumed->cb_arg, 1);
108 : }
109 2 : }
110 :
111 3 : static void init_call_elem(grpc_exec_ctx *exec_ctx, grpc_call_element *elem,
112 : const void *transport_server_data,
113 : grpc_transport_stream_op *initial_op) {
114 3 : if (initial_op) {
115 0 : grpc_transport_stream_op_finish_with_failure(exec_ctx, initial_op);
116 : }
117 3 : }
118 :
119 3 : static void destroy_call_elem(grpc_exec_ctx *exec_ctx,
120 3 : grpc_call_element *elem) {}
121 :
122 2 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
123 : grpc_channel_element *elem, grpc_channel *master,
124 : const grpc_channel_args *args, grpc_mdctx *mdctx,
125 : int is_first, int is_last) {
126 2 : channel_data *chand = elem->channel_data;
127 2 : GPR_ASSERT(is_first);
128 2 : GPR_ASSERT(is_last);
129 2 : chand->mdctx = mdctx;
130 2 : chand->master = master;
131 2 : }
132 :
133 2 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
134 2 : grpc_channel_element *elem) {}
135 :
136 : static const grpc_channel_filter lame_filter = {
137 : lame_start_transport_stream_op, lame_start_transport_op, sizeof(call_data),
138 : init_call_elem, destroy_call_elem, sizeof(channel_data), init_channel_elem,
139 : destroy_channel_elem, lame_get_peer, "lame-client",
140 : };
141 :
142 : #define CHANNEL_STACK_FROM_CHANNEL(c) ((grpc_channel_stack *)((c) + 1))
143 :
144 2 : grpc_channel *grpc_lame_client_channel_create(const char *target,
145 : grpc_status_code error_code,
146 : const char *error_message) {
147 : grpc_channel *channel;
148 : grpc_channel_element *elem;
149 : channel_data *chand;
150 2 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
151 : static const grpc_channel_filter *filters[] = {&lame_filter};
152 2 : channel = grpc_channel_create_from_filters(&exec_ctx, target, filters, 1,
153 : NULL, grpc_mdctx_create(), 1);
154 2 : elem = grpc_channel_stack_element(grpc_channel_get_channel_stack(channel), 0);
155 2 : GRPC_API_TRACE(
156 : "grpc_lame_client_channel_create(target=%s, error_code=%d, "
157 : "error_message=%s)",
158 : 3, (target, (int)error_code, error_message));
159 2 : GPR_ASSERT(elem->filter == &lame_filter);
160 2 : chand = (channel_data *)elem->channel_data;
161 2 : chand->error_code = error_code;
162 2 : chand->error_message = error_message;
163 2 : grpc_exec_ctx_finish(&exec_ctx);
164 2 : return channel;
165 : }
|