Skip to content

Commit

Permalink
Use ArbitratorTestUtil::newQueryCtx in SharedArbitrationTest (faceboo…
Browse files Browse the repository at this point in the history
…kincubator#8664)

Summary:
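Use ArbitratorTestUtil::newQueryCtx in SharedArbitrationTest instead of the
fixture's own newQueryCtx helper. Also add a VectorTestBase::makeRowVector
overload that builds a fuzzed row vector from VectorFuzzer::Options, and use
it in SharedArbitrationTest in place of the local newVector helper.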

Pull Request resolved: facebookincubator#8664

Reviewed By: bikramSingh91

Differential Revision: D54301770

Pulled By: xiaoxmeng

fbshipit-source-id: 15775647c1bfe549cbba77655b3a0b8b5332793b
  • Loading branch information
duanmeng authored and facebook-github-bot committed Feb 28, 2024
1 parent 490aa4c commit 6a68ddd
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 61 deletions.
95 changes: 36 additions & 59 deletions velox/exec/tests/SharedArbitratorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include <gtest/gtest.h>

#include <folly/Singleton.h>
#include <re2/re2.h>
#include <deque>

Expand All @@ -27,24 +26,17 @@
#include "velox/common/base/Exceptions.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/memory/MallocAllocator.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/core/PlanNode.h"
#include "velox/dwio/dwrf/writer/Writer.h"
#include "velox/exec/Driver.h"
#include "velox/exec/HashBuild.h"
#include "velox/exec/HashJoinBridge.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/SharedArbitrator.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/Values.h"
#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/HiveConnectorTestBase.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/vector/fuzzer/VectorFuzzer.h"

DECLARE_bool(velox_memory_leak_check_enabled);
DECLARE_bool(velox_suppress_memory_capacity_exceeding_error_message);
Expand Down Expand Up @@ -257,8 +249,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
fuzzerOpts_.stringVariableLength = false;
fuzzerOpts_.stringLength = 1024;
fuzzerOpts_.allowLazyVector = false;
VectorFuzzer fuzzer(fuzzerOpts_, pool());
vector_ = newVector();
vector_ = makeRowVector(rowType_, fuzzerOpts_);
executor_ = std::make_unique<folly::CPUThreadPoolExecutor>(32);
numAddedPools_ = 0;
}
Expand Down Expand Up @@ -289,58 +280,35 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase {
numAddedPools_ = 0;
}

// Produces one fuzzed row vector of 'rowType_'. A fresh VectorFuzzer is built
// on every call from the fixture's current 'fuzzerOpts_' and pool().
RowVectorPtr newVector() {
  return VectorFuzzer(fuzzerOpts_, pool()).fuzzRow(rowType_);
}

// Creates a query context backed by a new root pool added to
// 'memoryManager_'.
//
// @param memoryCapacity Capacity passed to addRootPool() for the new pool.
// @param reclaimer Reclaimer to attach to the root pool. When null,
// MemoryReclaimer::create() is used instead.
// @return The new query context. 'numAddedPools_' is incremented once per
// call.
std::shared_ptr<core::QueryCtx> newQueryCtx(
    int64_t memoryCapacity = kMaxMemory,
    std::unique_ptr<MemoryReclaimer>&& reclaimer = nullptr) {
  std::unordered_map<std::string, std::shared_ptr<Config>> emptyConfigs;
  // Pick the caller's reclaimer if one was supplied, otherwise the default.
  auto poolReclaimer = reclaimer != nullptr ? std::move(reclaimer)
                                            : MemoryReclaimer::create();
  std::shared_ptr<MemoryPool> rootPool = memoryManager_->addRootPool(
      "", memoryCapacity, std::move(poolReclaimer));
  auto ctx = std::make_shared<core::QueryCtx>(
      executor_.get(),
      core::QueryConfig({}),
      emptyConfigs,
      cache::AsyncDataCache::getInstance(),
      std::move(rootPool));
  // Record the root pool this call added.
  ++numAddedPools_;
  return ctx;
}

static inline FakeMemoryOperatorFactory* fakeOperatorFactory_;
std::unique_ptr<MemoryManager> memoryManager_;
SharedArbitrator* arbitrator_;
RowTypePtr rowType_;
VectorFuzzer::Options fuzzerOpts_;
RowVectorPtr vector_;
std::unique_ptr<folly::CPUThreadPoolExecutor> executor_;
std::atomic_uint64_t numAddedPools_{0};
};

DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
createDuckDbTable(vectors);
std::vector<bool> sameQueries = {false, true};
for (bool sameQuery : sameQueries) {
SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery));
const auto oldStats = arbitrator_->stats();
std::shared_ptr<core::QueryCtx> fakeMemoryQueryCtx =
newQueryCtx(kMemoryCapacity);
newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
++numAddedPools_;
std::shared_ptr<core::QueryCtx> orderByQueryCtx;
if (sameQuery) {
orderByQueryCtx = fakeMemoryQueryCtx;
} else {
orderByQueryCtx = newQueryCtx(kMemoryCapacity);
orderByQueryCtx = newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
++numAddedPools_;
}

folly::EventCount orderByWait;
Expand Down Expand Up @@ -418,20 +386,23 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
createDuckDbTable(vectors);
std::vector<bool> sameQueries = {false, true};
for (bool sameQuery : sameQueries) {
SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery));
const auto oldStats = arbitrator_->stats();
std::shared_ptr<core::QueryCtx> fakeMemoryQueryCtx =
newQueryCtx(kMemoryCapacity);
newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
++numAddedPools_;
std::shared_ptr<core::QueryCtx> aggregationQueryCtx;
if (sameQuery) {
aggregationQueryCtx = fakeMemoryQueryCtx;
} else {
aggregationQueryCtx = newQueryCtx(kMemoryCapacity);
aggregationQueryCtx =
newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
++numAddedPools_;
}

folly::EventCount aggregationWait;
Expand Down Expand Up @@ -511,20 +482,22 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) {
const int numVectors = 32;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
createDuckDbTable(vectors);
std::vector<bool> sameQueries = {false, true};
for (bool sameQuery : sameQueries) {
SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery));
const auto oldStats = arbitrator_->stats();
std::shared_ptr<core::QueryCtx> fakeMemoryQueryCtx =
newQueryCtx(kMemoryCapacity);
newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
++numAddedPools_;
std::shared_ptr<core::QueryCtx> joinQueryCtx;
if (sameQuery) {
joinQueryCtx = fakeMemoryQueryCtx;
} else {
joinQueryCtx = newQueryCtx(kMemoryCapacity);
joinQueryCtx = newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
++numAddedPools_;
}

folly::EventCount joinWait;
Expand Down Expand Up @@ -617,7 +590,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) {
const int vectorSize = 100;
fuzzerOpts_.vectorSize = vectorSize;
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
const int expectedResultVectorSize = numVectors * vectorSize;
const auto expectedVector = makeRowVector(
Expand All @@ -629,7 +602,8 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) {

createDuckDbTable(vectors);
setupMemory(kMemoryCapacity, 0);
std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(kMemoryCapacity);
std::shared_ptr<core::QueryCtx> queryCtx =
newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
ASSERT_EQ(queryCtx->pool()->capacity(), 0);
ASSERT_EQ(queryCtx->pool()->maxCapacity(), kMemoryCapacity);

Expand All @@ -653,11 +627,12 @@ DEBUG_ONLY_TEST_F(
const int numVectors = 10;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
createDuckDbTable(vectors);

std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(kMemoryCapacity);
std::shared_ptr<core::QueryCtx> queryCtx =
newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
ASSERT_EQ(queryCtx->pool()->capacity(), 0);

// Allocate a large chunk of memory to trigger memory reclaim during the query
Expand Down Expand Up @@ -747,15 +722,15 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenMaybeReserveAndTaskAbort) {
const int numVectors = 10;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
createDuckDbTable(vectors);

auto queryCtx = newQueryCtx(kMemoryCapacity);
auto queryCtx = newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
ASSERT_EQ(queryCtx->pool()->capacity(), 0);

// Create a fake query to hold some memory to trigger memory arbitration.
auto fakeQueryCtx = newQueryCtx(kMemoryCapacity);
auto fakeQueryCtx = newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
auto fakeLeafPool = fakeQueryCtx->pool()->addLeafChild(
"fakeLeaf", true, FakeMemoryReclaimer::create());
TestAllocation fakeAllocation{
Expand Down Expand Up @@ -813,10 +788,11 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) {
const int numVectors = 10;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
createDuckDbTable(vectors);
std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(kMemoryCapacity);
std::shared_ptr<core::QueryCtx> queryCtx =
newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
ASSERT_EQ(queryCtx->pool()->capacity(), 0);

folly::EventCount aggregationAllocationWait;
Expand Down Expand Up @@ -914,7 +890,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, runtimeStats) {

const auto spillDirectory = exec::test::TempDirectoryPath::create();
const auto outputDirectory = TempDirectoryPath::create();
const auto queryCtx = newQueryCtx(memoryCapacity);
const auto queryCtx = newQueryCtx(memoryManager_, executor_, memoryCapacity);
auto writerPlan =
PlanBuilder()
.values(vectors)
Expand Down Expand Up @@ -965,13 +941,14 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) {
const int numVectors = 10;
std::vector<RowVectorPtr> vectors;
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
createDuckDbTable(vectors);

for (bool sameDriver : {false, true}) {
SCOPED_TRACE(fmt::format("sameDriver {}", sameDriver));
std::shared_ptr<core::QueryCtx> queryCtx = newQueryCtx(kMemoryCapacity);
std::shared_ptr<core::QueryCtx> queryCtx =
newQueryCtx(memoryManager_, executor_, kMemoryCapacity);
ASSERT_EQ(queryCtx->pool()->capacity(), 0);

std::atomic<bool> injectAllocationOnce{true};
Expand Down Expand Up @@ -1064,7 +1041,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) {
fuzzerOpts_.stringLength = 32;
vectors.reserve(numVectors);
for (int i = 0; i < numVectors; ++i) {
vectors.push_back(newVector());
vectors.push_back(makeRowVector(rowType_, fuzzerOpts_));
}
const int numDrivers = 4;
const auto expectedWriteResult =
Expand Down Expand Up @@ -1111,7 +1088,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) {
queryThreads.emplace_back([&, i]() {
std::shared_ptr<Task> task;
try {
auto queryCtx = newQueryCtx(queryCapacity);
auto queryCtx = newQueryCtx(memoryManager_, executor_, queryCapacity);
if (i == 0) {
// Write task contains aggregate node, which does not support
// multithread aggregation type resolver, so make sure it is built
Expand Down Expand Up @@ -1205,7 +1182,7 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) {
{
std::lock_guard<std::mutex> l(mutex);
auto oldNum = arbitrator_->stats().numReserves;
queries.emplace_back(newQueryCtx());
queries.emplace_back(newQueryCtx(memoryManager_, executor_));
ASSERT_EQ(arbitrator_->stats().numReserves, oldNum + 1);
}
});
Expand Down
2 changes: 0 additions & 2 deletions velox/exec/tests/utils/ArbitratorTestUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/

#include "velox/exec/tests/utils/ArbitratorTestUtil.h"
#include "velox/common/memory/Memory.h"
#include "velox/core/QueryCtx.h"
#include "velox/exec/TableWriter.h"

using namespace facebook::velox;
Expand Down
7 changes: 7 additions & 0 deletions velox/vector/tests/utils/VectorTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,13 @@ class VectorTestBase {
return vectorMaker_.rowVector(rowType, size);
}

// Returns a row vector of 'type' generated by a VectorFuzzer that is
// constructed from 'fuzzerOpts' and this fixture's pool().
RowVectorPtr makeRowVector(
    const RowTypePtr& type,
    const VectorFuzzer::Options& fuzzerOpts) {
  return VectorFuzzer(fuzzerOpts, pool()).fuzzRow(type);
}

std::vector<RowVectorPtr> createVectors(
const RowTypePtr& type,
uint64_t byteSize,
Expand Down

0 comments on commit 6a68ddd

Please sign in to comment.