Line data Source code
1 : /*
2 : *
3 : * Copyright 2015, Google Inc.
4 : * All rights reserved.
5 : *
6 : * Redistribution and use in source and binary forms, with or without
7 : * modification, are permitted provided that the following conditions are
8 : * met:
9 : *
10 : * * Redistributions of source code must retain the above copyright
11 : * notice, this list of conditions and the following disclaimer.
12 : * * Redistributions in binary form must reproduce the above
13 : * copyright notice, this list of conditions and the following disclaimer
14 : * in the documentation and/or other materials provided with the
15 : * distribution.
16 : * * Neither the name of Google Inc. nor the names of its
17 : * contributors may be used to endorse or promote products derived from
18 : * this software without specific prior written permission.
19 : *
20 : * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21 : * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22 : * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23 : * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24 : * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25 : * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26 : * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27 : * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28 : * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29 : * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30 : * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 : *
32 : */
33 :
34 : #include <grpc++/impl/proto_utils.h>
35 :
36 : #include <grpc/grpc.h>
37 : #include <grpc/byte_buffer.h>
38 : #include <grpc/byte_buffer_reader.h>
39 : #include <grpc/support/log.h>
40 : #include <grpc/support/slice.h>
41 : #include <grpc/support/slice_buffer.h>
42 : #include <grpc/support/port_platform.h>
43 : #include <grpc++/support/config.h>
44 :
45 : const int kMaxBufferLength = 8192;
46 :
47 : class GrpcBufferWriter GRPC_FINAL
48 : : public ::grpc::protobuf::io::ZeroCopyOutputStream {
49 : public:
50 16 : explicit GrpcBufferWriter(grpc_byte_buffer** bp,
51 : int block_size = kMaxBufferLength)
52 16 : : block_size_(block_size), byte_count_(0), have_backup_(false) {
53 16 : *bp = grpc_raw_byte_buffer_create(NULL, 0);
54 16 : slice_buffer_ = &(*bp)->data.raw.slice_buffer;
55 16 : }
56 :
57 32 : ~GrpcBufferWriter() GRPC_OVERRIDE {
58 16 : if (have_backup_) {
59 16 : gpr_slice_unref(backup_slice_);
60 : }
61 16 : }
62 :
63 1078 : bool Next(void** data, int* size) GRPC_OVERRIDE {
64 1078 : if (have_backup_) {
65 0 : slice_ = backup_slice_;
66 0 : have_backup_ = false;
67 : } else {
68 1078 : slice_ = gpr_slice_malloc(block_size_);
69 : }
70 1078 : *data = GPR_SLICE_START_PTR(slice_);
71 1078 : byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
72 1078 : gpr_slice_buffer_add(slice_buffer_, slice_);
73 1078 : return true;
74 : }
75 :
76 16 : void BackUp(int count) GRPC_OVERRIDE {
77 16 : gpr_slice_buffer_pop(slice_buffer_);
78 16 : if (count == block_size_) {
79 0 : backup_slice_ = slice_;
80 : } else {
81 : backup_slice_ =
82 16 : gpr_slice_split_tail(&slice_, GPR_SLICE_LENGTH(slice_) - count);
83 16 : gpr_slice_buffer_add(slice_buffer_, slice_);
84 : }
85 16 : have_backup_ = true;
86 16 : byte_count_ -= count;
87 16 : }
88 :
89 0 : grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE { return byte_count_; }
90 :
91 : private:
92 : const int block_size_;
93 : gpr_int64 byte_count_;
94 : gpr_slice_buffer* slice_buffer_;
95 : bool have_backup_;
96 : gpr_slice backup_slice_;
97 : gpr_slice slice_;
98 : };
99 :
100 : class GrpcBufferReader GRPC_FINAL
101 : : public ::grpc::protobuf::io::ZeroCopyInputStream {
102 : public:
103 2793636 : explicit GrpcBufferReader(grpc_byte_buffer* buffer)
104 2793636 : : byte_count_(0), backup_count_(0) {
105 2794305 : grpc_byte_buffer_reader_init(&reader_, buffer);
106 2794028 : }
107 5589991 : ~GrpcBufferReader() GRPC_OVERRIDE {
108 2795166 : grpc_byte_buffer_reader_destroy(&reader_);
109 2794688 : }
110 :
111 5599080 : bool Next(const void** data, int* size) GRPC_OVERRIDE {
112 5599080 : if (backup_count_ > 0) {
113 0 : *data = GPR_SLICE_START_PTR(slice_) + GPR_SLICE_LENGTH(slice_) -
114 0 : backup_count_;
115 0 : GPR_ASSERT(backup_count_ <= INT_MAX);
116 0 : *size = (int)backup_count_;
117 0 : backup_count_ = 0;
118 0 : return true;
119 : }
120 5599080 : if (!grpc_byte_buffer_reader_next(&reader_, &slice_)) {
121 2794613 : return false;
122 : }
123 2815955 : gpr_slice_unref(slice_);
124 2816682 : *data = GPR_SLICE_START_PTR(slice_);
125 2816682 : byte_count_ += * size = GPR_SLICE_LENGTH(slice_);
126 2816682 : return true;
127 : }
128 :
129 0 : void BackUp(int count) GRPC_OVERRIDE { backup_count_ = count; }
130 :
131 0 : bool Skip(int count) GRPC_OVERRIDE {
132 : const void* data;
133 : int size;
134 0 : while (Next(&data, &size)) {
135 0 : if (size >= count) {
136 0 : BackUp(size - count);
137 0 : return true;
138 : }
139 : // size < count;
140 0 : count -= size;
141 : }
142 : // error or we have too large count;
143 0 : return false;
144 : }
145 :
146 0 : grpc::protobuf::int64 ByteCount() const GRPC_OVERRIDE {
147 0 : return byte_count_ - backup_count_;
148 : }
149 :
150 : private:
151 : gpr_int64 byte_count_;
152 : gpr_int64 backup_count_;
153 : grpc_byte_buffer_reader reader_;
154 : gpr_slice slice_;
155 : };
156 :
157 : namespace grpc {
158 :
159 2792300 : Status SerializeProto(const grpc::protobuf::Message& msg,
160 : grpc_byte_buffer** bp) {
161 2792300 : int byte_size = msg.ByteSize();
162 2794520 : if (byte_size <= kMaxBufferLength) {
163 2794504 : gpr_slice slice = gpr_slice_malloc(byte_size);
164 2795835 : GPR_ASSERT(GPR_SLICE_END_PTR(slice) ==
165 : msg.SerializeWithCachedSizesToArray(GPR_SLICE_START_PTR(slice)));
166 2793448 : *bp = grpc_raw_byte_buffer_create(&slice, 1);
167 2794760 : gpr_slice_unref(slice);
168 2794615 : return Status::OK;
169 : } else {
170 16 : GrpcBufferWriter writer(bp);
171 16 : return msg.SerializeToZeroCopyStream(&writer)
172 : ? Status::OK
173 16 : : Status(StatusCode::INTERNAL, "Failed to serialize message");
174 : }
175 : }
176 :
177 2791167 : Status DeserializeProto(grpc_byte_buffer* buffer, grpc::protobuf::Message* msg,
178 : int max_message_size) {
179 2791167 : if (!buffer) {
180 2 : return Status(StatusCode::INTERNAL, "No payload");
181 : }
182 2791165 : GrpcBufferReader reader(buffer);
183 5588246 : ::grpc::protobuf::io::CodedInputStream decoder(&reader);
184 2794020 : if (max_message_size > 0) {
185 100484 : decoder.SetTotalBytesLimit(max_message_size, max_message_size);
186 : }
187 2794020 : if (!msg->ParseFromCodedStream(&decoder)) {
188 0 : return Status(StatusCode::INTERNAL, msg->InitializationErrorString());
189 : }
190 2795034 : if (!decoder.ConsumedEntireMessage()) {
191 0 : return Status(StatusCode::INTERNAL, "Did not read entire message");
192 : }
193 5588200 : return Status::OK;
194 : }
195 :
196 : } // namespace grpc
|