From a3cfe7c6927877e85e6670f47202c8132362431f Mon Sep 17 00:00:00 2001 From: PHILO-HE Date: Thu, 7 Mar 2024 20:47:06 +0800 Subject: [PATCH] Revert "Move SharedArbitrator back to memory namespace for Alpha dependency (#8954)" This reverts commit 07a151827964e58fe45d26297045f3b73cf234a9. --- velox/common/memory/CMakeLists.txt | 1 - velox/common/memory/SharedArbitrator.h | 212 ------------------ velox/common/memory/tests/CMakeLists.txt | 2 - .../memory/tests/MemoryArbitratorTest.cpp | 8 +- .../common/memory/tests/MemoryManagerTest.cpp | 4 +- velox/common/memory/tests/MemoryPoolTest.cpp | 4 +- .../memory/tests/MockSharedArbitratorTest.cpp | 4 +- velox/exec/CMakeLists.txt | 1 + .../memory => exec}/SharedArbitrator.cpp | 6 +- velox/exec/SharedArbitrator.h | 197 +++++++++++++++- velox/exec/tests/CMakeLists.txt | 1 + .../tests/SharedArbitratorTest.cpp | 13 +- velox/exec/tests/utils/OperatorTestBase.cpp | 10 +- 13 files changed, 214 insertions(+), 249 deletions(-) delete mode 100644 velox/common/memory/SharedArbitrator.h rename velox/{common/memory => exec}/SharedArbitrator.cpp (99%) rename velox/{common/memory => exec}/tests/SharedArbitratorTest.cpp (99%) diff --git a/velox/common/memory/CMakeLists.txt b/velox/common/memory/CMakeLists.txt index dc0e618d93f1..0080d298b595 100644 --- a/velox/common/memory/CMakeLists.txt +++ b/velox/common/memory/CMakeLists.txt @@ -28,7 +28,6 @@ add_library( MemoryPool.cpp MmapAllocator.cpp MmapArena.cpp - SharedArbitrator.cpp StreamArena.cpp) target_link_libraries( diff --git a/velox/common/memory/SharedArbitrator.h b/velox/common/memory/SharedArbitrator.h deleted file mode 100644 index 40c2e85e4fb4..000000000000 --- a/velox/common/memory/SharedArbitrator.h +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Copyright (c) Facebook, Inc. and its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#pragma once - -#include "velox/common/memory/MemoryArbitrator.h" - -#include "velox/common/future/VeloxPromise.h" -#include "velox/common/memory/Memory.h" - -namespace facebook::velox::memory { - -/// Used to achieve dynamic memory sharing among running queries. When a -/// memory pool exceeds its current memory capacity, the arbitrator tries to -/// grow its capacity by reclaim the overused memory from the query with -/// more memory usage. We can configure memory arbitrator the way to reclaim -/// memory. For Prestissimo, we can configure it to reclaim memory by -/// aborting a query. For Prestissimo-on-Spark, we can configure it to -/// reclaim from a running query through techniques such as disk-spilling, -/// partial aggregation or persistent shuffle data flushes. -class SharedArbitrator : public memory::MemoryArbitrator { - public: - explicit SharedArbitrator(const Config& config); - - ~SharedArbitrator() override; - - static void registerFactory(); - - static void unregisterFactory(); - - uint64_t growCapacity(MemoryPool* pool, uint64_t targetBytes) final; - - bool growCapacity( - MemoryPool* pool, - const std::vector>& candidatePools, - uint64_t targetBytes) final; - - uint64_t shrinkCapacity(MemoryPool* pool, uint64_t freedBytes) final; - - uint64_t shrinkCapacity( - const std::vector>& pools, - uint64_t targetBytes) override final; - - Stats stats() const final; - - std::string kind() const override; - - std::string toString() const final; - - // The candidate memory pool stats used by arbitration. - struct Candidate { - bool reclaimable{false}; - uint64_t reclaimableBytes{0}; - uint64_t freeBytes{0}; - MemoryPool* pool; - - std::string toString() const; - }; - - private: - // The kind string of shared arbitrator. - inline static const std::string kind_{"SHARED"}; - - class ScopedArbitration { - public: - // Used by arbitration request NOT initiated from memory pool, e.g. through - // shrinkPools() API. - explicit ScopedArbitration(SharedArbitrator* arbitrator); - - // Used by arbitration request initiated from a memory pool. - explicit ScopedArbitration( - MemoryPool* requestor, - SharedArbitrator* arbitrator); - - ~ScopedArbitration(); - - private: - MemoryPool* const requestor_; - SharedArbitrator* const arbitrator_; - const std::chrono::steady_clock::time_point startTime_; - const ScopedMemoryArbitrationContext arbitrationCtx_; - }; - - // Invoked to check if the memory growth will exceed the memory pool's max - // capacity limit or the arbitrator's node capacity limit. - bool checkCapacityGrowth(const MemoryPool& pool, uint64_t targetBytes) const; - - // Invoked to ensure the memory growth request won't exceed the requestor's - // max capacity as well as the arbitrator's node capacity. If it does, then we - // first need to reclaim the used memory from the requestor itself to ensure - // the memory growth won't exceed the capacity limit, and then proceed with - // the memory arbitration process. The reclaimed memory capacity returns to - // the arbitrator, and let the memory arbitration process to grow the - // requestor capacity accordingly. - bool ensureCapacity(MemoryPool* requestor, uint64_t targetBytes); - - // Invoked to capture the candidate memory pools stats for arbitration. - static std::vector getCandidateStats( - const std::vector>& pools); - - void sortCandidatesByReclaimableMemory( - std::vector& candidates) const; - - void sortCandidatesByFreeCapacity(std::vector& candidates) const; - - // Finds the candidate with the largest capacity. For 'requestor', the - // capacity for comparison including its current capacity and the capacity to - // grow. - const Candidate& findCandidateWithLargestCapacity( - MemoryPool* requestor, - uint64_t targetBytes, - const std::vector& candidates) const; - - bool arbitrateMemory( - MemoryPool* requestor, - std::vector& candidates, - uint64_t targetBytes); - - // Invoked to start next memory arbitration request, and it will wait for the - // serialized execution if there is a running or other waiting arbitration - // requests. - void startArbitration(const std::string& contextMsg); - - // Invoked by a finished memory arbitration request to kick off the next - // arbitration request execution if there are any ones waiting. - void finishArbitration(); - - // Invoked to reclaim free memory capacity from 'candidates' without actually - // freeing used memory. - // - // NOTE: the function might sort 'candidates' based on each candidate's free - // capacity internally. - uint64_t reclaimFreeMemoryFromCandidates( - std::vector& candidates, - uint64_t targetBytes); - - // Invoked to reclaim used memory capacity from 'candidates'. - // - // NOTE: the function might sort 'candidates' based on each candidate's - // reclaimable memory internally. - uint64_t reclaimUsedMemoryFromCandidates( - MemoryPool* requestor, - std::vector& candidates, - uint64_t targetBytes); - - // Invoked to reclaim used memory from 'pool' with specified 'targetBytes'. - // The function returns the actually freed capacity. - uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes) noexcept; - - // Invoked to abort memory 'pool'. - void abort(MemoryPool* pool, const std::exception_ptr& error); - - // Invoked to handle the memory arbitration failure to abort the memory pool - // with the largest capacity to free up memory. The function returns true on - // success and false if the requestor itself has been selected as the victim. - // We don't abort the requestor itself but just fails the arbitration to let - // the user decide to either proceed with the query or fail it. - bool handleOOM( - MemoryPool* requestor, - uint64_t targetBytes, - std::vector& candidates); - - // Decrement free capacity from the arbitrator with up to 'bytes'. The - // arbitrator might have less free available capacity. The function returns - // the actual decremented free capacity bytes. - uint64_t decrementFreeCapacity(uint64_t bytes); - uint64_t decrementFreeCapacityLocked(uint64_t bytes); - - // Increment free capacity by 'bytes'. - void incrementFreeCapacity(uint64_t bytes); - void incrementFreeCapacityLocked(uint64_t bytes); - - std::string toStringLocked() const; - - Stats statsLocked() const; - - mutable std::mutex mutex_; - uint64_t freeCapacity_{0}; - // Indicates if there is a running arbitration request or not. - bool running_{false}; - - // The promises of the arbitration requests waiting for the serialized - // execution. - std::vector waitPromises_; - - tsan_atomic numRequests_{0}; - std::atomic numSucceeded_{0}; - tsan_atomic numAborted_{0}; - tsan_atomic numFailures_{0}; - tsan_atomic queueTimeUs_{0}; - tsan_atomic arbitrationTimeUs_{0}; - tsan_atomic numShrunkBytes_{0}; - tsan_atomic numReclaimedBytes_{0}; - tsan_atomic reclaimTimeUs_{0}; - tsan_atomic numNonReclaimableAttempts_{0}; - tsan_atomic numReserves_{0}; - tsan_atomic numReleases_{0}; -}; -} // namespace facebook::velox::memory diff --git a/velox/common/memory/tests/CMakeLists.txt b/velox/common/memory/tests/CMakeLists.txt index b7af34457f83..c58b0dd86821 100644 --- a/velox/common/memory/tests/CMakeLists.txt +++ b/velox/common/memory/tests/CMakeLists.txt @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. include(GoogleTest) - add_executable( velox_memory_test AllocationTest.cpp @@ -26,7 +25,6 @@ add_executable( MemoryManagerTest.cpp MemoryPoolTest.cpp MockSharedArbitratorTest.cpp - SharedArbitratorTest.cpp StreamArenaTest.cpp) target_link_libraries( diff --git a/velox/common/memory/tests/MemoryArbitratorTest.cpp b/velox/common/memory/tests/MemoryArbitratorTest.cpp index 59668ae85b49..9ea3434dac9a 100644 --- a/velox/common/memory/tests/MemoryArbitratorTest.cpp +++ b/velox/common/memory/tests/MemoryArbitratorTest.cpp @@ -22,7 +22,7 @@ #include "velox/common/memory/MallocAllocator.h" #include "velox/common/memory/Memory.h" #include "velox/common/memory/MemoryArbitrator.h" -#include "velox/common/memory/SharedArbitrator.h" +#include "velox/exec/SharedArbitrator.h" using namespace ::testing; @@ -112,8 +112,8 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) { }); } { - // Reserved memory is e`nforced when SharedMemoryArbitrator is used. - SharedArbitrator::registerFactory(); + // Reserved memory is enforced when SharedMemoryArbitrator is used. + exec::SharedArbitrator::registerFactory(); MemoryManagerOptions options; options.allocatorCapacity = 8L << 20; options.arbitratorCapacity = 4L << 20; @@ -147,7 +147,7 @@ TEST_F(MemoryArbitrationTest, queryMemoryCapacity) { ASSERT_EQ(manager.arbitrator()->shrinkCapacity(leafPool.get(), 0), 2 << 20); ASSERT_EQ(rootPool->capacity(), 0); ASSERT_EQ(leafPool->capacity(), 0); - memory::SharedArbitrator::unregisterFactory(); + exec::SharedArbitrator::unregisterFactory(); } } diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index 522dd0a38d32..17d83ef9cfcb 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -23,7 +23,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/memory/MallocAllocator.h" #include "velox/common/memory/Memory.h" -#include "velox/common/memory/SharedArbitrator.h" +#include "velox/exec/SharedArbitrator.h" DECLARE_int32(velox_memory_num_shared_leaf_pools); DECLARE_bool(velox_enable_memory_usage_track_in_default_memory_pool); @@ -43,7 +43,7 @@ MemoryManager& toMemoryManager(MemoryManager& manager) { class MemoryManagerTest : public testing::Test { protected: static void SetUpTestCase() { - SharedArbitrator::registerFactory(); + exec::SharedArbitrator::registerFactory(); } inline static const std::string arbitratorKind_{"SHARED"}; diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp index ba3c99b42f71..6781e0c3f9bb 100644 --- a/velox/common/memory/tests/MemoryPoolTest.cpp +++ b/velox/common/memory/tests/MemoryPoolTest.cpp @@ -25,8 +25,8 @@ #include "velox/common/memory/Memory.h" #include "velox/common/memory/MemoryPool.h" #include "velox/common/memory/MmapAllocator.h" -#include "velox/common/memory/SharedArbitrator.h" #include "velox/common/testutil/TestValue.h" +#include "velox/exec/SharedArbitrator.h" DECLARE_bool(velox_memory_leak_check_enabled); DECLARE_bool(velox_memory_pool_debug_enabled); @@ -76,7 +76,7 @@ class MemoryPoolTest : public testing::TestWithParam { protected: static constexpr uint64_t kDefaultCapacity = 8 * GB; // 8GB static void SetUpTestCase() { - SharedArbitrator::registerFactory(); + exec::SharedArbitrator::registerFactory(); FLAGS_velox_memory_leak_check_enabled = true; TestValue::enable(); } diff --git a/velox/common/memory/tests/MockSharedArbitratorTest.cpp b/velox/common/memory/tests/MockSharedArbitratorTest.cpp index ccb19f4dfbd3..4cb675736618 100644 --- a/velox/common/memory/tests/MockSharedArbitratorTest.cpp +++ b/velox/common/memory/tests/MockSharedArbitratorTest.cpp @@ -26,8 +26,8 @@ #include "velox/common/memory/MallocAllocator.h" #include "velox/common/memory/Memory.h" #include "velox/common/memory/MemoryArbitrator.h" -#include "velox/common/memory/SharedArbitrator.h" #include "velox/common/testutil/TestValue.h" +#include "velox/exec/SharedArbitrator.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -390,7 +390,7 @@ MockTask::~MockTask() { class MockSharedArbitrationTest : public testing::Test { protected: static void SetUpTestCase() { - SharedArbitrator::registerFactory(); + exec::SharedArbitrator::registerFactory(); FLAGS_velox_memory_leak_check_enabled = true; TestValue::enable(); } diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index 69d6f7d70f19..898b152799cd 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -61,6 +61,7 @@ add_library( ProbeOperatorState.cpp RowContainer.cpp RowNumber.cpp + SharedArbitrator.cpp SortBuffer.cpp SortedAggregations.cpp SortWindowBuild.cpp diff --git a/velox/common/memory/SharedArbitrator.cpp b/velox/exec/SharedArbitrator.cpp similarity index 99% rename from velox/common/memory/SharedArbitrator.cpp rename to velox/exec/SharedArbitrator.cpp index 67dc7c1c0ef3..c859a611d7b5 100644 --- a/velox/common/memory/SharedArbitrator.cpp +++ b/velox/exec/SharedArbitrator.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "velox/common/memory/SharedArbitrator.h" +#include "velox/exec/SharedArbitrator.h" #include "velox/common/base/Counters.h" #include "velox/common/base/Exceptions.h" @@ -26,7 +26,7 @@ using facebook::velox::common::testutil::TestValue; -namespace facebook::velox::memory { +namespace facebook::velox::exec { namespace { @@ -708,4 +708,4 @@ void SharedArbitrator::registerFactory() { void SharedArbitrator::unregisterFactory() { MemoryArbitrator::unregisterFactory(kind_); } -} // namespace facebook::velox::memory +} // namespace facebook::velox::exec diff --git a/velox/exec/SharedArbitrator.h b/velox/exec/SharedArbitrator.h index ed64f7d28616..dcc5fe67a332 100644 --- a/velox/exec/SharedArbitrator.h +++ b/velox/exec/SharedArbitrator.h @@ -16,14 +16,199 @@ #pragma once -// TODO: remove this once after Prestissimo remove this reference. -#include "velox/common/memory/SharedArbitrator.h" +#include "velox/common/memory/MemoryArbitrator.h" + +#include "velox/common/future/VeloxPromise.h" +#include "velox/common/memory/Memory.h" + +using namespace facebook::velox::memory; namespace facebook::velox::exec { -class SharedArbitrator { + +/// Used to achieve dynamic memory sharing among running queries. When a +/// memory pool exceeds its current memory capacity, the arbitrator tries to +/// grow its capacity by reclaim the overused memory from the query with +/// more memory usage. We can configure memory arbitrator the way to reclaim +/// memory. For Prestissimo, we can configure it to reclaim memory by +/// aborting a query. For Prestissimo-on-Spark, we can configure it to +/// reclaim from a running query through techniques such as disk-spilling, +/// partial aggregation or persistent shuffle data flushes. +class SharedArbitrator : public memory::MemoryArbitrator { public: - static void registerFactory() { - memory::SharedArbitrator::registerFactory(); - } + explicit SharedArbitrator(const Config& config); + + ~SharedArbitrator() override; + + static void registerFactory(); + + static void unregisterFactory(); + + uint64_t growCapacity(MemoryPool* pool, uint64_t targetBytes) final; + + bool growCapacity( + MemoryPool* pool, + const std::vector>& candidatePools, + uint64_t targetBytes) final; + + uint64_t shrinkCapacity(MemoryPool* pool, uint64_t freedBytes) final; + + uint64_t shrinkCapacity( + const std::vector>& pools, + uint64_t targetBytes) override final; + + Stats stats() const final; + + std::string kind() const override; + + std::string toString() const final; + + // The candidate memory pool stats used by arbitration. + struct Candidate { + bool reclaimable{false}; + uint64_t reclaimableBytes{0}; + uint64_t freeBytes{0}; + MemoryPool* pool; + + std::string toString() const; + }; + + private: + // The kind string of shared arbitrator. + inline static const std::string kind_{"SHARED"}; + + class ScopedArbitration { + public: + // Used by arbitration request NOT initiated from memory pool, e.g. through + // shrinkPools() API. + explicit ScopedArbitration(SharedArbitrator* arbitrator); + + // Used by arbitration request initiated from a memory pool. + explicit ScopedArbitration( + MemoryPool* requestor, + SharedArbitrator* arbitrator); + + ~ScopedArbitration(); + + private: + MemoryPool* const requestor_; + SharedArbitrator* const arbitrator_; + const std::chrono::steady_clock::time_point startTime_; + const ScopedMemoryArbitrationContext arbitrationCtx_; + }; + + // Invoked to check if the memory growth will exceed the memory pool's max + // capacity limit or the arbitrator's node capacity limit. + bool checkCapacityGrowth(const MemoryPool& pool, uint64_t targetBytes) const; + + // Invoked to ensure the memory growth request won't exceed the requestor's + // max capacity as well as the arbitrator's node capacity. If it does, then we + // first need to reclaim the used memory from the requestor itself to ensure + // the memory growth won't exceed the capacity limit, and then proceed with + // the memory arbitration process. The reclaimed memory capacity returns to + // the arbitrator, and let the memory arbitration process to grow the + // requestor capacity accordingly. + bool ensureCapacity(MemoryPool* requestor, uint64_t targetBytes); + + // Invoked to capture the candidate memory pools stats for arbitration. + static std::vector getCandidateStats( + const std::vector>& pools); + + void sortCandidatesByReclaimableMemory( + std::vector& candidates) const; + + void sortCandidatesByFreeCapacity(std::vector& candidates) const; + + // Finds the candidate with the largest capacity. For 'requestor', the + // capacity for comparison including its current capacity and the capacity to + // grow. + const Candidate& findCandidateWithLargestCapacity( + MemoryPool* requestor, + uint64_t targetBytes, + const std::vector& candidates) const; + + bool arbitrateMemory( + MemoryPool* requestor, + std::vector& candidates, + uint64_t targetBytes); + + // Invoked to start next memory arbitration request, and it will wait for the + // serialized execution if there is a running or other waiting arbitration + // requests. + void startArbitration(const std::string& contextMsg); + + // Invoked by a finished memory arbitration request to kick off the next + // arbitration request execution if there are any ones waiting. + void finishArbitration(); + + // Invoked to reclaim free memory capacity from 'candidates' without actually + // freeing used memory. + // + // NOTE: the function might sort 'candidates' based on each candidate's free + // capacity internally. + uint64_t reclaimFreeMemoryFromCandidates( + std::vector& candidates, + uint64_t targetBytes); + + // Invoked to reclaim used memory capacity from 'candidates'. + // + // NOTE: the function might sort 'candidates' based on each candidate's + // reclaimable memory internally. + uint64_t reclaimUsedMemoryFromCandidates( + MemoryPool* requestor, + std::vector& candidates, + uint64_t targetBytes); + + // Invoked to reclaim used memory from 'pool' with specified 'targetBytes'. + // The function returns the actually freed capacity. + uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes) noexcept; + + // Invoked to abort memory 'pool'. + void abort(MemoryPool* pool, const std::exception_ptr& error); + + // Invoked to handle the memory arbitration failure to abort the memory pool + // with the largest capacity to free up memory. The function returns true on + // success and false if the requestor itself has been selected as the victim. + // We don't abort the requestor itself but just fails the arbitration to let + // the user decide to either proceed with the query or fail it. + bool handleOOM( + MemoryPool* requestor, + uint64_t targetBytes, + std::vector& candidates); + + // Decrement free capacity from the arbitrator with up to 'bytes'. The + // arbitrator might have less free available capacity. The function returns + // the actual decremented free capacity bytes. + uint64_t decrementFreeCapacity(uint64_t bytes); + uint64_t decrementFreeCapacityLocked(uint64_t bytes); + + // Increment free capacity by 'bytes'. + void incrementFreeCapacity(uint64_t bytes); + void incrementFreeCapacityLocked(uint64_t bytes); + + std::string toStringLocked() const; + + Stats statsLocked() const; + + mutable std::mutex mutex_; + uint64_t freeCapacity_{0}; + // Indicates if there is a running arbitration request or not. + bool running_{false}; + + // The promises of the arbitration requests waiting for the serialized + // execution. + std::vector waitPromises_; + + tsan_atomic numRequests_{0}; + std::atomic numSucceeded_{0}; + tsan_atomic numAborted_{0}; + tsan_atomic numFailures_{0}; + tsan_atomic queueTimeUs_{0}; + tsan_atomic arbitrationTimeUs_{0}; + tsan_atomic numShrunkBytes_{0}; + tsan_atomic numReclaimedBytes_{0}; + tsan_atomic reclaimTimeUs_{0}; + tsan_atomic numNonReclaimableAttempts_{0}; + tsan_atomic numReserves_{0}; + tsan_atomic numReleases_{0}; }; } // namespace facebook::velox::exec diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 09cc2af0175f..21f1b70609ef 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -64,6 +64,7 @@ add_executable( RowContainerTest.cpp RowNumberTest.cpp MarkDistinctTest.cpp + SharedArbitratorTest.cpp SpillTest.cpp SpillOperatorGroupTest.cpp SpillerTest.cpp diff --git a/velox/common/memory/tests/SharedArbitratorTest.cpp b/velox/exec/tests/SharedArbitratorTest.cpp similarity index 99% rename from velox/common/memory/tests/SharedArbitratorTest.cpp rename to velox/exec/tests/SharedArbitratorTest.cpp index 3489f0991e6d..311d836b01f1 100644 --- a/velox/common/memory/tests/SharedArbitratorTest.cpp +++ b/velox/exec/tests/SharedArbitratorTest.cpp @@ -19,20 +19,19 @@ #include #include -#include #include #include #include "folly/experimental/EventCount.h" #include "velox/common/base/Exceptions.h" #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/memory/MallocAllocator.h" -#include "velox/common/memory/SharedArbitrator.h" #include "velox/common/testutil/TestValue.h" #include "velox/connectors/hive/HiveConfig.h" #include "velox/core/PlanNode.h" #include "velox/dwio/dwrf/writer/Writer.h" #include "velox/exec/Driver.h" #include "velox/exec/HashBuild.h" +#include "velox/exec/SharedArbitrator.h" #include "velox/exec/TableWriter.h" #include "velox/exec/Values.h" #include "velox/exec/tests/utils/ArbitratorTestUtil.h" @@ -46,7 +45,7 @@ using namespace facebook::velox::common::testutil; using namespace facebook::velox::exec; using namespace facebook::velox::exec::test; -namespace facebook::velox::memory { +namespace facebook::velox::exec::test { // Custom node for the custom factory. class FakeMemoryNode : public core::PlanNode { public: @@ -1189,10 +1188,4 @@ TEST_F(SharedArbitrationTest, reserveReleaseCounters) { ASSERT_EQ(arbitrator_->stats().numReleases, numRootPools); } } -} // namespace facebook::velox::memory - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::Init init{&argc, &argv, false}; - return RUN_ALL_TESTS(); -} +} // namespace facebook::velox::exec::test diff --git a/velox/exec/tests/utils/OperatorTestBase.cpp b/velox/exec/tests/utils/OperatorTestBase.cpp index 4f1ae5c15238..ac19629621de 100644 --- a/velox/exec/tests/utils/OperatorTestBase.cpp +++ b/velox/exec/tests/utils/OperatorTestBase.cpp @@ -18,11 +18,12 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/file/FileSystems.h" #include "velox/common/memory/MallocAllocator.h" -#include "velox/common/memory/SharedArbitrator.h" #include "velox/common/testutil/TestValue.h" #include "velox/dwio/common/FileSink.h" #include "velox/exec/Exchange.h" #include "velox/exec/OutputBufferManager.h" +#include "velox/exec/SharedArbitrator.h" +#include "velox/exec/tests/utils/ArbitratorTestUtil.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" #include "velox/functions/prestosql/registration/RegistrationFunctions.h" @@ -58,7 +59,7 @@ OperatorTestBase::~OperatorTestBase() { void OperatorTestBase::SetUpTestCase() { FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; FLAGS_velox_memory_leak_check_enabled = true; - memory::SharedArbitrator::registerFactory(); + exec::SharedArbitrator::registerFactory(); memory::MemoryManagerOptions options; options.allocatorCapacity = 8L << 30; if (FLAGS_velox_testing_enable_arbitration) { @@ -68,8 +69,7 @@ void OperatorTestBase::SetUpTestCase() { options.arbitrationStateCheckCb = memoryArbitrationStateCheck; } memory::MemoryManager::testingSetInstance(options); - asyncDataCache_ = - cache::AsyncDataCache::create(memory::memoryManager()->allocator()); + asyncDataCache_ = cache::AsyncDataCache::create(memoryManager()->allocator()); cache::AsyncDataCache::setInstance(asyncDataCache_.get()); functions::prestosql::registerAllScalarFunctions(); aggregate::prestosql::registerAllAggregateFunctions(); @@ -79,7 +79,7 @@ void OperatorTestBase::SetUpTestCase() { void OperatorTestBase::TearDownTestCase() { asyncDataCache_->shutdown(); waitForAllTasksToBeDeleted(); - memory::SharedArbitrator::unregisterFactory(); + exec::SharedArbitrator::unregisterFactory(); } void OperatorTestBase::SetUp() {