Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include "src/core/transport/transport.h"
35 : #include <grpc/support/alloc.h>
36 : #include <grpc/support/atm.h>
37 : #include <grpc/support/log.h>
38 : #include "src/core/transport/transport_impl.h"
39 :
40 : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
41 : void grpc_stream_ref(grpc_stream_refcount *refcount, const char *reason) {
42 : gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
43 : gpr_log(GPR_DEBUG, "STREAM %p:%p REF %d->%d %s", refcount,
44 : refcount->destroy.cb_arg, val, val + 1, reason);
45 : #else
46 27565985 : void grpc_stream_ref(grpc_stream_refcount *refcount) {
47 : #endif
48 27565985 : gpr_ref(&refcount->refs);
49 27695174 : }
50 :
51 : #ifdef GRPC_STREAM_REFCOUNT_DEBUG
52 : void grpc_stream_unref(grpc_exec_ctx *exec_ctx, grpc_stream_refcount *refcount,
53 : const char *reason) {
54 : gpr_atm val = gpr_atm_no_barrier_load(&refcount->refs.count);
55 : gpr_log(GPR_DEBUG, "STREAM %p:%p UNREF %d->%d %s", refcount,
56 : refcount->destroy.cb_arg, val, val - 1, reason);
57 : #else
58 34194660 : void grpc_stream_unref(grpc_exec_ctx *exec_ctx,
59 : grpc_stream_refcount *refcount) {
60 : #endif
61 34194660 : if (gpr_unref(&refcount->refs)) {
62 6609911 : grpc_exec_ctx_enqueue(exec_ctx, &refcount->destroy, 1);
63 : }
64 34357751 : }
65 :
66 6039 : size_t grpc_transport_stream_size(grpc_transport *transport) {
67 6039 : return transport->vtable->sizeof_stream;
68 : }
69 :
70 5963 : void grpc_transport_destroy(grpc_exec_ctx *exec_ctx,
71 : grpc_transport *transport) {
72 5963 : transport->vtable->destroy(exec_ctx, transport);
73 5964 : }
74 :
75 4438203 : int grpc_transport_init_stream(grpc_exec_ctx *exec_ctx,
76 : grpc_transport *transport, grpc_stream *stream,
77 : grpc_stream_refcount *refcount,
78 : const void *server_data) {
79 4438203 : return transport->vtable->init_stream(exec_ctx, transport, stream, refcount,
80 : server_data);
81 : }
82 :
83 12326147 : void grpc_transport_perform_stream_op(grpc_exec_ctx *exec_ctx,
84 : grpc_transport *transport,
85 : grpc_stream *stream,
86 : grpc_transport_stream_op *op) {
87 12326147 : transport->vtable->perform_stream_op(exec_ctx, transport, stream, op);
88 12381743 : }
89 :
90 16910 : void grpc_transport_perform_op(grpc_exec_ctx *exec_ctx,
91 : grpc_transport *transport,
92 : grpc_transport_op *op) {
93 16910 : transport->vtable->perform_op(exec_ctx, transport, op);
94 16911 : }
95 :
96 4439823 : void grpc_transport_set_pollset(grpc_exec_ctx *exec_ctx,
97 : grpc_transport *transport, grpc_stream *stream,
98 : grpc_pollset *pollset) {
99 4439823 : transport->vtable->set_pollset(exec_ctx, transport, stream, pollset);
100 4442797 : }
101 :
102 4439118 : void grpc_transport_destroy_stream(grpc_exec_ctx *exec_ctx,
103 : grpc_transport *transport,
104 : grpc_stream *stream) {
105 4439118 : transport->vtable->destroy_stream(exec_ctx, transport, stream);
106 4443190 : }
107 :
108 1245 : char *grpc_transport_get_peer(grpc_exec_ctx *exec_ctx,
109 : grpc_transport *transport) {
110 1245 : return transport->vtable->get_peer(exec_ctx, transport);
111 : }
112 :
113 702 : void grpc_transport_stream_op_finish_with_failure(
114 : grpc_exec_ctx *exec_ctx, grpc_transport_stream_op *op) {
115 702 : grpc_exec_ctx_enqueue(exec_ctx, op->recv_message_ready, 0);
116 702 : grpc_exec_ctx_enqueue(exec_ctx, op->on_complete, 0);
117 702 : }
118 :
119 2 : void grpc_transport_stream_op_add_cancellation(grpc_transport_stream_op *op,
120 : grpc_status_code status) {
121 2 : GPR_ASSERT(status != GRPC_STATUS_OK);
122 2 : if (op->cancel_with_status == GRPC_STATUS_OK) {
123 2 : op->cancel_with_status = status;
124 : }
125 2 : if (op->close_with_status != GRPC_STATUS_OK) {
126 0 : op->close_with_status = GRPC_STATUS_OK;
127 0 : if (op->optional_close_message != NULL) {
128 0 : gpr_slice_unref(*op->optional_close_message);
129 0 : op->optional_close_message = NULL;
130 : }
131 : }
132 2 : }
133 :
134 : typedef struct {
135 : gpr_slice message;
136 : grpc_closure *then_call;
137 : grpc_closure closure;
138 : } close_message_data;
139 :
140 5 : static void free_message(grpc_exec_ctx *exec_ctx, void *p, int iomgr_success) {
141 5 : close_message_data *cmd = p;
142 5 : gpr_slice_unref(cmd->message);
143 5 : if (cmd->then_call != NULL) {
144 0 : cmd->then_call->cb(exec_ctx, cmd->then_call->cb_arg, iomgr_success);
145 : }
146 5 : gpr_free(cmd);
147 5 : }
148 :
149 5 : void grpc_transport_stream_op_add_close(grpc_transport_stream_op *op,
150 : grpc_status_code status,
151 : gpr_slice *optional_message) {
152 : close_message_data *cmd;
153 5 : GPR_ASSERT(status != GRPC_STATUS_OK);
154 10 : if (op->cancel_with_status != GRPC_STATUS_OK ||
155 5 : op->close_with_status != GRPC_STATUS_OK) {
156 0 : if (optional_message) {
157 0 : gpr_slice_unref(*optional_message);
158 : }
159 5 : return;
160 : }
161 5 : if (optional_message) {
162 5 : cmd = gpr_malloc(sizeof(*cmd));
163 5 : cmd->message = *optional_message;
164 5 : cmd->then_call = op->on_complete;
165 5 : grpc_closure_init(&cmd->closure, free_message, cmd);
166 5 : op->on_complete = &cmd->closure;
167 5 : op->optional_close_message = &cmd->message;
168 : }
169 5 : op->close_with_status = status;
170 : }
|