From d40b4b5d8fad62e4c155f654323b1b68266194e7 Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Thu, 7 Mar 2024 20:28:53 +0800 Subject: [PATCH] Revert "Revert "Reuse Serializer (#8849)"" This reverts commit 8f68446991ce2fb7ceeb4b4656ba80077fe40fa5. --- velox/common/memory/ByteStream.h | 9 ++ velox/common/memory/HashStringAllocator.h | 2 +- velox/common/memory/StreamArena.cpp | 10 ++ velox/common/memory/StreamArena.h | 4 + velox/common/memory/tests/ByteStreamTest.cpp | 12 ++ velox/exec/PartitionedOutput.cpp | 4 +- velox/serializers/PrestoSerializer.cpp | 50 +++++- .../tests/PrestoSerializerTest.cpp | 148 +++++++++++++++--- velox/vector/VectorStream.h | 10 ++ 9 files changed, 215 insertions(+), 34 deletions(-) diff --git a/velox/common/memory/ByteStream.h b/velox/common/memory/ByteStream.h index 677d6659df51..80e89d0ccb8c 100644 --- a/velox/common/memory/ByteStream.h +++ b/velox/common/memory/ByteStream.h @@ -246,7 +246,16 @@ class ByteOutputStream { return ranges_; } + /// Prepares 'this' for writing. Can be called several times, + /// e.g. PrestoSerializer resets these. The memory formerly backing + /// 'ranges_' is not owned and the caller needs to recycle or free + /// this independently. void startWrite(int32_t initialSize) { + ranges_.clear(); + isReversed_ = false; + allocatedBytes_ = 0; + current_ = nullptr; + lastRangeEnd_ = 0; extend(initialSize); } diff --git a/velox/common/memory/HashStringAllocator.h b/velox/common/memory/HashStringAllocator.h index 3a5ac0f69193..2c3ef7d8dcb7 100644 --- a/velox/common/memory/HashStringAllocator.h +++ b/velox/common/memory/HashStringAllocator.h @@ -326,7 +326,7 @@ class HashStringAllocator : public StreamArena { } // Frees all memory associated with 'this' and leaves 'this' ready for reuse. 
- void clear(); + void clear() override; memory::MemoryPool* FOLLY_NONNULL pool() const { return pool_.pool(); diff --git a/velox/common/memory/StreamArena.cpp b/velox/common/memory/StreamArena.cpp index e8778750d2fc..1153afbb8cf6 100644 --- a/velox/common/memory/StreamArena.cpp +++ b/velox/common/memory/StreamArena.cpp @@ -76,4 +76,14 @@ void StreamArena::newTinyRange( range->buffer = reinterpret_cast(tinyRanges_.back().data()); range->size = bytes; } +void StreamArena::clear() { + allocations_.clear(); + pool_->freeNonContiguous(allocation_); + currentRun_ = 0; + currentOffset_ = 0; + largeAllocations_.clear(); + size_ = 0; + tinyRanges_.clear(); +} + } // namespace facebook::velox diff --git a/velox/common/memory/StreamArena.h b/velox/common/memory/StreamArena.h index b0a0b9498ec9..d46f00c436f4 100644 --- a/velox/common/memory/StreamArena.h +++ b/velox/common/memory/StreamArena.h @@ -62,6 +62,10 @@ class StreamArena { return pool_; } + /// Restores 'this' to post-construction state. Used in recycling streams for + /// serializers.
+ virtual void clear(); + private: memory::MemoryPool* const pool_; const memory::MachinePageCount allocationQuantum_{2}; diff --git a/velox/common/memory/tests/ByteStreamTest.cpp b/velox/common/memory/tests/ByteStreamTest.cpp index 5c5aad545892..6f751a3811aa 100644 --- a/velox/common/memory/tests/ByteStreamTest.cpp +++ b/velox/common/memory/tests/ByteStreamTest.cpp @@ -381,3 +381,15 @@ TEST_F(ByteStreamTest, nextViewNegativeSize) { ByteInputStream byteStream({ByteRange{buffer, kBufferSize, 0}}); EXPECT_THROW(byteStream.nextView(-100), VeloxRuntimeError); } + +TEST_F(ByteStreamTest, reuse) { + auto arena = newArena(); + ByteOutputStream stream(arena.get()); + char bytes[10000] = {}; + for (auto i = 0; i < 10; ++i) { + arena->clear(); + stream.startWrite(i * 100); + stream.appendStringView(std::string_view(bytes, sizeof(bytes))); + EXPECT_EQ(sizeof(bytes), stream.size()); + } +} diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index e72253dd192b..8bd0e0fe4875 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -73,7 +73,7 @@ BlockingReason Destination::flush( OutputBufferManager& bufferManager, const std::function& bufferReleaseFn, ContinueFuture* future) { - if (!current_) { + if (!current_ || rowsInCurrent_ == 0) { return BlockingReason::kNotBlocked; } @@ -87,7 +87,7 @@ BlockingReason Destination::flush( const int64_t flushedRows = rowsInCurrent_; current_->flush(&stream); - current_.reset(); + current_->clear(); const int64_t flushedBytes = stream.tellp(); diff --git a/velox/serializers/PrestoSerializer.cpp b/velox/serializers/PrestoSerializer.cpp index a513af54c3d8..fc4603dcf51a 100644 --- a/velox/serializers/PrestoSerializer.cpp +++ b/velox/serializers/PrestoSerializer.cpp @@ -1351,13 +1351,15 @@ class VectorStream { int32_t initialNumRows, const SerdeOpts& opts) : type_(type), - encoding_(getEncoding(encoding, vector)), - opts_(opts), streamArena_(streamArena), + 
useLosslessTimestamp_(opts.useLosslessTimestamp), + nullsFirst_(opts.nullsFirst), + isLongDecimal_(type_->isLongDecimal()), + opts_(opts), + encoding_(getEncoding(encoding, vector)), nulls_(streamArena, true, true), lengths_(streamArena), - values_(streamArena), - isLongDecimal_(type_->isLongDecimal()) { + values_(streamArena) { if (initialNumRows == 0) { initializeHeader(typeToEncodingName(type), *streamArena); return; @@ -1680,6 +1682,27 @@ class VectorStream { return isLongDecimal_; } + void clear() { + encoding_ = std::nullopt; + initializeHeader(typeToEncodingName(type_), *streamArena_); + nonNullCount_ = 0; + nullCount_ = 0; + totalLength_ = 0; + if (hasLengths_) { + lengths_.startWrite(lengths_.size()); + if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY || + type_->kind() == TypeKind::MAP) { + // A complex type has a 0 as first length. + lengths_.appendOne(0); + } + } + nulls_.startWrite(nulls_.size()); + values_.startWrite(values_.size()); + for (auto& child : children_) { + child->clear(); + } + } + private: void initializeFlatStream( std::optional vector, @@ -1727,10 +1750,15 @@ class VectorStream { } const TypePtr type_; - std::optional encoding_; + StreamArena* const streamArena_; + /// Indicates whether to serialize timestamps with nanosecond precision. + /// If false, they are serialized with millisecond precision which is + /// compatible with presto. 
+ const bool useLosslessTimestamp_; + const bool nullsFirst_; + const bool isLongDecimal_; const SerdeOpts opts_; - - StreamArena* streamArena_; + std::optional encoding_; int32_t nonNullCount_{0}; int32_t nullCount_{0}; int32_t totalLength_{0}; @@ -1740,7 +1768,6 @@ class VectorStream { ByteOutputStream lengths_; ByteOutputStream values_; std::vector> children_; - const bool isLongDecimal_; bool isDictionaryStream_{false}; bool isConstantStream_{false}; }; @@ -3633,6 +3660,13 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer { flushStreams(streams_, numRows_, *streamArena_, *codec_, out); } + void clear() override { + numRows_ = 0; + for (auto& stream : streams_) { + stream->clear(); + } + } + private: StreamArena* const streamArena_; const std::unique_ptr codec_; diff --git a/velox/serializers/tests/PrestoSerializerTest.cpp b/velox/serializers/tests/PrestoSerializerTest.cpp index 6bb0bbde3c20..954448948e18 100644 --- a/velox/serializers/tests/PrestoSerializerTest.cpp +++ b/velox/serializers/tests/PrestoSerializerTest.cpp @@ -87,16 +87,26 @@ class PrestoSerializerTest std::ostream* output, const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions, std::optional> indexRanges = std::nullopt, - std::optional> rows = std::nullopt) { + std::optional> rows = std::nullopt, + std::unique_ptr* reuseSerializer = nullptr, + std::unique_ptr* reuseArena = nullptr) { auto streamInitialSize = output->tellp(); sanityCheckEstimateSerializedSize(rowVector); - auto arena = std::make_unique(pool_.get()); + std::unique_ptr arena; auto rowType = asRowType(rowVector->type()); auto numRows = rowVector->size(); auto paramOptions = getParamSerdeOptions(serdeOptions); - auto serializer = serde_->createIterativeSerializer( - rowType, numRows, arena.get(), &paramOptions); + std::unique_ptr serializer; + if (reuseSerializer && *reuseSerializer) { + arena = std::move(*reuseArena); + serializer = std::move(*reuseSerializer); + serializer->clear(); + } else {
+ arena = std::make_unique(pool_.get()); + serializer = serde_->createIterativeSerializer( + rowType, numRows, arena.get(), &paramOptions); + } vector_size_t sizeEstimate = 0; Scratch scratch; @@ -134,6 +144,10 @@ class PrestoSerializerTest } else { EXPECT_GE(size, out.tellp() - streamInitialSize); } + if (reuseSerializer) { + *reuseArena = std::move(arena); + *reuseSerializer = std::move(serializer); + } return {static_cast(size), sizeEstimate}; } @@ -218,9 +232,18 @@ class PrestoSerializerTest VectorPtr vector, const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = nullptr) { + std::unique_ptr reuseSerializer; + std::unique_ptr reuseArena; auto rowVector = makeRowVector({vector}); std::ostringstream out; - serialize(rowVector, &out, serdeOptions); + serialize( + rowVector, + &out, + serdeOptions, + std::nullopt, + std::nullopt, + &reuseSerializer, + &reuseArena); auto rowType = asRowType(rowVector->type()); auto deserialized = deserialize(rowType, out.str(), serdeOptions); @@ -236,7 +259,14 @@ class PrestoSerializerTest std::vector serialized; for (const auto& split : splits) { std::ostringstream out; - serialize(split, &out, serdeOptions); + serialize( + split, + &out, + serdeOptions, + std::nullopt, + std::nullopt, + &reuseSerializer, + &reuseArena); serialized.push_back(out.str()); } @@ -257,12 +287,16 @@ class PrestoSerializerTest makeIndices(rowVector->size() / 2, [&](auto row) { return row * 2; }); auto odd = makeIndices( (rowVector->size() - 1) / 2, [&](auto row) { return (row * 2) + 1; }); - testSerializeRows(rowVector, even, serdeOptions); - auto oddStats = testSerializeRows(rowVector, odd, serdeOptions); + testSerializeRows( + rowVector, even, serdeOptions, &reuseSerializer, &reuseArena); + auto oddStats = testSerializeRows( + rowVector, odd, serdeOptions, &reuseSerializer, &reuseArena); auto wrappedRowVector = wrapChildren(rowVector); - auto wrappedStats = testSerializeRows(wrappedRowVector, odd, serdeOptions); + auto wrappedStats =
testSerializeRows( + wrappedRowVector, odd, serdeOptions, &reuseSerializer, &reuseArena); EXPECT_EQ(oddStats.estimatedSize, wrappedStats.estimatedSize); - EXPECT_EQ(oddStats.actualSize, wrappedStats.actualSize); + // The second serialization may come out smaller if encoding is better. + EXPECT_GE(oddStats.actualSize, wrappedStats.actualSize); } void testLexer( @@ -278,12 +312,20 @@ class PrestoSerializerTest SerializeStats testSerializeRows( const RowVectorPtr& rowVector, BufferPtr indices, - const serializer::presto::PrestoVectorSerde::PrestoOptions* - serdeOptions) { + const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions, + std::unique_ptr* reuseSerializer, + std::unique_ptr* reuseArena) { std::ostringstream out; auto rows = folly::Range( indices->as(), indices->size() / sizeof(vector_size_t)); - auto stats = serialize(rowVector, &out, serdeOptions, std::nullopt, rows); + auto stats = serialize( + rowVector, + &out, + serdeOptions, + std::nullopt, + rows, + reuseSerializer, + reuseArena); auto rowType = asRowType(rowVector->type()); auto deserialized = deserialize(rowType, out.str(), serdeOptions); @@ -380,21 +422,16 @@ class PrestoSerializerTest const serializer::presto::PrestoVectorSerde::PrestoOptions* serdeOptions = nullptr) { std::vector pieces; + std::vector reusedPieces; auto rowType = ROW({{"f", vectors[0]->type()}}); auto concatenation = BaseVector::create(rowType, 0, pool_.get()); auto arena = std::make_unique(pool_.get()); auto paramOptions = getParamSerdeOptions(serdeOptions); auto serializer = serde_->createIterativeSerializer( rowType, 10, arena.get(), &paramOptions); + auto reusedSerializer = serde_->createIterativeSerializer( + rowType, 10, arena.get(), &paramOptions); - for (const auto& vector : vectors) { - auto data = makeRowVector({"f"}, {vector}); - concatenation->append(data.get()); - std::ostringstream out; - serializeBatch(data, &out, &paramOptions); - pieces.push_back(out.str()); - serializer->append(data); - }
facebook::velox::serializer::presto::PrestoOutputStreamListener listener; std::ostringstream allOut; OStreamOutputStream allOutStream(&allOut, &listener); @@ -404,7 +441,8 @@ class PrestoSerializerTest assertEqualVectors(allDeserialized, concatenation); RowVectorPtr deserialized = BaseVector::create(rowType, 0, pool_.get()); - for (auto& piece : pieces) { + for (auto pieceIdx = 0; pieceIdx < pieces.size(); ++pieceIdx) { + auto piece = pieces[pieceIdx]; auto byteStream = toByteStream(piece); serde_->deserialize( &byteStream, @@ -413,9 +451,42 @@ class PrestoSerializerTest &deserialized, deserialized->size(), &paramOptions); + + RowVectorPtr single = + BaseVector::create(rowType, 0, pool_.get()); + byteStream = toByteStream(piece); + serde_->deserialize( + &byteStream, pool_.get(), rowType, &single, 0, &paramOptions); + assertEqualVectors(single->childAt(0), vectors[pieceIdx]); + + RowVectorPtr single2 = + BaseVector::create(rowType, 0, pool_.get()); + byteStream = toByteStream(reusedPieces[pieceIdx]); + serde_->deserialize( + &byteStream, pool_.get(), rowType, &single2, 0, &paramOptions); + assertEqualVectors(single2->childAt(0), vectors[pieceIdx]); } assertEqualVectors(concatenation, deserialized); } + int64_t randInt(int64_t min, int64_t max) { + return min + folly::Random::rand64(rng_) % (max - min); + } + + void randomOptions(VectorFuzzer::Options& options, bool isFirst) { + options.nullRatio = randInt(1, 10) < 3 ?
0.0 : randInt(1, 10) / 10.0; + options.stringLength = randInt(1, 100); + options.stringVariableLength = true; + options.containerLength = randInt(1, 50); + options.containerVariableLength = true; + options.complexElementsMaxSize = 20000; + options.maxConstantContainerSize = 2; + options.normalizeMapKeys = randInt(0, 20) < 16; + if (isFirst) { + options.timestampPrecision = + static_cast(randInt(0, 3)); + } + options.allowLazyVector = false; + } void serializeBatch( const RowVectorPtr& rowVector, @@ -665,6 +736,7 @@ class PrestoSerializerTest } std::unique_ptr serde_; + folly::Random::DefaultGenerator rng_; }; TEST_P(PrestoSerializerTest, basic) { @@ -1020,7 +1092,7 @@ TEST_P(PrestoSerializerTest, ioBufRoundTrip) { opts.nullRatio = 0.1; VectorFuzzer fuzzer(opts, pool_.get()); - const size_t numRounds = 100; + const size_t numRounds = 200; for (size_t i = 0; i < numRounds; ++i) { auto rowType = fuzzer.randRowType(); @@ -1121,6 +1193,36 @@ TEST_P(PrestoSerializerTest, encodedConcatenation) { for (auto i = 0; i < permutations.size(); ++i) { serializer::presto::PrestoVectorSerde::PrestoOptions opts; opts.nullsFirst = i % 2 == 0; + + testEncodedConcatenation(permutations[i], &opts); + } + } +} + +TEST_P(PrestoSerializerTest, encodedConcatenation2) { + // Slow test, run only for no compression. 
+ if (GetParam() != common::CompressionKind::CompressionKind_NONE) { + return; + } + VectorFuzzer::Options options; + VectorFuzzer fuzzer(options, pool_.get()); + for (auto nthType = 0; nthType < 20; ++nthType) { + auto type = (fuzzer.randRowType()); + + std::vector vectors; + for (auto nth = 0; nth < 3; ++nth) { + randomOptions(options, 0 == nth); + fuzzer.setOptions(options); + vectors.push_back(fuzzer.fuzzInputRow(type)); + } + std::vector> permutations; + std::vector temp; + makePermutations(vectors, 3, temp, permutations); + for (auto i = 0; i < permutations.size(); ++i) { + serializer::presto::PrestoVectorSerde::PrestoOptions opts; + opts.useLosslessTimestamp = true; + opts.nullsFirst = i % 2 == 0; + testEncodedConcatenation(permutations[i], &opts); } } diff --git a/velox/vector/VectorStream.h b/velox/vector/VectorStream.h index e0b57e3fa4b4..ff2ac9b9a5f7 100644 --- a/velox/vector/VectorStream.h +++ b/velox/vector/VectorStream.h @@ -86,6 +86,11 @@ class IterativeVectorSerializer { /// Write serialized data to 'stream'. virtual void flush(OutputStream* stream) = 0; + + /// Resets 'this' to post construction state. + virtual void clear() { + VELOX_UNSUPPORTED("clear"); + } }; /// Serializer that writes a subset of rows from a single RowVector to the @@ -297,6 +302,11 @@ class VectorStreamGroup : public StreamArena { RowVectorPtr* result, const VectorSerde::Options* options = nullptr); + void clear() override { + StreamArena::clear(); + serializer_->clear(); + } + private: std::unique_ptr serializer_; VectorSerde* serde_{nullptr};