Commit

Fix the concurrency issue in duplicate row allocations (facebookincubator#10865)

Summary: Pull Request resolved: facebookincubator#10865

Differential Revision: D61881526

Pulled By: xiaoxmeng
xiaoxmeng authored and facebook-github-bot committed Sep 7, 2024
1 parent 24dd2e9 commit e66cd37
Showing 11 changed files with 257 additions and 82 deletions.
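
The diff below suggests the failure mode: duplicate-row allocations from multiple threads can end up sharing one HashStringAllocator, whose free list is not thread-safe, so concurrent allocate()/free() calls can observe or corrupt each other's blocks. The fix adds a ThreadSafeHashStringAllocator wrapper that serializes every call on a single mutex. A minimal sketch of the intended usage (the driver function, thread count, and sizes are illustrative assumptions, not part of the diff):

#include <thread>
#include <vector>
#include "velox/common/memory/HashStringAllocator.h"

using namespace facebook::velox;

// Many threads share one underlying allocator through the wrapper; without
// it, the same loop on a bare HashStringAllocator races on the free list.
void appendDuplicatesConcurrently(HashStringAllocator* hsa) {
  ThreadSafeHashStringAllocator safe(hsa);
  std::vector<std::thread> threads;
  for (int t = 0; t < 4; ++t) {
    threads.emplace_back([&] {
      for (int i = 0; i < 1000; ++i) {
        auto* header = safe.allocate(64); // serialized on the wrapper's mutex
        safe.free(header);
      }
    });
  }
  for (auto& th : threads) {
    th.join();
  }
}
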
13 changes: 6 additions & 7 deletions velox/common/memory/AllocationPool.h
@@ -18,13 +18,12 @@
#include "velox/common/memory/Memory.h"

namespace facebook::velox::memory {
// A set of Allocations holding the fixed width payload
// rows. The Runs are filled to the end except for the last one. This
// is used for iterating over the payload for rehashing, returning
// results etc. This is used via HashStringAllocator for variable length
// allocation for backing ByteStreams for complex objects. In that case, there
// is a current run that is appended to and when this is exhausted a new run is
// started.
/// A set of Allocations holding the fixed width payload rows. The Runs are
/// filled to the end except for the last one. This is used for iterating over
/// the payload for rehashing, returning results etc. This is used via
/// HashStringAllocator for variable length allocation for backing ByteStreams
/// for complex objects. In that case, there is a current run that is appended
/// to and when this is exhausted a new run is started.
class AllocationPool {
public:
static constexpr int32_t kMinPages = 16;
9 changes: 8 additions & 1 deletion velox/common/memory/HashStringAllocator.cpp
@@ -386,7 +386,11 @@ int32_t HashStringAllocator::freeListIndex(int size) {
}

void HashStringAllocator::removeFromFreeList(Header* header) {
VELOX_CHECK(header->isFree());
  if (!header->isFree()) {
    LOG(ERROR) << "removeFromFreeList: header is not free " << header;
    VELOX_FAIL("removeFromFreeList called on a non-free header");
  }
header->clearFree();
const auto index = freeListIndex(header->size());
reinterpret_cast<CompactDoubleList*>(header->begin())->remove();
@@ -448,6 +452,9 @@ HashStringAllocator::Header* HashStringAllocator::allocateFromFreeList(
return nullptr;
}
auto* found = headerOf(item);
  if (!(found->isFree() &&
        (!mustHaveSize || found->size() >= preferredSize))) {
    LOG(ERROR) << "allocateFromFreeList: bad header " << found << " free "
               << found->isFree() << " size " << found->size()
               << " preferredSize " << preferredSize << " mustHaveSize "
               << mustHaveSize;
  }
VELOX_CHECK(
found->isFree() && (!mustHaveSize || found->size() >= preferredSize));
--state_.numFree();
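
The two hunks above turn silent free-list corruption into loud failures: under a race, a header handed out by the free list may no longer carry the free mark, or may no longer satisfy the requested size. The invariant being enforced, written out as a standalone sketch (the helper name is illustrative, not part of the diff):

// Every block popped from the free list must still be marked free and,
// when a minimum size was requested, must be at least that large.
void checkFreeBlock(
    HashStringAllocator::Header* found,
    int32_t preferredSize,
    bool mustHaveSize) {
  VELOX_CHECK(found->isFree());
  if (mustHaveSize) {
    VELOX_CHECK_GE(found->size(), preferredSize);
  }
}
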
176 changes: 147 additions & 29 deletions velox/common/memory/HashStringAllocator.h
@@ -186,7 +186,8 @@ class HashStringAllocator : public StreamArena {
// header is below the first byte of the StringView's data. StringViews
// written by this are to be read with contiguousString(). This is nearly
// always zero copy but will accommodate the odd extra large string.
void copyMultipart(const StringView& str, char* group, int32_t offset) {
virtual void
copyMultipart(const StringView& str, char* group, int32_t offset) {
if (str.isInline()) {
*reinterpret_cast<StringView*>(group + offset) = str;
return;
@@ -201,7 +202,7 @@

/// Allocates 'size' contiguous bytes preceded by a Header. Returns the
/// address of Header.
Header* allocate(int32_t size) {
virtual Header* allocate(int32_t size) {
VELOX_CHECK_NULL(
state_.currentHeader(),
"Do not call allocate() when a write is in progress");
@@ -211,11 +212,11 @@
/// Allocates a block that is independently freeable but is freed on
/// destruction of 'this'. The block has no header and must be freed by
/// freeToPool() if to be freed before destruction of 'this'.
void* allocateFromPool(size_t size);
virtual void* allocateFromPool(size_t size);

/// Frees a block allocated with allocateFromPool(). The pointer and size must
/// match.
void freeToPool(void* ptr, size_t size);
virtual void freeToPool(void* ptr, size_t size);

/// Returns the header immediately below 'data'.
static Header* headerOf(const void* data) {
@@ -228,11 +229,6 @@
const_cast<char*>(reinterpret_cast<const char*>(data)));
}

/// Returns the byte size of block pointed by 'header'.
inline size_t blockBytes(const Header* header) const {
return header->size() + kHeaderSize;
}

/// Returns ByteInputStream over the data in the range of 'header' and
/// possible continuation ranges.
/// @param maxBytes If provided, the returned stream will cover at most that
@@ -256,27 +252,27 @@
/// without allocating more space. 'position' can be changed but will
logically point at the same data. Data to the right of 'position' is not
/// preserved.
void ensureAvailable(int32_t bytes, Position& position);
virtual void ensureAvailable(int32_t bytes, Position& position);

/// Sets stream to write to this pool. The write can span multiple
/// non-contiguous runs. Each contiguous run will have at least kMinContiguous
/// bytes of contiguous space. finishWrite finalizes the allocation
/// information after the write is done. Returns the position at the start of
/// the allocated block.
Position newWrite(
virtual Position newWrite(
ByteOutputStream& stream,
int32_t preferredSize = kMinContiguous);

// Sets 'stream' to write starting at 'position'. If new ranges have to
// be allocated when writing, headers will be updated accordingly.
void extendWrite(Position position, ByteOutputStream& stream);
virtual void extendWrite(Position position, ByteOutputStream& stream);

/// Completes a write prepared with newWrite or extendWrite. Up to
/// 'numReserveBytes' unused bytes, if available, are left after the end of
/// the write to accommodate another write. Returns a pair of positions: (1)
/// position at the start of this 'write', (2) position immediately after the
/// last written byte.
std::pair<Position, Position> finishWrite(
virtual std::pair<Position, Position> finishWrite(
ByteOutputStream& stream,
int32_t numReserveBytes);
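
  /// Illustrative write cycle over newWrite/extendWrite/finishWrite (the
  /// stream setup and the append call are assumptions, not part of this
  /// diff):
  ///
  ///   ByteOutputStream stream(&allocator);
  ///   auto start = allocator.newWrite(stream);    // reserve the first run
  ///   stream.appendStringView(payload);           // may span several runs
  ///   auto [first, last] = allocator.finishWrite(stream, 0);
  ///   allocator.extendWrite(last, stream);        // later: resume at 'last'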

@@ -291,30 +287,26 @@
/// ranges will not overwrite the next pointer.
///
/// May allocate less than 'bytes'.
void newRange(int32_t bytes, ByteRange* lastRange, ByteRange* range) override;
virtual void newRange(int32_t bytes, ByteRange* lastRange, ByteRange* range)
override;

/// Allocates a new range of at least 'bytes' size.
void newContiguousRange(int32_t bytes, ByteRange* range);

void newTinyRange(int32_t bytes, ByteRange* lastRange, ByteRange* range)
override {
newRange(bytes, lastRange, range);
}
virtual void newContiguousRange(int32_t bytes, ByteRange* range);

/// Returns the total memory footprint of 'this'.
int64_t retainedSize() const {
virtual int64_t retainedSize() const {
return state_.pool().allocatedBytes() + state_.sizeFromPool();
}

/// Adds the allocation of 'header' and any extensions (if header has
/// kContinued set) to the free list.
void free(Header* header);
virtual void free(Header* header);

/// Returns a lower bound on bytes available without growing 'this'. This is
/// the sum of free block sizes minus size of pointer for each. We subtract
/// the pointer because in the worst case we would have one allocation that
/// chains many small free blocks together via kContinued.
uint64_t freeSpace() const {
virtual uint64_t freeSpace() const {
const int64_t minFree = state_.freeBytes() -
state_.numFree() * (kHeaderSize + Header::kContinuedPtrSize);
VELOX_CHECK_GE(minFree, 0, "Guaranteed free space cannot be negative");
@@ -328,26 +320,26 @@
return state_.pool().pool();
}

uint64_t currentBytes() const {
virtual uint64_t currentBytes() const {
return state_.currentBytes();
}

/// Checks the free space accounting and consistency of Headers. Throws when
/// detects corruption. Returns the number of allocated payload bytes,
/// excluding headers, continue links and other overhead.
int64_t checkConsistency() const;
virtual int64_t checkConsistency() const;

/// Returns 'true' if this is empty. The implementation includes a call to
/// checkConsistency() which makes it slow. Do not use in hot paths.
bool isEmpty() const;
virtual bool isEmpty() const;

/// Throws if 'this' is not empty. Checks consistency of
/// 'this'. This is a fast check for RowContainer users freeing the
/// variable length data they store. Can be used in non-debug
/// builds.
void checkEmpty() const;
virtual void checkEmpty() const;

std::string toString() const;
virtual std::string toString() const;

/// Effectively makes this immutable while executing f, any attempt to access
/// state_ in a mutable way while f is executing will cause an exception to be
@@ -363,12 +355,18 @@
f();
}

private:
protected:
static constexpr int32_t kUnitSize = 16 * memory::AllocationTraits::kPageSize;
static constexpr int32_t kMinContiguous = 48;
static constexpr int32_t kNumFreeLists = kMaxAlloc - kMinAlloc + 2;
static constexpr uint32_t kHeaderSize = sizeof(Header);

private:
// Returns the byte size of block pointed by 'header'.
inline size_t blockBytes(const Header* header) const {
return header->size() + kHeaderSize;
}

void newRange(
int32_t bytes,
ByteRange* lastRange,
@@ -603,6 +601,126 @@ struct StlAllocator {
HashStringAllocator* allocator_;
};

class ThreadSafeHashStringAllocator : public HashStringAllocator {
public:
explicit ThreadSafeHashStringAllocator(HashStringAllocator* allocator)
: HashStringAllocator(allocator->pool()), allocator_(allocator) {
VELOX_CHECK_NOT_NULL(allocator_);
}

void copyMultipart(const StringView& str, char* group, int32_t offset)
override {
std::lock_guard<std::mutex> l(mu_);
return allocator_->copyMultipart(str, group, offset);
}

  Header* allocate(int32_t size) override {
    std::lock_guard<std::mutex> l(mu_);
    return allocator_->allocate(size);
  }

  void* allocateFromPool(size_t size) override {
    std::lock_guard<std::mutex> l(mu_);
    return allocator_->allocateFromPool(size);
  }

  void freeToPool(void* ptr, size_t size) override {
    std::lock_guard<std::mutex> l(mu_);
    allocator_->freeToPool(ptr, size);
  }

void ensureAvailable(int32_t bytes, Position& position) override {
std::lock_guard<std::mutex> l(mu_);
allocator_->ensureAvailable(bytes, position);
}

  Position newWrite(
      ByteOutputStream& stream,
      int32_t preferredSize = kMinContiguous) override {
std::lock_guard<std::mutex> l(mu_);
return allocator_->newWrite(stream, preferredSize);
}

void extendWrite(Position position, ByteOutputStream& stream) override {
std::lock_guard<std::mutex> l(mu_);
allocator_->extendWrite(position, stream);
}

std::pair<Position, Position> finishWrite(
ByteOutputStream& stream,
int32_t numReserveBytes) override {
std::lock_guard<std::mutex> l(mu_);
return allocator_->finishWrite(stream, numReserveBytes);
}

void newRange(int32_t bytes, ByteRange* lastRange, ByteRange* range)
override {
std::lock_guard<std::mutex> l(mu_);
allocator_->newRange(bytes, lastRange, range);
}

void newContiguousRange(int32_t bytes, ByteRange* range) override {
std::lock_guard<std::mutex> l(mu_);
allocator_->newContiguousRange(bytes, range);
}

  void free(Header* header) override {
    std::lock_guard<std::mutex> l(mu_);
    allocator_->free(header);
  }

int64_t retainedSize() const override {
std::lock_guard<std::mutex> l(mu_);
return allocator_->retainedSize();
}

uint64_t freeSpace() const override {
std::lock_guard<std::mutex> l(mu_);
return allocator_->freeSpace();
}

void clear() override {
std::lock_guard<std::mutex> l(mu_);
allocator_->clear();
}

uint64_t currentBytes() const override {
std::lock_guard<std::mutex> l(mu_);
return allocator_->currentBytes();
}

int64_t checkConsistency() const override {
std::lock_guard<std::mutex> l(mu_);
return allocator_->checkConsistency();
}

void checkEmpty() const override {
std::lock_guard<std::mutex> l(mu_);
allocator_->checkEmpty();
}

std::string toString() const override {
std::lock_guard<std::mutex> l(mu_);
return allocator_->toString();
}

private:
HashStringAllocator* const allocator_;
mutable std::mutex mu_;
};
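
/// Note: the wrapper is a decorator over an existing allocator. It derives
/// from HashStringAllocator only to reuse the virtual interface made
/// overridable above, and every override takes the single 'mu_' before
/// forwarding to 'allocator_', trading per-call lock overhead for safe
/// sharing across threads.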

/// An allocator backed by HashStringAllocator that guarantees a configurable
/// alignment. The alignment must be a power of 2 and not be 0. This allocator
/// can be used with folly F14 containers that require 16-byte alignment.
2 changes: 1 addition & 1 deletion velox/common/memory/MemoryPool.cpp
@@ -456,7 +456,7 @@ MemoryPoolImpl::~MemoryPoolImpl() {
kMetricMemoryPoolReservationLeakBytes, minReservationBytes_);
}
}
VELOX_DCHECK(
VELOX_CHECK(
(usedReservationBytes_ == 0) && (reservationBytes_ == 0) &&
(minReservationBytes_ == 0),
"Bad memory usage track state: {}",
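
This hunk promotes the destructor's reservation-state check from VELOX_DCHECK to VELOX_CHECK. Assuming the usual convention that the DCHECK-style macros compile out of release builds, the practical difference is:

  VELOX_DCHECK(cond, "msg");  // enforced in debug builds only
  VELOX_CHECK(cond, "msg");   // enforced in every build, so a leaked
                              // reservation now fails fast in production
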
2 changes: 2 additions & 0 deletions velox/common/memory/tests/HashStringAllocatorTest.cpp
@@ -178,6 +178,8 @@ TEST_F(HashStringAllocatorTest, headerToString) {
}

TEST_F(HashStringAllocatorTest, allocate) {
  auto* buffer = allocator_->allocate(8192);
  ASSERT_NE(buffer, nullptr);
  allocator_->free(buffer);
for (auto count = 0; count < 3; ++count) {
std::vector<HSA::Header*> headers;
for (auto i = 0; i < 10'000; ++i) {
