Skip to content

Commit f59db49

Browse files
authored
AVRO-4111: [C++] Replace boost::iostreams with zlib library (#3290)
* AVRO-4111: [C++] Replace boost::iostreams with zlib library
* declare buf as uint8_t
* fix lint
* remove unused cmake variables
1 parent 8e51c7e commit f59db49

File tree

5 files changed, with 114 additions and 70 deletions.

.github/workflows/test-lang-c++-ARM.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ jobs:
4444
- name: Install dependencies
4545
run: |
4646
sudo apt-get update -q
47-
sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev cmake
47+
sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev cmake
4848
4949
- name: Build
5050
run: |

.github/workflows/test-lang-c++.yml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,7 @@ jobs:
3939
- uses: actions/checkout@v4
4040

4141
- name: Install Dependencies
42-
run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev cmake
42+
run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev cmake
4343

4444
- name: Print Versions
4545
run: |

lang/c++/CMakeLists.txt

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -110,6 +110,13 @@ else (SNAPPY_FOUND)
110110
message("Disabled snappy codec. libsnappy not found.")
111111
endif (SNAPPY_FOUND)
112112

113+
find_package(ZLIB REQUIRED)
114+
if (ZLIB_FOUND)
115+
message("Enabled zlib codec")
116+
else (ZLIB_FOUND)
117+
message(FATAL_ERROR "ZLIB is not found")
118+
endif (ZLIB_FOUND)
119+
113120
add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS})
114121

115122
add_definitions (-DAVRO_VERSION="${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH}")
@@ -140,8 +147,8 @@ set_property (TARGET avrocpp
140147
APPEND PROPERTY COMPILE_DEFINITIONS AVRO_DYN_LINK)
141148

142149
add_library (avrocpp_s STATIC ${AVRO_SOURCE_FILES})
143-
target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR})
144-
target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only)
150+
target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR})
151+
target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only)
145152

146153
set_property (TARGET avrocpp avrocpp_s
147154
APPEND PROPERTY COMPILE_DEFINITIONS AVRO_SOURCE)
@@ -152,8 +159,8 @@ set_target_properties (avrocpp PROPERTIES
152159
set_target_properties (avrocpp_s PROPERTIES
153160
VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH})
154161

155-
target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only)
156-
target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR})
162+
target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only)
163+
target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR})
157164

158165
target_include_directories(avrocpp PUBLIC
159166
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
@@ -209,7 +216,7 @@ if (AVRO_BUILD_TESTS)
209216

210217
macro (unittest name)
211218
add_executable (${name} test/${name}.cc)
212-
target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES})
219+
target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES})
213220
add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
214221
COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name})
215222
endmacro (unittest)

lang/c++/impl/DataFile.cc

Lines changed: 100 additions & 60 deletions
Original file line number | Diff line number | Diff line change
@@ -23,15 +23,12 @@
2323
#include <random>
2424
#include <sstream>
2525

26-
#include <boost/crc.hpp> // for boost::crc_32_type
27-
#include <boost/iostreams/device/file.hpp>
28-
#include <boost/iostreams/filter/gzip.hpp>
29-
#include <boost/iostreams/filter/zlib.hpp>
30-
3126
#ifdef SNAPPY_CODEC_AVAILABLE
3227
#include <snappy.h>
3328
#endif
3429

30+
#include <zlib.h>
31+
3532
namespace avro {
3633
using std::copy;
3734
using std::istringstream;
@@ -55,12 +52,8 @@ const string AVRO_SNAPPY_CODEC = "snappy";
5552
const size_t minSyncInterval = 32;
5653
const size_t maxSyncInterval = 1u << 30;
5754

58-
boost::iostreams::zlib_params get_zlib_params() {
59-
boost::iostreams::zlib_params ret;
60-
ret.method = boost::iostreams::zlib::deflated;
61-
ret.noheader = true;
62-
return ret;
63-
}
55+
// Recommended by https://www.zlib.net/zlib_how.html
56+
const size_t zlibBufGrowSize = 128 * 1024;
6457

6558
} // namespace
6659

@@ -144,21 +137,45 @@ void DataFileWriterBase::sync() {
144137
std::unique_ptr<InputStream> in = memoryInputStream(*buffer_);
145138
copy(*in, *stream_);
146139
} else if (codec_ == DEFLATE_CODEC) {
147-
std::vector<char> buf;
140+
std::vector<uint8_t> buf;
148141
{
149-
boost::iostreams::filtering_ostream os;
150-
os.push(boost::iostreams::zlib_compressor(get_zlib_params()));
151-
os.push(boost::iostreams::back_inserter(buf));
152-
const uint8_t *data;
153-
size_t len;
142+
z_stream zs;
143+
zs.zalloc = Z_NULL;
144+
zs.zfree = Z_NULL;
145+
zs.opaque = Z_NULL;
146+
147+
int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
148+
if (ret != Z_OK) {
149+
throw Exception("Failed to initialize deflate, error: {}", ret);
150+
}
154151

155152
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
156-
while (input->next(&data, &len)) {
157-
boost::iostreams::write(os, reinterpret_cast<const char *>(data), len);
153+
const uint8_t *data;
154+
size_t len;
155+
while (ret != Z_STREAM_END && input->next(&data, &len)) {
156+
zs.avail_in = static_cast<uInt>(len);
157+
zs.next_in = const_cast<Bytef *>(data);
158+
bool flush = (zs.total_in + len) >= buffer_->byteCount();
159+
do {
160+
if (zs.total_out == buf.size()) {
161+
buf.resize(buf.size() + zlibBufGrowSize);
162+
}
163+
zs.avail_out = static_cast<uInt>(buf.size() - zs.total_out);
164+
zs.next_out = buf.data() + zs.total_out;
165+
ret = deflate(&zs, flush ? Z_FINISH : Z_NO_FLUSH);
166+
if (ret == Z_STREAM_END) {
167+
break;
168+
}
169+
if (ret != Z_OK) {
170+
throw Exception("Failed to deflate, error: {}", ret);
171+
}
172+
} while (zs.avail_out == 0);
158173
}
174+
175+
buf.resize(zs.total_out);
176+
(void) deflateEnd(&zs);
159177
} // make sure all is flushed
160-
std::unique_ptr<InputStream> in = memoryInputStream(
161-
reinterpret_cast<const uint8_t *>(buf.data()), buf.size());
178+
std::unique_ptr<InputStream> in = memoryInputStream(buf.data(), buf.size());
162179
int64_t byteCount = buf.size();
163180
avro::encode(*encoderPtr_, byteCount);
164181
encoderPtr_->flush();
@@ -167,35 +184,28 @@ void DataFileWriterBase::sync() {
167184
} else if (codec_ == SNAPPY_CODEC) {
168185
std::vector<char> temp;
169186
std::string compressed;
170-
boost::crc_32_type crc;
171-
{
172-
boost::iostreams::filtering_ostream os;
173-
os.push(boost::iostreams::back_inserter(temp));
174-
const uint8_t *data;
175-
size_t len;
176187

177-
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
178-
while (input->next(&data, &len)) {
179-
boost::iostreams::write(os, reinterpret_cast<const char *>(data),
180-
len);
181-
}
182-
} // make sure all is flushed
188+
const uint8_t *data;
189+
size_t len;
190+
std::unique_ptr<InputStream> input = memoryInputStream(*buffer_);
191+
while (input->next(&data, &len)) {
192+
temp.insert(temp.end(), reinterpret_cast<const char *>(data),
193+
reinterpret_cast<const char *>(data) + len);
194+
}
183195

184-
crc.process_bytes(reinterpret_cast<const char *>(temp.data()),
185-
temp.size());
186196
// For Snappy, add the CRC32 checksum
187-
auto checksum = crc();
197+
auto checksum = crc32(0, reinterpret_cast<const Bytef *>(temp.data()),
198+
static_cast<uInt>(temp.size()));
188199

189200
// Now compress
190201
size_t compressed_size = snappy::Compress(
191202
reinterpret_cast<const char *>(temp.data()), temp.size(),
192203
&compressed);
204+
193205
temp.clear();
194-
{
195-
boost::iostreams::filtering_ostream os;
196-
os.push(boost::iostreams::back_inserter(temp));
197-
boost::iostreams::write(os, compressed.c_str(), compressed_size);
198-
}
206+
temp.insert(temp.end(), compressed.c_str(),
207+
compressed.c_str() + compressed_size);
208+
199209
temp.push_back(static_cast<char>((checksum >> 24) & 0xFF));
200210
temp.push_back(static_cast<char>((checksum >> 16) & 0xFF));
201211
temp.push_back(static_cast<char>((checksum >> 8) & 0xFF));
@@ -285,8 +295,7 @@ void DataFileReaderBase::init(const ValidSchema &readerSchema) {
285295
static void drain(InputStream &in) {
286296
const uint8_t *p = nullptr;
287297
size_t n = 0;
288-
while (in.next(&p, &n))
289-
;
298+
while (in.next(&p, &n));
290299
}
291300

292301
char hex(unsigned int x) {
@@ -384,7 +393,6 @@ void DataFileReaderBase::readDataBlock() {
384393
dataStream_ = std::move(st);
385394
#ifdef SNAPPY_CODEC_AVAILABLE
386395
} else if (codec_ == SNAPPY_CODEC) {
387-
boost::crc_32_type crc;
388396
uint32_t checksum = 0;
389397
compressed_.clear();
390398
uncompressed.clear();
@@ -408,35 +416,67 @@ void DataFileReaderBase::readDataBlock() {
408416
throw Exception(
409417
"Snappy Compression reported an error when decompressing");
410418
}
411-
crc.process_bytes(uncompressed.c_str(), uncompressed.size());
412-
auto c = crc();
419+
auto c = crc32(0, reinterpret_cast<const Bytef *>(uncompressed.c_str()),
420+
static_cast<uInt>(uncompressed.size()));
413421
if (checksum != c) {
414422
throw Exception(
415423
"Checksum did not match for Snappy compression: Expected: {}, computed: {}",
416424
checksum, c);
417425
}
418-
os_.reset(new boost::iostreams::filtering_istream());
419-
os_->push(
420-
boost::iostreams::basic_array_source<char>(uncompressed.c_str(),
421-
uncompressed.size()));
422-
std::unique_ptr<InputStream> in = istreamInputStream(*os_);
426+
427+
std::unique_ptr<InputStream> in = memoryInputStream(
428+
reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
429+
uncompressed.size());
423430

424431
dataDecoder_->init(*in);
425432
dataStream_ = std::move(in);
426433
#endif
427434
} else {
428435
compressed_.clear();
429-
const uint8_t *data;
430-
size_t len;
431-
while (st->next(&data, &len)) {
432-
compressed_.insert(compressed_.end(), data, data + len);
436+
uncompressed.clear();
437+
438+
{
439+
z_stream zs;
440+
zs.zalloc = Z_NULL;
441+
zs.zfree = Z_NULL;
442+
zs.opaque = Z_NULL;
443+
zs.avail_in = 0;
444+
zs.next_in = Z_NULL;
445+
446+
int ret = inflateInit2(&zs, /*windowBits=*/-15);
447+
if (ret != Z_OK) {
448+
throw Exception("Failed to initialize inflate, error: {}", ret);
449+
}
450+
451+
const uint8_t *data;
452+
size_t len;
453+
while (ret != Z_STREAM_END && st->next(&data, &len)) {
454+
zs.avail_in = static_cast<uInt>(len);
455+
zs.next_in = const_cast<Bytef *>(data);
456+
do {
457+
if (zs.total_out == uncompressed.size()) {
458+
uncompressed.resize(uncompressed.size() + zlibBufGrowSize);
459+
}
460+
zs.avail_out = static_cast<uInt>(uncompressed.size() - zs.total_out);
461+
zs.next_out = reinterpret_cast<Bytef *>(uncompressed.data() + zs.total_out);
462+
ret = inflate(&zs, Z_NO_FLUSH);
463+
if (ret == Z_STREAM_END) {
464+
break;
465+
}
466+
if (ret != Z_OK) {
467+
throw Exception("Failed to inflate, error: {}", ret);
468+
}
469+
} while (zs.avail_out == 0);
470+
}
471+
472+
uncompressed.resize(zs.total_out);
473+
(void) inflateEnd(&zs);
433474
}
434-
os_.reset(new boost::iostreams::filtering_istream());
435-
os_->push(boost::iostreams::zlib_decompressor(get_zlib_params()));
436-
os_->push(boost::iostreams::basic_array_source<char>(
437-
compressed_.data(), compressed_.size()));
438475

439-
std::unique_ptr<InputStream> in = nonSeekableIstreamInputStream(*os_);
476+
std::unique_ptr<InputStream> in = memoryInputStream(
477+
reinterpret_cast<const uint8_t *>(uncompressed.c_str()),
478+
uncompressed.size());
479+
440480
dataDecoder_->init(*in);
441481
dataStream_ = std::move(in);
442482
}

lang/c++/include/avro/DataFile.hh

Lines changed: 0 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -31,8 +31,6 @@
3131
#include <string>
3232
#include <vector>
3333

34-
#include <boost/iostreams/filtering_stream.hpp>
35-
3634
namespace avro {
3735

3836
/** Specify type of compression to use when writing data files. */
@@ -216,7 +214,6 @@ class AVRO_DECL DataFileReaderBase {
216214
DataFileSync sync_{};
217215

218216
// for compressed buffer
219-
std::unique_ptr<boost::iostreams::filtering_istream> os_;
220217
std::vector<char> compressed_;
221218
std::string uncompressed;
222219
void readHeader();

0 commit comments

Comments
 (0)