diff --git a/include/boost/iostreams/filter/gzip.hpp b/include/boost/iostreams/filter/gzip.hpp index e236983e0..e13a325d9 100644 --- a/include/boost/iostreams/filter/gzip.hpp +++ b/include/boost/iostreams/filter/gzip.hpp @@ -193,6 +193,7 @@ class basic_gzip_compressor : basic_zlib_compressor { struct category : dual_use, filter_tag, + flushable_tag, multichar_tag, closable_tag { }; @@ -271,6 +272,12 @@ class basic_gzip_compressor : basic_zlib_compressor { } close_impl(); } + + template + bool flush(Sink& snk) { + base_type::force_flush(snk); + return true; + } private: static gzip_params normalize_params(gzip_params p); void prepare_footer(); diff --git a/include/boost/iostreams/filter/symmetric.hpp b/include/boost/iostreams/filter/symmetric.hpp index f18089f4f..6ea667882 100644 --- a/include/boost/iostreams/filter/symmetric.hpp +++ b/include/boost/iostreams/filter/symmetric.hpp @@ -185,6 +185,26 @@ class symmetric_filter { close_impl(); } } + + template + void force_flush(Sink &snk) { + if (!(state() & f_write)) + begin_write(); + + buffer_type& buf = pimpl_->buf_; + char_type dummy; + const char_type* end = &dummy; + // Repeatedly invokes force_flush(), till the time zlib::deflate returns zlib::Z_BUF_ERROR. + // zlib::Z_BUF_ERROR ensures that no more futher progress is possible. + bool again = true; + while(again) { + if(buf.ptr() != buf.eptr()) { + again = filter().force_flush(end, end, buf.ptr(), buf.eptr()); + } + flush(snk); + } + } + SymmetricFilter& filter() { return *pimpl_; } string_type unconsumed_input() const; diff --git a/include/boost/iostreams/filter/zlib.hpp b/include/boost/iostreams/filter/zlib.hpp index e57870a51..c946502e5 100644 --- a/include/boost/iostreams/filter/zlib.hpp +++ b/include/boost/iostreams/filter/zlib.hpp @@ -84,6 +84,7 @@ BOOST_IOSTREAMS_DECL extern const int buf_error; BOOST_IOSTREAMS_DECL extern const int finish; BOOST_IOSTREAMS_DECL extern const int no_flush; BOOST_IOSTREAMS_DECL extern const int sync_flush; +BOOST_IOSTREAMS_DECL extern const int full_flush; // Code for current OS @@ -232,6 +233,8 @@ class zlib_compressor_impl : public zlib_base, public zlib_allocator { bool filter( const char*& src_begin, const char* src_end, char*& dest_begin, char* dest_end, bool flush ); void close(); + bool force_flush( const char*& src_begin, const char* src_end, + char*& dest_begin, char* dest_end); }; // @@ -361,6 +364,17 @@ bool zlib_compressor_impl::filter template void zlib_compressor_impl::close() { reset(true, true); } +template +bool zlib_compressor_impl::force_flush + ( const char*& src_begin, const char* src_end, + char*& dest_begin, char* dest_end) +{ + before(src_begin, src_end, dest_begin, dest_end); + int result = xdeflate(zlib::full_flush); + after(src_begin, dest_begin, true); + return result == zlib::okay; +} + //------------------Implementation of zlib_decompressor_impl------------------// template diff --git a/src/zlib.cpp b/src/zlib.cpp index 66380689b..484676ecb 100644 --- a/src/zlib.cpp +++ b/src/zlib.cpp @@ -58,6 +58,7 @@ const int buf_error = Z_BUF_ERROR; const int finish = Z_FINISH; const int no_flush = Z_NO_FLUSH; const int sync_flush = Z_SYNC_FLUSH; +const int full_flush = Z_FULL_FLUSH; // Code for current OS diff --git a/test/gzip_test.cpp b/test/gzip_test.cpp index eb7d4cc3d..770ea20e5 100644 --- a/test/gzip_test.cpp +++ b/test/gzip_test.cpp @@ -117,6 +117,35 @@ void array_source_test() BOOST_CHECK_EQUAL(data, res); } + +void flush_test() +{ + std::string encoded; + std::vector result; + + static const int flush_marker_bytes[] = {'\x00', '\x00', '\xff', '\xff', '\x00', '\x00', '\xff', '\xff'}; + std::vector target(flush_marker_bytes, flush_marker_bytes + sizeof(flush_marker_bytes) / sizeof(flush_marker_bytes[0]) ); + + filtering_ostream out; + out.push(gzip_compressor()); + out.push(io::back_inserter(encoded)); + write_data_in_chunks(out); + out.flush(); + + for(int i = 0; i < 4; i++) { + result.push_back(encoded[encoded.length() - 4 + i]); + } + + write_data_in_chunks(out); + out.flush(); + + for(int i = 0; i < 4; i++) { + result.push_back(encoded[encoded.length() - 4 + i]); + } + + BOOST_CHECK(std::equal(result.begin(), result.end(), target.begin())); +} + #if defined(BOOST_MSVC) # pragma warning(push) # pragma warning(disable:4309) // Truncation of constant value @@ -239,6 +268,7 @@ test_suite* init_unit_test_suite(int, char* []) test->add(BOOST_TEST_CASE(&compression_test)); test->add(BOOST_TEST_CASE(&multiple_member_test)); test->add(BOOST_TEST_CASE(&array_source_test)); + test->add(BOOST_TEST_CASE(&flush_test)); test->add(BOOST_TEST_CASE(&header_test)); test->add(BOOST_TEST_CASE(&empty_file_test)); test->add(BOOST_TEST_CASE(&multipart_test));