Skip to content

Commit

Permalink
Revert "Revert "Reuse Serializer (facebookincubator#8849)""
Browse files Browse the repository at this point in the history
This reverts commit 8f68446.
  • Loading branch information
PHILO-HE committed Mar 7, 2024
1 parent 8f68446 commit d40b4b5
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 34 deletions.
9 changes: 9 additions & 0 deletions velox/common/memory/ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,16 @@ class ByteOutputStream {
return ranges_;
}

/// Resets 'this' for a new write and then calls extend() with
/// 'initialSize'. May be invoked repeatedly on the same stream, e.g.
/// PrestoSerializer resets its streams this way. The memory that formerly
/// backed 'ranges_' is not owned by the stream, so the caller has to
/// recycle or free it independently.
void startWrite(int32_t initialSize) {
  // Reset the per-write state. These fields are independent of each other.
  current_ = nullptr;
  lastRangeEnd_ = 0;
  allocatedBytes_ = 0;
  isReversed_ = false;
  // Forget the old ranges; their backing memory is not released here.
  ranges_.clear();
  // Must come last: extend() runs against the freshly reset state.
  extend(initialSize);
}

Expand Down
2 changes: 1 addition & 1 deletion velox/common/memory/HashStringAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class HashStringAllocator : public StreamArena {
}

// Frees all memory associated with 'this' and leaves 'this' ready for reuse.
void clear();
void clear() override;

memory::MemoryPool* FOLLY_NONNULL pool() const {
return pool_.pool();
Expand Down
10 changes: 10 additions & 0 deletions velox/common/memory/StreamArena.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,14 @@ void StreamArena::newTinyRange(
range->buffer = reinterpret_cast<uint8_t*>(tinyRanges_.back().data());
range->size = bytes;
}
// Resets the arena's allocation bookkeeping so that 'this' can be reused.
// NOTE(review): whether destroying the elements of the cleared containers
// returns their memory to the pool is not visible in this chunk - confirm
// against the element types.
void StreamArena::clear() {
// Forget the allocations recorded so far.
allocations_.clear();
// Return the current non-contiguous allocation to the memory pool.
pool_->freeNonContiguous(allocation_);
// Presumably the position inside 'allocation_' from which the next range
// is handed out - TODO confirm against newRange().
currentRun_ = 0;
currentOffset_ = 0;
largeAllocations_.clear();
size_ = 0;
// Drop the buffers that newTinyRange() hands out as range storage.
tinyRanges_.clear();
}

} // namespace facebook::velox
4 changes: 4 additions & 0 deletions velox/common/memory/StreamArena.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class StreamArena {
return pool_;
}

/// Restores 'this' to post-construction state. Used in recycling streams for
/// serializers.
virtual void clear();

private:
memory::MemoryPool* const pool_;
const memory::MachinePageCount allocationQuantum_{2};
Expand Down
12 changes: 12 additions & 0 deletions velox/common/memory/tests/ByteStreamTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,15 @@ TEST_F(ByteStreamTest, nextViewNegativeSize) {
ByteInputStream byteStream({ByteRange{buffer, kBufferSize, 0}});
EXPECT_THROW(byteStream.nextView(-100), VeloxRuntimeError);
}

// Writes the same payload ten times through one ByteOutputStream, clearing
// the arena and calling startWrite() with a different initial size before
// each round, and checks the stream size after every write.
TEST_F(ByteStreamTest, reuse) {
  auto arena = newArena();
  ByteOutputStream stream(arena.get());
  char bytes[10000] = {};
  // The payload never changes, so build the view once.
  const std::string_view payload(bytes, sizeof(bytes));
  for (int round = 0; round < 10; ++round) {
    arena->clear();
    // Initial sizes run from 0 to 900 bytes, all smaller than the payload.
    stream.startWrite(round * 100);
    stream.appendStringView(payload);
    EXPECT_EQ(sizeof(bytes), stream.size());
  }
}
4 changes: 2 additions & 2 deletions velox/exec/PartitionedOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ BlockingReason Destination::flush(
OutputBufferManager& bufferManager,
const std::function<void()>& bufferReleaseFn,
ContinueFuture* future) {
if (!current_) {
if (!current_ || rowsInCurrent_ == 0) {
return BlockingReason::kNotBlocked;
}

Expand All @@ -87,7 +87,7 @@ BlockingReason Destination::flush(
const int64_t flushedRows = rowsInCurrent_;

current_->flush(&stream);
current_.reset();
current_->clear();

const int64_t flushedBytes = stream.tellp();

Expand Down
50 changes: 42 additions & 8 deletions velox/serializers/PrestoSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1351,13 +1351,15 @@ class VectorStream {
int32_t initialNumRows,
const SerdeOpts& opts)
: type_(type),
encoding_(getEncoding(encoding, vector)),
opts_(opts),
streamArena_(streamArena),
useLosslessTimestamp_(opts.useLosslessTimestamp),
nullsFirst_(opts.nullsFirst),
isLongDecimal_(type_->isLongDecimal()),
opts_(opts),
encoding_(getEncoding(encoding, vector)),
nulls_(streamArena, true, true),
lengths_(streamArena),
values_(streamArena),
isLongDecimal_(type_->isLongDecimal()) {
values_(streamArena) {
if (initialNumRows == 0) {
initializeHeader(typeToEncodingName(type), *streamArena);
return;
Expand Down Expand Up @@ -1680,6 +1682,27 @@ class VectorStream {
return isLongDecimal_;
}

void clear() {
encoding_ = std::nullopt;
initializeHeader(typeToEncodingName(type_), *streamArena_);
nonNullCount_ = 0;
nullCount_ = 0;
totalLength_ = 0;
if (hasLengths_) {
lengths_.startWrite(lengths_.size());
if (type_->kind() == TypeKind::ROW || type_->kind() == TypeKind::ARRAY ||
type_->kind() == TypeKind::MAP) {
// A complex type has a 0 as first length.
lengths_.appendOne<int32_t>(0);
}
}
nulls_.startWrite(nulls_.size());
values_.startWrite(values_.size());
for (auto& child : children_) {
child->clear();
}
}

private:
void initializeFlatStream(
std::optional<VectorPtr> vector,
Expand Down Expand Up @@ -1727,10 +1750,15 @@ class VectorStream {
}

const TypePtr type_;
std::optional<VectorEncoding::Simple> encoding_;
StreamArena* const streamArena_;
/// Indicates whether to serialize timestamps with nanosecond precision.
/// If false, they are serialized with millisecond precision which is
/// compatible with presto.
const bool useLosslessTimestamp_;
const bool nullsFirst_;
const bool isLongDecimal_;
const SerdeOpts opts_;

StreamArena* streamArena_;
std::optional<VectorEncoding::Simple> encoding_;
int32_t nonNullCount_{0};
int32_t nullCount_{0};
int32_t totalLength_{0};
Expand All @@ -1740,7 +1768,6 @@ class VectorStream {
ByteOutputStream lengths_;
ByteOutputStream values_;
std::vector<std::unique_ptr<VectorStream>> children_;
const bool isLongDecimal_;
bool isDictionaryStream_{false};
bool isConstantStream_{false};
};
Expand Down Expand Up @@ -3633,6 +3660,13 @@ class PrestoIterativeVectorSerializer : public IterativeVectorSerializer {
flushStreams(streams_, numRows_, *streamArena_, *codec_, out);
}

void clear() override {
numRows_ = 0;
for (auto& stream : streams_) {
stream->clear();
}
}

private:
StreamArena* const streamArena_;
const std::unique_ptr<folly::io::Codec> codec_;
Expand Down
Loading

0 comments on commit d40b4b5

Please sign in to comment.