Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc/grpc.h>
35 :
36 : #include <stdlib.h>
37 : #include <string.h>
38 :
39 : #include <grpc/support/alloc.h>
40 : #include <grpc/support/slice.h>
41 : #include <grpc/support/slice_buffer.h>
42 :
43 : #include "src/core/census/grpc_filter.h"
44 : #include "src/core/channel/channel_args.h"
45 : #include "src/core/channel/client_channel.h"
46 : #include "src/core/channel/compress_filter.h"
47 : #include "src/core/channel/http_client_filter.h"
48 : #include "src/core/client_config/resolver_registry.h"
49 : #include "src/core/iomgr/tcp_client.h"
50 : #include "src/core/surface/api_trace.h"
51 : #include "src/core/surface/channel.h"
52 : #include "src/core/transport/chttp2_transport.h"
53 :
54 : typedef struct {
55 : grpc_connector base;
56 : gpr_refcount refs;
57 :
58 : grpc_closure *notify;
59 : grpc_connect_in_args args;
60 : grpc_connect_out_args *result;
61 : grpc_closure initial_string_sent;
62 : gpr_slice_buffer initial_string_buffer;
63 :
64 : grpc_endpoint *tcp;
65 :
66 : grpc_closure connected;
67 : } connector;
68 :
69 2498 : static void connector_ref(grpc_connector *con) {
70 2446 : connector *c = (connector *)con;
71 2498 : gpr_ref(&c->refs);
72 2498 : }
73 :
74 4961 : static void connector_unref(grpc_exec_ctx *exec_ctx, grpc_connector *con) {
75 4883 : connector *c = (connector *)con;
76 4961 : if (gpr_unref(&c->refs)) {
77 : /* c->initial_string_buffer does not need to be destroyed */
78 2463 : gpr_free(c);
79 : }
80 4961 : }
81 :
82 2 : static void on_initial_connect_string_sent(grpc_exec_ctx *exec_ctx, void *arg,
83 : int success) {
84 2 : connector_unref(exec_ctx, arg);
85 2 : }
86 :
87 1930 : static void connected(grpc_exec_ctx *exec_ctx, void *arg, int success) {
88 1904 : connector *c = arg;
89 : grpc_closure *notify;
90 1930 : grpc_endpoint *tcp = c->tcp;
91 1930 : if (tcp != NULL) {
92 1558 : if (!GPR_SLICE_IS_EMPTY(c->args.initial_connect_string)) {
93 2 : grpc_closure_init(&c->initial_string_sent, on_initial_connect_string_sent,
94 : c);
95 2 : gpr_slice_buffer_init(&c->initial_string_buffer);
96 2 : gpr_slice_buffer_add(&c->initial_string_buffer,
97 : c->args.initial_connect_string);
98 2 : connector_ref(arg);
99 2 : grpc_endpoint_write(exec_ctx, tcp, &c->initial_string_buffer,
100 : &c->initial_string_sent);
101 : }
102 3116 : c->result->transport =
103 1558 : grpc_create_chttp2_transport(exec_ctx, c->args.channel_args, tcp, 1);
104 1558 : grpc_chttp2_transport_start_reading(exec_ctx, c->result->transport, NULL,
105 : 0);
106 1558 : GPR_ASSERT(c->result->transport);
107 1558 : c->result->filters = gpr_malloc(sizeof(grpc_channel_filter *));
108 1558 : c->result->filters[0] = &grpc_http_client_filter;
109 1558 : c->result->num_filters = 1;
110 : } else {
111 372 : memset(c->result, 0, sizeof(*c->result));
112 : }
113 1930 : notify = c->notify;
114 1930 : c->notify = NULL;
115 1930 : notify->cb(exec_ctx, notify->cb_arg, 1);
116 1930 : }
117 :
118 2327 : static void connector_shutdown(grpc_exec_ctx *exec_ctx, grpc_connector *con) {}
119 :
120 1955 : static void connector_connect(grpc_exec_ctx *exec_ctx, grpc_connector *con,
121 : const grpc_connect_in_args *args,
122 : grpc_connect_out_args *result,
123 : grpc_closure *notify) {
124 1904 : connector *c = (connector *)con;
125 1955 : GPR_ASSERT(c->notify == NULL);
126 1955 : GPR_ASSERT(notify->cb);
127 1955 : c->notify = notify;
128 1955 : c->args = *args;
129 1955 : c->result = result;
130 1955 : c->tcp = NULL;
131 1955 : grpc_closure_init(&c->connected, connected, c);
132 1955 : grpc_tcp_client_connect(exec_ctx, &c->connected, &c->tcp,
133 : args->interested_parties, args->addr, args->addr_len,
134 : args->deadline);
135 1955 : }
136 :
137 : static const grpc_connector_vtable connector_vtable = {
138 : connector_ref, connector_unref, connector_shutdown, connector_connect};
139 :
140 : typedef struct {
141 : grpc_subchannel_factory base;
142 : gpr_refcount refs;
143 : grpc_channel_args *merge_args;
144 : grpc_channel *master;
145 : } subchannel_factory;
146 :
147 1828 : static void subchannel_factory_ref(grpc_subchannel_factory *scf) {
148 1788 : subchannel_factory *f = (subchannel_factory *)scf;
149 1828 : gpr_ref(&f->refs);
150 1828 : }
151 :
152 3631 : static void subchannel_factory_unref(grpc_exec_ctx *exec_ctx,
153 : grpc_subchannel_factory *scf) {
154 3575 : subchannel_factory *f = (subchannel_factory *)scf;
155 3631 : if (gpr_unref(&f->refs)) {
156 1803 : GRPC_CHANNEL_INTERNAL_UNREF(exec_ctx, f->master, "subchannel_factory");
157 1803 : grpc_channel_args_destroy(f->merge_args);
158 1803 : gpr_free(f);
159 : }
160 3631 : }
161 :
162 2496 : static grpc_subchannel *subchannel_factory_create_subchannel(
163 : grpc_exec_ctx *exec_ctx, grpc_subchannel_factory *scf,
164 : grpc_subchannel_args *args) {
165 2444 : subchannel_factory *f = (subchannel_factory *)scf;
166 2496 : connector *c = gpr_malloc(sizeof(*c));
167 2496 : grpc_channel_args *final_args =
168 2496 : grpc_channel_args_merge(args->args, f->merge_args);
169 : grpc_subchannel *s;
170 2496 : memset(c, 0, sizeof(*c));
171 2496 : c->base.vtable = &connector_vtable;
172 2496 : gpr_ref_init(&c->refs, 1);
173 2496 : args->args = final_args;
174 2496 : args->master = f->master;
175 2496 : s = grpc_subchannel_create(&c->base, args);
176 2496 : grpc_connector_unref(exec_ctx, &c->base);
177 2496 : grpc_channel_args_destroy(final_args);
178 2496 : return s;
179 : }
180 :
181 : static const grpc_subchannel_factory_vtable subchannel_factory_vtable = {
182 : subchannel_factory_ref, subchannel_factory_unref,
183 : subchannel_factory_create_subchannel};
184 :
185 : /* Create a client channel:
186 : Asynchronously: - resolve target
187 : - connect to it (trying alternatives as presented)
188 : - perform handshakes */
189 1829 : grpc_channel *grpc_insecure_channel_create(const char *target,
190 : const grpc_channel_args *args,
191 : void *reserved) {
192 1789 : grpc_channel *channel = NULL;
193 : #define MAX_FILTERS 3
194 : const grpc_channel_filter *filters[MAX_FILTERS];
195 : grpc_resolver *resolver;
196 : subchannel_factory *f;
197 1829 : grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
198 1789 : size_t n = 0;
199 1829 : GRPC_API_TRACE(
200 : "grpc_insecure_channel_create(target=%p, args=%p, reserved=%p)", 3,
201 : (target, args, reserved));
202 1829 : GPR_ASSERT(!reserved);
203 1829 : if (grpc_channel_args_is_census_enabled(args)) {
204 12 : filters[n++] = &grpc_client_census_filter;
205 : }
206 1829 : filters[n++] = &grpc_compress_filter;
207 1829 : filters[n++] = &grpc_client_channel_filter;
208 1829 : GPR_ASSERT(n <= MAX_FILTERS);
209 :
210 1829 : channel =
211 : grpc_channel_create_from_filters(&exec_ctx, target, filters, n, args, 1);
212 :
213 1829 : f = gpr_malloc(sizeof(*f));
214 1829 : f->base.vtable = &subchannel_factory_vtable;
215 1829 : gpr_ref_init(&f->refs, 1);
216 1829 : f->merge_args = grpc_channel_args_copy(args);
217 1829 : f->master = channel;
218 1829 : GRPC_CHANNEL_INTERNAL_REF(f->master, "subchannel_factory");
219 1829 : resolver = grpc_resolver_create(target, &f->base);
220 1829 : if (!resolver) {
221 1 : GRPC_CHANNEL_INTERNAL_UNREF(&exec_ctx, f->master, "subchannel_factory");
222 1 : grpc_subchannel_factory_unref(&exec_ctx, &f->base);
223 1 : grpc_exec_ctx_finish(&exec_ctx);
224 1 : return NULL;
225 : }
226 :
227 1828 : grpc_client_channel_set_resolver(
228 : &exec_ctx, grpc_channel_get_channel_stack(channel), resolver);
229 1828 : GRPC_RESOLVER_UNREF(&exec_ctx, resolver, "create");
230 1828 : grpc_subchannel_factory_unref(&exec_ctx, &f->base);
231 :
232 1828 : grpc_exec_ctx_finish(&exec_ctx);
233 :
234 1828 : return channel;
235 : }
|