Line data Source code
1 : // Protocol Buffers - Google's data interchange format
2 : // Copyright 2008 Google Inc. All rights reserved.
3 : // https://developers.google.com/protocol-buffers/
4 : //
5 : // Redistribution and use in source and binary forms, with or without
6 : // modification, are permitted provided that the following conditions are
7 : // met:
8 : //
9 : // * Redistributions of source code must retain the above copyright
10 : // notice, this list of conditions and the following disclaimer.
11 : // * Redistributions in binary form must reproduce the above
12 : // copyright notice, this list of conditions and the following disclaimer
13 : // in the documentation and/or other materials provided with the
14 : // distribution.
15 : // * Neither the name of Google Inc. nor the names of its
16 : // contributors may be used to endorse or promote products derived from
17 : // this software without specific prior written permission.
18 : //
19 : // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 : // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 : // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 : // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 : // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 : // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 : // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 : // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 : // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 : // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 : // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 :
31 : // Author: kenton@google.com (Kenton Varda)
32 : // Based on original Protocol Buffers design by
33 : // Sanjay Ghemawat, Jeff Dean, and others.
34 :
35 : #ifdef _MSC_VER
36 : #include <io.h>
37 : #else
38 : #include <unistd.h>
39 : #include <sys/types.h>
40 : #include <sys/stat.h>
41 : #include <fcntl.h>
42 : #endif
43 : #include <errno.h>
44 : #include <iostream>
45 : #include <algorithm>
46 :
47 : #include <google/protobuf/io/zero_copy_stream_impl.h>
48 : #include <google/protobuf/stubs/common.h>
49 : #include <google/protobuf/stubs/logging.h>
50 : #include <google/protobuf/stubs/stl_util.h>
51 :
52 :
53 : namespace google {
54 : namespace protobuf {
55 : namespace io {
56 :
57 : #ifdef _WIN32
58 : // Win32 lseek is broken: If invoked on a non-seekable file descriptor, its
59 : // return value is undefined. We re-define it to always produce an error.
60 : #define lseek(fd, offset, origin) ((off_t)-1)
61 : #endif
62 :
63 : namespace {
64 :
65 : // EINTR sucks.
66 135 : int close_no_eintr(int fd) {
67 : int result;
68 135 : do {
69 135 : result = close(fd);
70 0 : } while (result < 0 && errno == EINTR);
71 135 : return result;
72 : }
73 :
74 : } // namespace
75 :
76 :
77 : // ===================================================================
78 :
79 143 : FileInputStream::FileInputStream(int file_descriptor, int block_size)
80 : : copying_input_(file_descriptor),
81 286 : impl_(©ing_input_, block_size) {
82 143 : }
83 :
84 278 : FileInputStream::~FileInputStream() {}
85 :
86 0 : bool FileInputStream::Close() {
87 0 : return copying_input_.Close();
88 : }
89 :
90 214 : bool FileInputStream::Next(const void** data, int* size) {
91 214 : return impl_.Next(data, size);
92 : }
93 :
94 0 : void FileInputStream::BackUp(int count) {
95 0 : impl_.BackUp(count);
96 0 : }
97 :
98 0 : bool FileInputStream::Skip(int count) {
99 0 : return impl_.Skip(count);
100 : }
101 :
102 0 : int64 FileInputStream::ByteCount() const {
103 0 : return impl_.ByteCount();
104 : }
105 :
106 143 : FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
107 : int file_descriptor)
108 : : file_(file_descriptor),
109 : close_on_delete_(false),
110 : is_closed_(false),
111 : errno_(0),
112 286 : previous_seek_failed_(false) {
113 143 : }
114 :
115 286 : FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
116 143 : if (close_on_delete_) {
117 135 : if (!Close()) {
118 0 : GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
119 : }
120 : }
121 143 : }
122 :
123 135 : bool FileInputStream::CopyingFileInputStream::Close() {
124 135 : GOOGLE_CHECK(!is_closed_);
125 :
126 135 : is_closed_ = true;
127 135 : if (close_no_eintr(file_) != 0) {
128 : // The docs on close() do not specify whether a file descriptor is still
129 : // open after close() fails with EIO. However, the glibc source code
130 : // seems to indicate that it is not.
131 0 : errno_ = errno;
132 0 : return false;
133 : }
134 :
135 : return true;
136 : }
137 :
138 214 : int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
139 214 : GOOGLE_CHECK(!is_closed_);
140 :
141 : int result;
142 214 : do {
143 428 : result = read(file_, buffer, size);
144 0 : } while (result < 0 && errno == EINTR);
145 :
146 214 : if (result < 0) {
147 : // Read error (not EOF).
148 0 : errno_ = errno;
149 : }
150 :
151 214 : return result;
152 : }
153 :
154 0 : int FileInputStream::CopyingFileInputStream::Skip(int count) {
155 0 : GOOGLE_CHECK(!is_closed_);
156 :
157 0 : if (!previous_seek_failed_ &&
158 0 : lseek(file_, count, SEEK_CUR) != (off_t)-1) {
159 : // Seek succeeded.
160 : return count;
161 : } else {
162 : // Failed to seek.
163 :
164 : // Note to self: Don't seek again. This file descriptor doesn't
165 : // support it.
166 0 : previous_seek_failed_ = true;
167 :
168 : // Use the default implementation.
169 0 : return CopyingInputStream::Skip(count);
170 : }
171 : }
172 :
173 : // ===================================================================
174 :
175 8 : FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
176 : : copying_output_(file_descriptor),
177 16 : impl_(©ing_output_, block_size) {
178 8 : }
179 :
180 16 : FileOutputStream::~FileOutputStream() {
181 8 : impl_.Flush();
182 8 : }
183 :
184 0 : bool FileOutputStream::Close() {
185 0 : bool flush_succeeded = impl_.Flush();
186 0 : return copying_output_.Close() && flush_succeeded;
187 : }
188 :
189 0 : bool FileOutputStream::Flush() {
190 0 : return impl_.Flush();
191 : }
192 :
193 18 : bool FileOutputStream::Next(void** data, int* size) {
194 18 : return impl_.Next(data, size);
195 : }
196 :
197 8 : void FileOutputStream::BackUp(int count) {
198 8 : impl_.BackUp(count);
199 8 : }
200 :
201 0 : int64 FileOutputStream::ByteCount() const {
202 0 : return impl_.ByteCount();
203 : }
204 :
205 8 : FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
206 : int file_descriptor)
207 : : file_(file_descriptor),
208 : close_on_delete_(false),
209 : is_closed_(false),
210 16 : errno_(0) {
211 8 : }
212 :
213 16 : FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
214 8 : if (close_on_delete_) {
215 0 : if (!Close()) {
216 0 : GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
217 : }
218 : }
219 8 : }
220 :
221 0 : bool FileOutputStream::CopyingFileOutputStream::Close() {
222 0 : GOOGLE_CHECK(!is_closed_);
223 :
224 0 : is_closed_ = true;
225 0 : if (close_no_eintr(file_) != 0) {
226 : // The docs on close() do not specify whether a file descriptor is still
227 : // open after close() fails with EIO. However, the glibc source code
228 : // seems to indicate that it is not.
229 0 : errno_ = errno;
230 0 : return false;
231 : }
232 :
233 : return true;
234 : }
235 :
236 18 : bool FileOutputStream::CopyingFileOutputStream::Write(
237 : const void* buffer, int size) {
238 18 : GOOGLE_CHECK(!is_closed_);
239 18 : int total_written = 0;
240 :
241 18 : const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
242 :
243 54 : while (total_written < size) {
244 : int bytes;
245 18 : do {
246 18 : bytes = write(file_, buffer_base + total_written, size - total_written);
247 0 : } while (bytes < 0 && errno == EINTR);
248 :
249 18 : if (bytes <= 0) {
250 : // Write error.
251 :
252 : // FIXME(kenton): According to the man page, if write() returns zero,
253 : // there was no error; write() simply did not write anything. It's
254 : // unclear under what circumstances this might happen, but presumably
255 : // errno won't be set in this case. I am confused as to how such an
256 : // event should be handled. For now I'm treating it as an error, since
257 : // retrying seems like it could lead to an infinite loop. I suspect
258 : // this never actually happens anyway.
259 :
260 0 : if (bytes < 0) {
261 0 : errno_ = errno;
262 : }
263 : return false;
264 : }
265 18 : total_written += bytes;
266 : }
267 :
268 : return true;
269 : }
270 :
271 : // ===================================================================
272 :
273 0 : IstreamInputStream::IstreamInputStream(istream* input, int block_size)
274 : : copying_input_(input),
275 0 : impl_(©ing_input_, block_size) {
276 0 : }
277 :
278 0 : IstreamInputStream::~IstreamInputStream() {}
279 :
280 0 : bool IstreamInputStream::Next(const void** data, int* size) {
281 0 : return impl_.Next(data, size);
282 : }
283 :
284 0 : void IstreamInputStream::BackUp(int count) {
285 0 : impl_.BackUp(count);
286 0 : }
287 :
288 0 : bool IstreamInputStream::Skip(int count) {
289 0 : return impl_.Skip(count);
290 : }
291 :
292 0 : int64 IstreamInputStream::ByteCount() const {
293 0 : return impl_.ByteCount();
294 : }
295 :
296 0 : IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
297 : istream* input)
298 0 : : input_(input) {
299 0 : }
300 :
301 0 : IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
302 :
303 0 : int IstreamInputStream::CopyingIstreamInputStream::Read(
304 : void* buffer, int size) {
305 0 : input_->read(reinterpret_cast<char*>(buffer), size);
306 0 : int result = input_->gcount();
307 0 : if (result == 0 && input_->fail() && !input_->eof()) {
308 : return -1;
309 : }
310 0 : return result;
311 : }
312 :
313 : // ===================================================================
314 :
315 0 : OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
316 : : copying_output_(output),
317 0 : impl_(©ing_output_, block_size) {
318 0 : }
319 :
320 0 : OstreamOutputStream::~OstreamOutputStream() {
321 0 : impl_.Flush();
322 0 : }
323 :
324 0 : bool OstreamOutputStream::Next(void** data, int* size) {
325 0 : return impl_.Next(data, size);
326 : }
327 :
328 0 : void OstreamOutputStream::BackUp(int count) {
329 0 : impl_.BackUp(count);
330 0 : }
331 :
332 0 : int64 OstreamOutputStream::ByteCount() const {
333 0 : return impl_.ByteCount();
334 : }
335 :
336 0 : OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
337 : ostream* output)
338 0 : : output_(output) {
339 0 : }
340 :
341 0 : OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
342 0 : }
343 :
344 0 : bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
345 : const void* buffer, int size) {
346 0 : output_->write(reinterpret_cast<const char*>(buffer), size);
347 0 : return output_->good();
348 : }
349 :
350 : // ===================================================================
351 :
352 0 : ConcatenatingInputStream::ConcatenatingInputStream(
353 : ZeroCopyInputStream* const streams[], int count)
354 0 : : streams_(streams), stream_count_(count), bytes_retired_(0) {
355 0 : }
356 :
357 0 : ConcatenatingInputStream::~ConcatenatingInputStream() {
358 0 : }
359 :
360 0 : bool ConcatenatingInputStream::Next(const void** data, int* size) {
361 0 : while (stream_count_ > 0) {
362 0 : if (streams_[0]->Next(data, size)) return true;
363 :
364 : // That stream is done. Advance to the next one.
365 0 : bytes_retired_ += streams_[0]->ByteCount();
366 0 : ++streams_;
367 0 : --stream_count_;
368 : }
369 :
370 : // No more streams.
371 : return false;
372 : }
373 :
374 0 : void ConcatenatingInputStream::BackUp(int count) {
375 0 : if (stream_count_ > 0) {
376 0 : streams_[0]->BackUp(count);
377 : } else {
378 0 : GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
379 : }
380 0 : }
381 :
382 0 : bool ConcatenatingInputStream::Skip(int count) {
383 0 : while (stream_count_ > 0) {
384 : // Assume that ByteCount() can be used to find out how much we actually
385 : // skipped when Skip() fails.
386 0 : int64 target_byte_count = streams_[0]->ByteCount() + count;
387 0 : if (streams_[0]->Skip(count)) return true;
388 :
389 : // Hit the end of the stream. Figure out how many more bytes we still have
390 : // to skip.
391 0 : int64 final_byte_count = streams_[0]->ByteCount();
392 : GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
393 0 : count = target_byte_count - final_byte_count;
394 :
395 : // That stream is done. Advance to the next one.
396 0 : bytes_retired_ += final_byte_count;
397 0 : ++streams_;
398 0 : --stream_count_;
399 : }
400 :
401 : return false;
402 : }
403 :
404 0 : int64 ConcatenatingInputStream::ByteCount() const {
405 0 : if (stream_count_ == 0) {
406 0 : return bytes_retired_;
407 : } else {
408 0 : return bytes_retired_ + streams_[0]->ByteCount();
409 : }
410 : }
411 :
412 :
413 : // ===================================================================
414 :
415 0 : LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
416 : int64 limit)
417 0 : : input_(input), limit_(limit) {
418 0 : prior_bytes_read_ = input_->ByteCount();
419 0 : }
420 :
421 0 : LimitingInputStream::~LimitingInputStream() {
422 : // If we overshot the limit, back up.
423 0 : if (limit_ < 0) input_->BackUp(-limit_);
424 0 : }
425 :
426 0 : bool LimitingInputStream::Next(const void** data, int* size) {
427 0 : if (limit_ <= 0) return false;
428 0 : if (!input_->Next(data, size)) return false;
429 :
430 0 : limit_ -= *size;
431 0 : if (limit_ < 0) {
432 : // We overshot the limit. Reduce *size to hide the rest of the buffer.
433 0 : *size += limit_;
434 : }
435 : return true;
436 : }
437 :
438 0 : void LimitingInputStream::BackUp(int count) {
439 0 : if (limit_ < 0) {
440 0 : input_->BackUp(count - limit_);
441 0 : limit_ = count;
442 : } else {
443 0 : input_->BackUp(count);
444 0 : limit_ += count;
445 : }
446 0 : }
447 :
448 0 : bool LimitingInputStream::Skip(int count) {
449 0 : if (count > limit_) {
450 0 : if (limit_ < 0) return false;
451 0 : input_->Skip(limit_);
452 0 : limit_ = 0;
453 0 : return false;
454 : } else {
455 0 : if (!input_->Skip(count)) return false;
456 0 : limit_ -= count;
457 0 : return true;
458 : }
459 : }
460 :
461 0 : int64 LimitingInputStream::ByteCount() const {
462 0 : if (limit_ < 0) {
463 0 : return input_->ByteCount() + limit_ - prior_bytes_read_;
464 : } else {
465 0 : return input_->ByteCount() - prior_bytes_read_;
466 : }
467 : }
468 :
469 :
470 : // ===================================================================
471 :
472 : } // namespace io
473 : } // namespace protobuf
474 138 : } // namespace google
|