Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/census/grpc_filter.h"
35 :
36 : #include <stdio.h>
37 : #include <string.h>
38 :
39 : #include "src/core/channel/channel_stack.h"
40 : #include "src/core/channel/noop_filter.h"
41 : #include "src/core/statistics/census_interface.h"
42 : #include "src/core/statistics/census_rpc_stats.h"
43 : #include <grpc/census.h>
44 : #include <grpc/support/alloc.h>
45 : #include <grpc/support/log.h>
46 : #include <grpc/support/slice.h>
47 : #include <grpc/support/time.h>
48 :
49 : typedef struct call_data {
50 : census_op_id op_id;
51 : census_context *ctxt;
52 : gpr_timespec start_ts;
53 : int error;
54 :
55 : /* recv callback */
56 : grpc_stream_op_buffer *recv_ops;
57 : grpc_closure *on_done_recv;
58 : } call_data;
59 :
60 : typedef struct channel_data {
61 : grpc_mdstr *path_str; /* pointer to meta data str with key == ":path" */
62 : } channel_data;
63 :
64 17 : static void extract_and_annotate_method_tag(grpc_stream_op_buffer *sopb,
65 : call_data *calld,
66 : channel_data *chand) {
67 : grpc_linked_mdelem *m;
68 : size_t i;
69 34 : for (i = 0; i < sopb->nops; i++) {
70 17 : grpc_stream_op *op = &sopb->ops[i];
71 17 : if (op->type != GRPC_OP_METADATA) continue;
72 51 : for (m = op->data.metadata.list.head; m != NULL; m = m->next) {
73 34 : if (m->md->key == chand->path_str) {
74 34 : gpr_log(GPR_DEBUG, "%s",
75 34 : (const char *)GPR_SLICE_START_PTR(m->md->value->slice));
76 : /* Add method tag here */
77 : }
78 : }
79 : }
80 17 : }
81 :
82 20 : static void client_mutate_op(grpc_call_element *elem,
83 : grpc_transport_stream_op *op) {
84 20 : call_data *calld = elem->call_data;
85 20 : channel_data *chand = elem->channel_data;
86 20 : if (op->send_ops) {
87 17 : extract_and_annotate_method_tag(op->send_ops, calld, chand);
88 : }
89 20 : }
90 :
91 20 : static void client_start_transport_op(grpc_exec_ctx *exec_ctx,
92 : grpc_call_element *elem,
93 : grpc_transport_stream_op *op) {
94 20 : client_mutate_op(elem, op);
95 20 : grpc_call_next_op(exec_ctx, elem, op);
96 20 : }
97 :
98 0 : static void server_on_done_recv(grpc_exec_ctx *exec_ctx, void *ptr,
99 : int success) {
100 0 : grpc_call_element *elem = ptr;
101 0 : call_data *calld = elem->call_data;
102 0 : channel_data *chand = elem->channel_data;
103 0 : if (success) {
104 0 : extract_and_annotate_method_tag(calld->recv_ops, calld, chand);
105 : }
106 0 : calld->on_done_recv->cb(exec_ctx, calld->on_done_recv->cb_arg, success);
107 0 : }
108 :
109 0 : static void server_mutate_op(grpc_call_element *elem,
110 : grpc_transport_stream_op *op) {
111 0 : call_data *calld = elem->call_data;
112 0 : if (op->recv_ops) {
113 : /* substitute our callback for the op callback */
114 0 : calld->recv_ops = op->recv_ops;
115 0 : calld->on_done_recv = op->on_done_recv;
116 0 : op->on_done_recv = calld->on_done_recv;
117 : }
118 0 : }
119 :
120 0 : static void server_start_transport_op(grpc_exec_ctx *exec_ctx,
121 : grpc_call_element *elem,
122 : grpc_transport_stream_op *op) {
123 0 : call_data *calld = elem->call_data;
124 0 : GPR_ASSERT((calld->op_id.upper != 0) || (calld->op_id.lower != 0));
125 0 : server_mutate_op(elem, op);
126 0 : grpc_call_next_op(exec_ctx, elem, op);
127 0 : }
128 :
129 17 : static void client_init_call_elem(grpc_exec_ctx *exec_ctx,
130 : grpc_call_element *elem,
131 : const void *server_transport_data,
132 : grpc_transport_stream_op *initial_op) {
133 17 : call_data *d = elem->call_data;
134 17 : GPR_ASSERT(d != NULL);
135 17 : d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
136 17 : if (initial_op) client_mutate_op(elem, initial_op);
137 17 : }
138 :
139 17 : static void client_destroy_call_elem(grpc_exec_ctx *exec_ctx,
140 : grpc_call_element *elem) {
141 17 : call_data *d = elem->call_data;
142 17 : GPR_ASSERT(d != NULL);
143 : /* TODO(hongyu): record rpc client stats and census_rpc_end_op here */
144 17 : }
145 :
146 0 : static void server_init_call_elem(grpc_exec_ctx *exec_ctx,
147 : grpc_call_element *elem,
148 : const void *server_transport_data,
149 : grpc_transport_stream_op *initial_op) {
150 0 : call_data *d = elem->call_data;
151 0 : GPR_ASSERT(d != NULL);
152 0 : d->start_ts = gpr_now(GPR_CLOCK_REALTIME);
153 : /* TODO(hongyu): call census_tracing_start_op here. */
154 0 : grpc_closure_init(d->on_done_recv, server_on_done_recv, elem);
155 0 : if (initial_op) server_mutate_op(elem, initial_op);
156 0 : }
157 :
158 0 : static void server_destroy_call_elem(grpc_exec_ctx *exec_ctx,
159 : grpc_call_element *elem) {
160 0 : call_data *d = elem->call_data;
161 0 : GPR_ASSERT(d != NULL);
162 : /* TODO(hongyu): record rpc server stats and census_tracing_end_op here */
163 0 : }
164 :
165 17 : static void init_channel_elem(grpc_exec_ctx *exec_ctx,
166 : grpc_channel_element *elem, grpc_channel *master,
167 : const grpc_channel_args *args, grpc_mdctx *mdctx,
168 : int is_first, int is_last) {
169 17 : channel_data *chand = elem->channel_data;
170 17 : GPR_ASSERT(chand != NULL);
171 17 : chand->path_str = grpc_mdstr_from_string(mdctx, ":path");
172 17 : }
173 :
174 17 : static void destroy_channel_elem(grpc_exec_ctx *exec_ctx,
175 : grpc_channel_element *elem) {
176 17 : channel_data *chand = elem->channel_data;
177 17 : GPR_ASSERT(chand != NULL);
178 17 : if (chand->path_str != NULL) {
179 17 : GRPC_MDSTR_UNREF(chand->path_str);
180 : }
181 17 : }
182 :
183 : const grpc_channel_filter grpc_client_census_filter = {
184 : client_start_transport_op, grpc_channel_next_op,
185 : sizeof(call_data), client_init_call_elem,
186 : client_destroy_call_elem, sizeof(channel_data),
187 : init_channel_elem, destroy_channel_elem,
188 : grpc_call_next_get_peer, "census-client"};
189 :
190 : const grpc_channel_filter grpc_server_census_filter = {
191 : server_start_transport_op, grpc_channel_next_op,
192 : sizeof(call_data), server_init_call_elem,
193 : server_destroy_call_elem, sizeof(channel_data),
194 : init_channel_elem, destroy_channel_elem,
195 : grpc_call_next_get_peer, "census-server"};
|