Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc++/impl/proto_utils.h>
35 :
36 : #include <climits>
37 :
38 : #include <grpc/grpc.h>
39 : #include <grpc/byte_buffer.h>
40 : #include <grpc/byte_buffer_reader.h>
41 : #include <grpc/support/log.h>
42 : #include <grpc/support/slice.h>
43 : #include <grpc/support/slice_buffer.h>
44 : #include <grpc/support/port_platform.h>
45 : #include <grpc++/support/config.h>
46 :
47 : #include "src/core/profiling/timers.h"
48 :
49 : const int kMaxBufferLength = 8192;
50 :
51 : class GrpcBufferWriter GRPC_FINAL
52 : : public ::grpc::protobuf::io::ZeroCopyOutputStream {
53 : public:
54 16 : explicit GrpcBufferWriter(grpc_byte_buffer** bp,
55 : int block_size = kMaxBufferLength)
56 16 : : block_size_(block_size), byte_count_(0), have_backup_(false) {
57 16 : *bp = grpc_raw_byte_buffer_create(NULL, 0);
58 16 : slice_buffer_ = &(*bp)->data.raw.slice_buffer;
59 16 : }
60 :
61 32 : ~GrpcBufferWriter() GRPC_OVERRIDE {
62 16 : if (have_backup_) {
63 16 : gpr_slice_unref(backup_slice_);
64 : }
65 16 : }
66 :
67 1078 : bool Next(void** data, int* size) GRPC_OVERRIDE {
68 1078 : if (have_backup_) {
69 0 : slice_ = backup_slice_;
70 0 : have_backup_ = false;
71 : } else {
72 1078 : slice_ = gpr_slice_malloc(block_size_);
73 : }
74 1078 : *data = GPR_SLICE_START_PTR(slice_);
75 : // On win x64, int is only 32bit
76 1078 : GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
77 1078 : byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
78 1078 : gpr_slice_buffer_add(slice_buffer_, slice_);
79 1078 : return true;
80 : }
81 :
82 16 : void BackUp(int count) GRPC_OVERRIDE {
83 16 : gpr_slice_buffer_pop(slice_buffer_);
84 16 : if (count == block_size_) {
85 0 : backup_slice_ = slice_;
86 : } else {
87 : backup_slice_ =
88 16 : gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count);
89 16 : gpr_slice_buffer_add(slice_buffer_, slice_);
90 : }
91 16 : have_backup_ = true;
92 16 : byte_count_ -= count;
93 16 : }
94 :
95 0 : grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
96 :
97 : private:
98 : const int block_size_;
99 : gpr_int64 byte_count_;
100 : gpr_slice_buffer* slice_buffer_;
101 : bool have_backup_;
102 : gpr_slice backup_slice_;
103 : gpr_slice slice_;
104 : };
105 :
106 : class GrpcBufferReader GRPC_FINAL
107 : : public ::grpc::protobuf::io::ZeroCopyInputStream {
108 : public:
109 3751103 : explicit GrpcBufferReader(grpc_byte_buffer* buffer)
110 3751103 : : byte_count_(0), backup_count_(0) {
111 3754060 : grpc_byte_buffer_reader_init(&reader_, buffer);
112 3755033 : }
113 7511942 : ~GrpcBufferReader() GRPC_OVERRIDE {
114 3755357 : grpc_byte_buffer_reader_destroy(&reader_);
115 3756579 : }
116 :
117 7503090 : bool Next(const void** data, int* size) GRPC_OVERRIDE {
118 7503090 : if (backup_count_ > 0) {
119 0 : *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
120 0 : backup_count_;
121 0 : GPR_ASSERT(backup_count_ <= INT_MAX);
122 0 : *size = (int)backup_count_;
123 0 : backup_count_ = 0;
124 0 : return true;
125 : }
126 7503090 : if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) {
127 5511499 : return false;
128 : }
129 2007195 : gpr_slice_unref(slice_);
130 2007209 : *data = GPR_SLICE_START_PTR(slice_);
131 : // On win x64, int is only 32bit
132 2007209 : GPR_ASSERT(GPR_SLICE_LENGTH(slice_) <= INT_MAX);
133 2007209 : byte_count_ += * size = (int)GPR_SLICE_LENGTH(slice_);
134 2007209 : return true;
135 : }
136 :
137 0 : void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
138 :
139 0 : bool Skip(int count) GRPC_OVERRIDE {
140 : const void* data;
141 : int size;
142 0 : while (Next(&data, &size)) {
143 0 : if (size >= count) {
144 0 : BackUp(size - count);
145 0 : return true;
146 : }
147 : // size < count;
148 0 : count -= size;
149 : }
150 : // error or we have too large count;
151 0 : return false;
152 : }
153 :
154 0 : grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE {
155 0 : return byte_count_ - backup_count_;
156 : }
157 :
158 : private:
159 : gpr_int64 byte_count_;
160 : gpr_int64 backup_count_;
161 : grpc_byte_buffer_reader reader_;
162 : gpr_slice slice_;
163 : };
164 :
165 : namespace grpc {
166 :
167 3755741 : Status SerializeProto(const grpc::protobuf::Message& msg,
168 : grpc_byte_buffer** bp) {
169 : GPR_TIMER_SCOPE("SerializeProto", 0);
170 3755741 : int byte_size = msg.ByteSize();
171 3754659 : if (byte_size <= kMaxBufferLength) {
172 3754643 : gpr_slice slice = gpr_slice_malloc(byte_size);
173 3756206 : GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
174 : msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
175 3752793 : *bp = grpc_raw_byte_buffer_create(&slice, 1);
176 3755847 : gpr_slice_unref(slice);
177 3753473 : return Status::OK;
178 : } else {
179 16 : GrpcBufferWriter writer(bp);
180 16 : return msg.SerializeToZeroCopyStream(&writer)
181 : ? Status::OK
182 16 : : Status(StatusCode::INTERNAL, "Failed to serialize message");
183 : }
184 : }
185 :
186 3751811 : Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
187 : int max_message_size) {
188 : GPR_TIMER_SCOPE("DeserializeProto", 0);
189 3751811 : if (!buffer) {
190 1 : return Status(StatusCode::INTERNAL, "No payload");
191 : }
192 3751810 : GrpcBufferReader reader(buffer);
193 7508638 : ::grpc::protobuf::io::CodedInputStream decoder(&reader);
194 3755902 : if (max_message_size > 0) {
195 100485 : decoder.SetTotalBytesLimit(max_message_size, max_message_size);
196 : }
197 3755902 : if (!msg->ParseFromCodedStream(&decoder)) {
198 0 : return Status(StatusCode::INTERNAL, msg->InitializationErrorString());
199 : }
200 3756249 : if (!decoder.ConsumedEntireMessage()) {
201 0 : return Status(StatusCode::INTERNAL, "Did not read entire message");
202 : }
203 7513391 : return Status::OK;
204 : }
205 :
206 : } // namespace grpc
|