From 6a68ddd9254dfe4bcfd09e38e46a3ce4f6f5508f Mon Sep 17 00:00:00 2001 From: duanmeng Date: Wed, 28 Feb 2024 11:32:04 -0800 Subject: [PATCH] Use ArbitratorTestUtil::newQueryCtx in SharedArbitrationTest (#8664) Summary: Use ArbitratorTestUtil::newQueryCtx in SharedArbitrationTest, add a VectorTestBase::makeRowVector API and use it in SharedArbitrationTest as well. Pull Request resolved: https://github.com/facebookincubator/velox/pull/8664 Reviewed By: bikramSingh91 Differential Revision: D54301770 Pulled By: xiaoxmeng fbshipit-source-id: 15775647c1bfe549cbba77655b3a0b8b5332793b --- velox/exec/tests/SharedArbitratorTest.cpp | 95 +++++++------------ velox/exec/tests/utils/ArbitratorTestUtil.cpp | 2 - velox/vector/tests/utils/VectorTestBase.h | 7 ++ 3 files changed, 43 insertions(+), 61 deletions(-) diff --git a/velox/exec/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp index 51e6f85d9f16..3dd632f48dd8 100644 --- a/velox/exec/tests/SharedArbitratorTest.cpp +++ b/velox/exec/tests/SharedArbitratorTest.cpp @@ -16,7 +16,6 @@ #include -#include #include #include @@ -27,24 +26,17 @@ #include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/memory/MallocAllocator.h" -#include "velox/common/memory/Memory.h" #include "velox/common/testutil/TestValue.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/core/PlanNode.h" #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/Driver.h" #include "velox/exec/HashBuild.h" -#include "velox/exec/HashJoinBridge.h" -#include "velox/exec/PlanNodeStats.h" #include "velox/exec/SharedArbitrator.h" #include "velox/exec/TableWriter.h" #include "velox/exec/Values.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" -#include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" -#include "velox/exec/tests/utils/PlanBuilder.h" -#include "velox/exec/tests/utils/TempDirectoryPath.h" -#include "velox/vector/fuzzer/VectorFuzzer.h" DECLARE_bool(velox_memory_leak_check_enabled); DECLARE_bool(velox_suppress_memory_capacity_exceeding_error_message); @@ -257,8 +249,7 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { fuzzerOpts_.stringVariableLength = false; fuzzerOpts_.stringLength = 1024; fuzzerOpts_.allowLazyVector = false; - VectorFuzzer fuzzer(fuzzerOpts_, pool()); - vector_ = newVector(); + vector_ = makeRowVector(rowType_, fuzzerOpts_); executor_ = std::make_unique(32); numAddedPools_ = 0; } @@ -289,37 +280,12 @@ class SharedArbitrationTest : public exec::test::HiveConnectorTestBase { numAddedPools_ = 0; } - RowVectorPtr newVector() { - VectorFuzzer fuzzer(fuzzerOpts_, pool()); - return fuzzer.fuzzRow(rowType_); - } - - std::shared_ptr newQueryCtx( - int64_t memoryCapacity = kMaxMemory, - std::unique_ptr&& reclaimer = nullptr) { - std::unordered_map> configs; - std::shared_ptr pool = memoryManager_->addRootPool( - "", - memoryCapacity, - reclaimer != nullptr ? std::move(reclaimer) - : MemoryReclaimer::create()); - auto queryCtx = std::make_shared( - executor_.get(), - core::QueryConfig({}), - configs, - cache::AsyncDataCache::getInstance(), - std::move(pool)); - ++numAddedPools_; - return queryCtx; - } - static inline FakeMemoryOperatorFactory* fakeOperatorFactory_; std::unique_ptr memoryManager_; SharedArbitrator* arbitrator_; RowTypePtr rowType_; VectorFuzzer::Options fuzzerOpts_; RowVectorPtr vector_; - std::unique_ptr executor_; std::atomic_uint64_t numAddedPools_{0}; }; @@ -327,7 +293,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } createDuckDbTable(vectors); std::vector sameQueries = {false, true}; @@ -335,12 +301,14 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToOrderBy) { SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); const auto oldStats = arbitrator_->stats(); std::shared_ptr fakeMemoryQueryCtx = - newQueryCtx(kMemoryCapacity); + newQueryCtx(memoryManager_, executor_, kMemoryCapacity); + ++numAddedPools_; std::shared_ptr orderByQueryCtx; if (sameQuery) { orderByQueryCtx = fakeMemoryQueryCtx; } else { - orderByQueryCtx = newQueryCtx(kMemoryCapacity); + orderByQueryCtx = newQueryCtx(memoryManager_, executor_, kMemoryCapacity); + ++numAddedPools_; } folly::EventCount orderByWait; @@ -418,7 +386,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } createDuckDbTable(vectors); std::vector sameQueries = {false, true}; @@ -426,12 +394,15 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToAggregation) { SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); const auto oldStats = arbitrator_->stats(); std::shared_ptr fakeMemoryQueryCtx = - newQueryCtx(kMemoryCapacity); + newQueryCtx(memoryManager_, executor_, kMemoryCapacity); + ++numAddedPools_; std::shared_ptr aggregationQueryCtx; if (sameQuery) { aggregationQueryCtx = fakeMemoryQueryCtx; } else { - aggregationQueryCtx = newQueryCtx(kMemoryCapacity); + aggregationQueryCtx = + newQueryCtx(memoryManager_, executor_, kMemoryCapacity); + ++numAddedPools_; } folly::EventCount aggregationWait; @@ -511,7 +482,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { const int numVectors = 32; std::vector vectors; for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } createDuckDbTable(vectors); std::vector sameQueries = {false, true}; @@ -519,12 +490,14 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, reclaimToJoinBuilder) { SCOPED_TRACE(fmt::format("sameQuery {}", sameQuery)); const auto oldStats = arbitrator_->stats(); std::shared_ptr fakeMemoryQueryCtx = - newQueryCtx(kMemoryCapacity); + newQueryCtx(memoryManager_, executor_, kMemoryCapacity); + ++numAddedPools_; std::shared_ptr joinQueryCtx; if (sameQuery) { joinQueryCtx = fakeMemoryQueryCtx; } else { - joinQueryCtx = newQueryCtx(kMemoryCapacity); + joinQueryCtx = newQueryCtx(memoryManager_, executor_, kMemoryCapacity); + ++numAddedPools_; } folly::EventCount joinWait; @@ -617,7 +590,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) { const int vectorSize = 100; fuzzerOpts_.vectorSize = vectorSize; for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } const int expectedResultVectorSize = numVectors * vectorSize; const auto expectedVector = makeRowVector( @@ -629,7 +602,8 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, driverInitTriggeredArbitration) { createDuckDbTable(vectors); setupMemory(kMemoryCapacity, 0); - std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); + std::shared_ptr queryCtx = + newQueryCtx(memoryManager_, executor_, kMemoryCapacity); ASSERT_EQ(queryCtx->pool()->capacity(), 0); ASSERT_EQ(queryCtx->pool()->maxCapacity(), kMemoryCapacity); @@ -653,11 +627,12 @@ DEBUG_ONLY_TEST_F( const int numVectors = 10; std::vector vectors; for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } createDuckDbTable(vectors); - std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); + std::shared_ptr queryCtx = + newQueryCtx(memoryManager_, executor_, kMemoryCapacity); ASSERT_EQ(queryCtx->pool()->capacity(), 0); // Allocate a large chunk of memory to trigger memory reclaim during the query @@ -747,15 +722,15 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, raceBetweenMaybeReserveAndTaskAbort) { const int numVectors = 10; std::vector vectors; for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } createDuckDbTable(vectors); - auto queryCtx = newQueryCtx(kMemoryCapacity); + auto queryCtx = newQueryCtx(memoryManager_, executor_, kMemoryCapacity); ASSERT_EQ(queryCtx->pool()->capacity(), 0); // Create a fake query to hold some memory to trigger memory arbitration. - auto fakeQueryCtx = newQueryCtx(kMemoryCapacity); + auto fakeQueryCtx = newQueryCtx(memoryManager_, executor_, kMemoryCapacity); auto fakeLeafPool = fakeQueryCtx->pool()->addLeafChild( "fakeLeaf", true, FakeMemoryReclaimer::create()); TestAllocation fakeAllocation{ @@ -813,10 +788,11 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, asyncArbitratonFromNonDriverContext) { const int numVectors = 10; std::vector vectors; for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } createDuckDbTable(vectors); - std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); + std::shared_ptr queryCtx = + newQueryCtx(memoryManager_, executor_, kMemoryCapacity); ASSERT_EQ(queryCtx->pool()->capacity(), 0); folly::EventCount aggregationAllocationWait; @@ -914,7 +890,7 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, runtimeStats) { const auto spillDirectory = exec::test::TempDirectoryPath::create(); const auto outputDirectory = TempDirectoryPath::create(); - const auto queryCtx = newQueryCtx(memoryCapacity); + const auto queryCtx = newQueryCtx(memoryManager_, executor_, memoryCapacity); auto writerPlan = PlanBuilder() .values(vectors) @@ -965,13 +941,14 @@ DEBUG_ONLY_TEST_F(SharedArbitrationTest, arbitrateMemoryFromOtherOperator) { const int numVectors = 10; std::vector vectors; for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } createDuckDbTable(vectors); for (bool sameDriver : {false, true}) { SCOPED_TRACE(fmt::format("sameDriver {}", sameDriver)); - std::shared_ptr queryCtx = newQueryCtx(kMemoryCapacity); + std::shared_ptr queryCtx = + newQueryCtx(memoryManager_, executor_, kMemoryCapacity); ASSERT_EQ(queryCtx->pool()->capacity(), 0); std::atomic injectAllocationOnce{true}; @@ -1064,7 +1041,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { fuzzerOpts_.stringLength = 32; vectors.reserve(numVectors); for (int i = 0; i < numVectors; ++i) { - vectors.push_back(newVector()); + vectors.push_back(makeRowVector(rowType_, fuzzerOpts_)); } const int numDrivers = 4; const auto expectedWriteResult = @@ -1111,7 +1088,7 @@ TEST_F(SharedArbitrationTest, concurrentArbitration) { queryThreads.emplace_back([&, i]() { std::shared_ptr task; try { - auto queryCtx = newQueryCtx(queryCapacity); + auto queryCtx = newQueryCtx(memoryManager_, executor_, queryCapacity); if (i == 0) { // Write task contains aggregate node, which does not support // multithread aggregation type resolver, so make sure it is built @@ -1205,7 +1182,7 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { { std::lock_guard l(mutex); auto oldNum = arbitrator_->stats().numReserves; - queries.emplace_back(newQueryCtx()); + queries.emplace_back(newQueryCtx(memoryManager_, executor_)); ASSERT_EQ(arbitrator_->stats().numReserves, oldNum + 1); } }); diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp index 62761c9f33d8..61558df69b4c 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp +++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp @@ -15,8 +15,6 @@ */ #include "velox/exec/tests/utils/ArbitratorTestUtil.h" -#include "velox/common/memory/Memory.h" -#include "velox/core/QueryCtx.h" #include "velox/exec/TableWriter.h" using namespace facebook::velox; diff --git a/velox/vector/tests/utils/VectorTestBase.h b/velox/vector/tests/utils/VectorTestBase.h index bb443bf8b604..11df58621bf9 100644 --- a/velox/vector/tests/utils/VectorTestBase.h +++ b/velox/vector/tests/utils/VectorTestBase.h @@ -129,6 +129,13 @@ class VectorTestBase { return vectorMaker_.rowVector(rowType, size); } + RowVectorPtr makeRowVector( + const RowTypePtr& type, + const VectorFuzzer::Options& fuzzerOpts) { + VectorFuzzer fuzzer(fuzzerOpts, pool()); + return fuzzer.fuzzRow(type); + } + std::vector createVectors( const RowTypePtr& type, uint64_t byteSize,