From 9094bd4445f7edc33d00ec683507a56fb7d9ada6 Mon Sep 17 00:00:00 2001 From: Xiaoxuan Meng Date: Wed, 25 Sep 2024 14:11:05 -0700 Subject: [PATCH] Add separate writer threads config for bucketed table write to avoid max open file exceeded error (#11087) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11087 The bucketed table write could run into max open file exceeded errors. There are two problems here: (1) The GBM cluster only has 300 nodes, so the query runs on 300 nodes and the bucketed table writer has 4 driver threads per node. The remote exchange and the local exchange both use the same hive partition function, but the remote exchange has 300 partitions while the local exchange has 4 partitions, and 300 is a multiple of 4. This can cause data skew in which one driver thread receives all the data and exceeds the open file limit. Confirmed that both 9 and 7 writer threads work. (2) A native cluster is generally 1/2 of the java cluster size, so it more easily runs into the max open file limit, and we need to bump up the write driver threads for native clusters. This PR adds a separate config on the native side to bump up the bucketed table write driver threads to solve the problem. We don't want to bump up the partitioned writer threads, as a non-bucketed partitioned table won't have any open files (partitions) and we don't want to create too many small files unnecessarily. This PR only adds support on the Velox side: it extends the plan node with a bucket property flag that indicates whether this is a bucketed table write, and configures the table writer threads based on that flag in the local query planner. Followups will add support on the Prestissimo side. 
Reviewed By: tanjialiang Differential Revision: D63325016 --- velox/core/PlanNode.cpp | 3 + velox/core/PlanNode.h | 39 ++- velox/core/QueryConfig.h | 9 + velox/core/tests/QueryConfigTest.cpp | 45 ++- velox/docs/configs.rst | 4 + velox/exec/LocalPlanner.cpp | 4 +- velox/exec/tests/TableWriteTest.cpp | 362 ++++++++++++++++--------- velox/exec/tests/utils/PlanBuilder.cpp | 5 +- 8 files changed, 332 insertions(+), 139 deletions(-) diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index f1e56ed7cfd6..5a09581edcb5 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -1853,6 +1853,7 @@ folly::dynamic TableWriteNode::serialize() const { obj["connectorInsertTableHandle"] = insertTableHandle_->connectorInsertTableHandle()->serialize(); obj["hasPartitioningScheme"] = hasPartitioningScheme_; + obj["hasBucketProperty"] = hasBucketProperty_; obj["outputType"] = outputType_->serialize(); obj["commitStrategy"] = connector::commitStrategyToString(commitStrategy_); return obj; @@ -1875,6 +1876,7 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) { ISerializable::deserialize( obj["connectorInsertTableHandle"])); const bool hasPartitioningScheme = obj["hasPartitioningScheme"].asBool(); + const bool hasBucketProperty = obj["hasBucketProperty"].asBool(); auto outputType = deserializeRowType(obj["outputType"]); auto commitStrategy = connector::stringToCommitStrategy(obj["commitStrategy"].asString()); @@ -1887,6 +1889,7 @@ PlanNodePtr TableWriteNode::create(const folly::dynamic& obj, void* context) { std::make_shared( connectorId, connectorInsertTableHandle), hasPartitioningScheme, + hasBucketProperty, outputType, commitStrategy, source); diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index f3ff68e802fb..3600a86897ed 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -671,6 +671,7 @@ class TableWriteNode : public PlanNode { std::shared_ptr aggregationNode, std::shared_ptr insertTableHandle, bool 
hasPartitioningScheme, + bool hasBucketProperty, RowTypePtr outputType, connector::CommitStrategy commitStrategy, const PlanNodePtr& source) @@ -681,6 +682,7 @@ class TableWriteNode : public PlanNode { aggregationNode_(std::move(aggregationNode)), insertTableHandle_(std::move(insertTableHandle)), hasPartitioningScheme_(hasPartitioningScheme), + hasBucketProperty_(hasBucketProperty), outputType_(std::move(outputType)), commitStrategy_(commitStrategy) { VELOX_USER_CHECK_EQ(columns->size(), columnNames.size()); @@ -693,6 +695,30 @@ class TableWriteNode : public PlanNode { } } +#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY + TableWriteNode( + const PlanNodeId& id, + const RowTypePtr& columns, + const std::vector& columnNames, + std::shared_ptr aggregationNode, + std::shared_ptr insertTableHandle, + bool hasPartitioningScheme, + RowTypePtr outputType, + connector::CommitStrategy commitStrategy, + const PlanNodePtr& source) + : TableWriteNode( + id, + columns, + columnNames, + std::move(aggregationNode), + std::move(insertTableHandle), + hasPartitioningScheme, + false, + std::move(outputType), + commitStrategy, + source) {} +#endif + const std::vector& sources() const override { return sources_; } @@ -720,12 +746,20 @@ class TableWriteNode : public PlanNode { /// Indicates if this table write has specified partitioning scheme. If true, /// the task creates a number of table write operators based on the query /// config 'task_partitioned_writer_count', otherwise based on - /// 'task__writer_count'. As for now, this is only true for hive bucketed - /// table write. + /// 'task_writer_count'. bool hasPartitioningScheme() const { return hasPartitioningScheme_; } + /// Indicates if this table write has specified bucket property. 
If true, the + /// task creates a number of table write operators based on the query config + /// 'task_bucketed_writer_count', otherwise based on + /// 'task_partitioned_writer_count' or 'task_writer_count' depending on + /// whether partitioning scheme is specified or not. + bool hasBucketProperty() const { + return hasBucketProperty_; + } + connector::CommitStrategy commitStrategy() const { return commitStrategy_; } @@ -756,6 +790,7 @@ class TableWriteNode : public PlanNode { const std::shared_ptr aggregationNode_; const std::shared_ptr insertTableHandle_; const bool hasPartitioningScheme_; + const bool hasBucketProperty_; const RowTypePtr outputType_; const connector::CommitStrategy commitStrategy_; }; diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 6bd934d0c4f5..ad42655b69c1 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -300,6 +300,11 @@ class QueryConfig { static constexpr const char* kTaskPartitionedWriterCount = "task_partitioned_writer_count"; + /// The number of local parallel table writer operators per task for + /// bucketed writes. If not set, use "task_writer_count". + static constexpr const char* kTaskBucketedWriterCount = + "task_bucketed_writer_count"; + /// If true, finish the hash probe on an empty build table for a specific set /// of hash joins. 
static constexpr const char* kHashProbeFinishEarlyOnEmptyBuild = @@ -767,6 +772,10 @@ class QueryConfig { .value_or(taskWriterCount()); } + uint32_t taskBucketedWriterCount() const { + return get(kTaskBucketedWriterCount).value_or(taskWriterCount()); + } + bool hashProbeFinishEarlyOnEmptyBuild() const { return get(kHashProbeFinishEarlyOnEmptyBuild, false); } diff --git a/velox/core/tests/QueryConfigTest.cpp b/velox/core/tests/QueryConfigTest.cpp index c89d44d2fdfa..227f85a2fbbe 100644 --- a/velox/core/tests/QueryConfigTest.cpp +++ b/velox/core/tests/QueryConfigTest.cpp @@ -57,27 +57,42 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) { struct { std::optional numWriterCounter; std::optional numPartitionedWriterCounter; + std::optional numBucketedWriterCounter; int expectedWriterCounter; int expectedPartitionedWriterCounter; + int expectedBucketedWriterCounter; std::string debugString() const { return fmt::format( - "numWriterCounter[{}] numPartitionedWriterCounter[{}] expectedWriterCounter[{}] expectedPartitionedWriterCounter[{}]", + "numWriterCounter[{}] numPartitionedWriterCounter[{}] numBucketedWriterCounter[{}] expectedWriterCounter[{}] expectedPartitionedWriterCounter[{}] expectedBucketedWriterCounter[{}]", numWriterCounter.value_or(0), numPartitionedWriterCounter.value_or(0), + numBucketedWriterCounter.value_or(0), expectedWriterCounter, - expectedPartitionedWriterCounter); + expectedPartitionedWriterCounter, + expectedBucketedWriterCounter); } } testSettings[] = { - {std::nullopt, std::nullopt, 4, 4}, - {std::nullopt, 1, 4, 1}, - {std::nullopt, 6, 4, 6}, - {2, 4, 2, 4}, - {4, 2, 4, 2}, - {4, 6, 4, 6}, - {6, 5, 6, 5}, - {6, 4, 6, 4}, - {6, std::nullopt, 6, 6}}; + {std::nullopt, std::nullopt, std::nullopt, 4, 4, 4}, + {std::nullopt, 1, std::nullopt, 4, 1, 4}, + {std::nullopt, 6, std::nullopt, 4, 6, 4}, + {2, 4, std::nullopt, 2, 4, 2}, + {4, 2, std::nullopt, 4, 2, 4}, + {4, 6, std::nullopt, 4, 6, 4}, + {6, 5, std::nullopt, 6, 5, 6}, + {6, 4, std::nullopt, 6, 4, 6}, + {6, std::nullopt, 
6, 6, 6, 6}, + {6, std::nullopt, 1, 6, 6, 1}, + {std::nullopt, std::nullopt, 4, 4, 4, 4}, + {std::nullopt, std::nullopt, 1, 4, 4, 1}, + {std::nullopt, 1, 1, 4, 1, 1}, + {std::nullopt, 1, 2, 4, 1, 2}, + {std::nullopt, 6, 6, 4, 6, 6}, + {std::nullopt, 6, 3, 4, 6, 3}, + {2, 4, 3, 2, 4, 3}, + {4, 2, 1, 4, 2, 1}, + {4, 6, 7, 4, 6, 7}, + {6, std::nullopt, 4, 6, 6, 4}}; for (const auto& testConfig : testSettings) { SCOPED_TRACE(testConfig.debugString()); std::unordered_map configData; @@ -91,6 +106,11 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) { QueryConfig::kTaskPartitionedWriterCount, std::to_string(testConfig.numPartitionedWriterCounter.value())); } + if (testConfig.numBucketedWriterCounter.has_value()) { + configData.emplace( + QueryConfig::kTaskBucketedWriterCount, + std::to_string(testConfig.numBucketedWriterCounter.value())); + } auto queryCtx = QueryCtx::create(nullptr, QueryConfig{std::move(configData)}); const QueryConfig& config = queryCtx->queryConfig(); @@ -98,6 +118,9 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) { ASSERT_EQ( config.taskPartitionedWriterCount(), testConfig.expectedPartitionedWriterCounter); + ASSERT_EQ( + config.taskBucketedWriterCount(), + testConfig.expectedBucketedWriterCounter); } } diff --git a/velox/docs/configs.rst b/velox/docs/configs.rst index 87f12b445ce9..2abf0ba77e62 100644 --- a/velox/docs/configs.rst +++ b/velox/docs/configs.rst @@ -382,6 +382,10 @@ Table Writer - 1 - The number of parallel table writer threads per task. * - task_partitioned_writer_count + - integer + - task_writer_count + - The number of parallel table writer threads per task for partitioned table writes. If not set, use 'task_writer_count' as default. + * - task_bucketed_writer_count - integer - task_writer_count - The number of parallel table writer threads per task for bucketed table writes. If not set, use 'task_writer_count' as default. 
diff --git a/velox/exec/LocalPlanner.cpp b/velox/exec/LocalPlanner.cpp index bf99d78e4cf0..77d538e3aac1 100644 --- a/velox/exec/LocalPlanner.cpp +++ b/velox/exec/LocalPlanner.cpp @@ -238,7 +238,9 @@ uint32_t maxDrivers( if (!connectorInsertHandle->supportsMultiThreading()) { return 1; } else { - if (tableWrite->hasPartitioningScheme()) { + if (tableWrite->hasBucketProperty()) { + return queryConfig.taskBucketedWriterCount(); + } else if (tableWrite->hasPartitioningScheme()) { return queryConfig.taskPartitionedWriterCount(); } else { return queryConfig.taskWriterCount(); diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index eac3342dedd2..15b0247033cb 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -49,6 +49,8 @@ using namespace facebook::velox::dwio::common; using namespace facebook::velox::common::testutil; using namespace facebook::velox::common::hll; +constexpr uint64_t kQueryMemoryCapacity = 512 * MB; + enum class TestMode { kUnpartitioned, kPartitioned, @@ -100,6 +102,7 @@ std::function addTableWriter( const std::shared_ptr& aggregationNode, const std::shared_ptr& insertHandle, bool hasPartitioningScheme, + bool hasBucketProperty, connector::CommitStrategy commitStrategy = connector::CommitStrategy::kNoCommit) { return [=](core::PlanNodeId nodeId, @@ -111,6 +114,7 @@ std::function addTableWriter( aggregationNode, insertHandle, hasPartitioningScheme, + hasBucketProperty, TableWriteTraits::outputType(aggregationNode), commitStrategy, std::move(source)); @@ -604,6 +608,7 @@ class TableWriteTest : public HiveConnectorTestBase { partitionedBy, bucketProperty, compressionKind), + !partitionedBy.empty(), bucketProperty != nullptr, outputCommitStrategy)) .capturePlanNodeId(tableWriteNodeId_); @@ -628,6 +633,7 @@ class TableWriteTest : public HiveConnectorTestBase { partitionedBy, bucketProperty, compressionKind), + !partitionedBy.empty(), bucketProperty != nullptr, 
outputCommitStrategy)) .capturePlanNodeId(tableWriteNodeId_) @@ -669,6 +675,7 @@ class TableWriteTest : public HiveConnectorTestBase { partitionedBy, bucketProperty, compressionKind), + !partitionedBy.empty(), bucketProperty != nullptr, outputCommitStrategy)) .capturePlanNodeId(tableWriteNodeId_) @@ -1023,13 +1030,6 @@ class TableWriteTest : public HiveConnectorTestBase { const TestParam testParam_; const FileFormat fileFormat_; const TestMode testMode_; - // Returns all available table types to test insert without any - // partitions (used in "immutablePartitions" set of tests). - const std::vector tableTypes_ = { - // Velox does not currently support TEMPORARY table type. - // Once supported, it should be added to this list. - connector::hive::LocationHandle::TableType::kNew, - connector::hive::LocationHandle::TableType::kExisting}; const int numTableWriterCount_; const int numPartitionedTableWriterCount_; @@ -1605,6 +1605,65 @@ TEST_P(AllTableWriterTest, scanFilterProjectWrite) { } } +TEST_P(AllTableWriterTest, writerDriverThreads) { + const int batchSize = 1'000; + const int numBatches = 20; + const std::vector vectors = makeVectors(numBatches, batchSize); + + createDuckDbTable(vectors); + + auto queryCtx = core::QueryCtx::create(executor_.get()); + auto outputDirectory = TempDirectoryPath::create(); + core::PlanNodeId tableWriteNodeId; + auto writerPlan = + PlanBuilder() + .values(vectors, /*parallelizable=*/true) + .tableWrite( + outputDirectory->getPath(), + partitionedBy_, + bucketProperty_ != nullptr ? bucketProperty_->bucketCount() : 0, + bucketProperty_ != nullptr ? bucketProperty_->bucketedBy() + : std::vector{}, + bucketProperty_ != nullptr + ? 
bucketProperty_->sortedBy() + : std::vector>{}) + .capturePlanNodeId(tableWriteNodeId) + .localPartition({}) + .tableWriteMerge() + .project({TableWriteTraits::rowCountColumnName()}) + .singleAggregation( + {}, + {fmt::format("sum({})", TableWriteTraits::rowCountColumnName())}) + .planNode(); + const int taskWriterCount = 4; + const int taskPartitionedWriterCount = 8; + const int taskBucketWriterCount = 9; + const auto expectedNumWriterDrivers = bucketProperty_ != nullptr + ? taskBucketWriterCount + : partitionedBy_.empty() ? taskWriterCount + : taskPartitionedWriterCount; + const auto expectedNumRows = + numBatches * batchSize * expectedNumWriterDrivers; + auto task = AssertQueryBuilder(duckDbQueryRunner_) + .queryCtx(queryCtx) + .maxDrivers(10) + .config( + core::QueryConfig::kTaskWriterCount, + std::to_string(taskWriterCount)) + .config( + core::QueryConfig::kTaskPartitionedWriterCount, + std::to_string(taskPartitionedWriterCount)) + .config( + core::QueryConfig::kTaskBucketedWriterCount, + std::to_string(taskBucketWriterCount)) + .plan(std::move(writerPlan)) + .assertResults(fmt::format("SELECT {}", expectedNumRows)); + auto planStats = exec::toPlanStats(task->taskStats()); + auto& tableWriteStats = planStats.at(tableWriteNodeId); + ASSERT_EQ(tableWriteStats.numDrivers, expectedNumWriterDrivers); +} + TEST_P(AllTableWriterTest, renameAndReorderColumns) { auto filePaths = makeFilePaths(5); auto vectors = makeVectors(filePaths.size(), 500); @@ -2841,6 +2900,7 @@ TEST_P(AllTableWriterTest, columnStatsDataTypes) { partitionedBy_, nullptr, makeLocationHandle(outputDirectory->getPath()))), + !partitionedBy_.empty(), false, CommitStrategy::kNoCommit)) .planNode(); @@ -2930,7 +2990,8 @@ TEST_P(AllTableWriterTest, columnStats) { partitionedBy_, bucketProperty_, makeLocationHandle(outputDirectory->getPath()))), - false, + !partitionedBy_.empty(), + bucketProperty_ != nullptr, commitStrategy_)) .planNode(); @@ -2957,8 +3018,9 @@ TEST_P(AllTableWriterTest, columnStats) { 
// null partition2_update x null null // null partition3_update x null null // - // Note that we can have multiple same partition_update, they're for different - // files, but for stats, we would only have one record for each partition + // Note that we can have multiple same partition_update, they're for + // different files, but for stats, we would only have one record for each + // partition // // For unpartitioned, expected result is: // Row Fragment Context partition c1_min_value @@ -3029,7 +3091,8 @@ TEST_P(AllTableWriterTest, columnStatsWithTableWriteMerge) { partitionedBy_, bucketProperty_, makeLocationHandle(outputDirectory->getPath()))), - false, + !partitionedBy_.empty(), + bucketProperty_ != nullptr, commitStrategy_)); auto mergeAggregationNode = generateAggregationNode( @@ -3066,8 +3129,9 @@ TEST_P(AllTableWriterTest, columnStatsWithTableWriteMerge) { // null partition2_update x null null // null partition3_update x null null // - // Note that we can have multiple same partition_update, they're for different - // files, but for stats, we would only have one record for each partition + // Note that we can have multiple same partition_update, they're for + // different files, but for stats, we would only have one record for each + // partition // // For unpartitioned, expected result is: // Row Fragment Context partition c1_min_value @@ -3478,13 +3542,17 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { for (bool writerSpillEnabled : {false, true}) { { SCOPED_TRACE(fmt::format("writerSpillEnabled: {}", writerSpillEnabled)); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); - - std::atomic numInputs{0}; + auto queryPool = memory::memoryManager()->addRootPool( + "reclaimFromTableWriter", kQueryMemoryCapacity); + auto* arbitrator = 
memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); + + std::atomic_int numInputs{0}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::addInput", std::function(([&](Operator* op) { @@ -3499,8 +3567,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { } const auto fakeAllocationSize = - arbitrator->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); + kQueryMemoryCapacity - op->pool()->parent()->reservedBytes(); if (writerSpillEnabled) { auto* buffer = op->pool()->allocate(fakeAllocationSize); op->pool()->free(buffer, fakeAllocationSize); @@ -3534,8 +3601,8 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { .config(core::QueryConfig::kSpillEnabled, writerSpillEnabled) .config( core::QueryConfig::kWriterSpillEnabled, writerSpillEnabled) - // Set 0 file writer flush threshold to always trigger flush in - // test. + // Set 0 file writer flush threshold to always trigger flush + // in test. 
.config(core::QueryConfig::kWriterFlushThresholdBytes, 0) .plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); @@ -3553,15 +3620,19 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromTableWriter) { .at(HiveDataSink::kEarlyFlushedRawBytes) .sum, 0); - ASSERT_EQ(arbitrator->stats().numFailures, 0); + ASSERT_EQ( + arbitrator->stats().numFailures, numPrevArbitrationFailures); } else { ASSERT_EQ( tableWriteStats->customStats.count( HiveDataSink::kEarlyFlushedRawBytes), 0); - ASSERT_EQ(arbitrator->stats().numFailures, 1); + ASSERT_EQ( + arbitrator->stats().numFailures, numPrevArbitrationFailures + 1); } - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts); } waitForAllTasksToBeDeleted(3'000'000); } @@ -3590,11 +3661,15 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { for (bool writerSpillEnabled : {false, true}) { { SCOPED_TRACE(fmt::format("writerSpillEnabled: {}", writerSpillEnabled)); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "reclaimFromSortTableWriter", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); const auto spillStats = common::globalSpillStats(); std::atomic numInputs{0}; @@ -3612,8 +3687,7 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, 
reclaimFromSortTableWriter) { } const auto fakeAllocationSize = - arbitrator->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); + kQueryMemoryCapacity - op->pool()->parent()->reservedBytes(); if (writerSpillEnabled) { auto* buffer = op->pool()->allocate(fakeAllocationSize); op->pool()->free(buffer, fakeAllocationSize); @@ -3651,13 +3725,18 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, reclaimFromSortTableWriter) { .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, writerSpillEnabled) .config(core::QueryConfig::kWriterSpillEnabled, writerSpillEnabled) - // Set 0 file writer flush threshold to always trigger flush in test. + // Set 0 file writer flush threshold to always trigger flush in + // test. .config(core::QueryConfig::kWriterFlushThresholdBytes, 0) .plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); - ASSERT_EQ(arbitrator->stats().numFailures, writerSpillEnabled ? 0 : 1); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); + ASSERT_EQ( + arbitrator->stats().numFailures, + numPrevArbitrationFailures + (writerSpillEnabled ? 
0 : 1)); + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts); waitForAllTasksToBeDeleted(3'000'000); const auto updatedSpillStats = common::globalSpillStats(); @@ -3696,11 +3775,16 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { succinctBytes(testParam.bytesToReserve), succinctBytes(testParam.writerFlushThreshold))); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "writerFlushThreshold", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + const int numPrevShrinks = arbitrator->stats().numShrinks; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); memory::MemoryPool* compressionPool{nullptr}; SCOPED_TESTVALUE_SET( @@ -3729,8 +3813,8 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { compressionPool->maybeReserve(testParam.bytesToReserve); } - const auto fakeAllocationSize = arbitrator->stats().maxCapacityBytes - - op->pool()->parent()->usedBytes(); + const auto fakeAllocationSize = + kQueryMemoryCapacity - op->pool()->parent()->usedBytes(); if (testParam.writerFlushThreshold == 0) { auto* buffer = op->pool()->allocate(fakeAllocationSize); op->pool()->free(buffer, fakeAllocationSize); @@ -3768,14 +3852,17 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) { ASSERT_EQ( arbitrator->stats().numFailures, - testParam.writerFlushThreshold == 0 ? 
0 : 1); - // We don't trigger reclaim on a writer if it doesn't meet the writer flush - // threshold. - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); + numPrevArbitrationFailures + + (testParam.writerFlushThreshold == 0 ? 0 : 1)); + // We don't trigger reclaim on a writer if it doesn't meet the writer + // flush threshold. + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts); ASSERT_GE(arbitrator->stats().numReclaimedBytes, testParam.bytesToReserve); waitForAllTasksToBeDeleted(3'000'000); queryCtx.reset(); - ASSERT_EQ(arbitrator->stats().numShrinks, 1); + ASSERT_EQ(arbitrator->stats().numShrinks, numPrevShrinks + 1); } } @@ -3798,11 +3885,15 @@ DEBUG_ONLY_TEST_F( createDuckDbTable(vectors); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "reclaimFromNonReclaimableTableWriter", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); std::atomic injectFakeAllocationOnce{true}; SCOPED_TESTVALUE_SET( @@ -3814,7 +3905,7 @@ DEBUG_ONLY_TEST_F( auto& pool = writer->getContext().getMemoryPool( dwrf::MemoryUsageCategory::GENERAL); const auto fakeAllocationSize = - arbitrator->stats().maxCapacityBytes - pool.reservedBytes(); + kQueryMemoryCapacity - pool.reservedBytes(); VELOX_ASSERT_THROW( pool.allocate(fakeAllocationSize), "Exceeded memory pool"); }))); @@ -3853,8 +3944,10 @@ DEBUG_ONLY_TEST_F( 
.plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); - ASSERT_EQ(arbitrator->stats().numFailures, 1); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 1); + ASSERT_EQ(arbitrator->stats().numFailures, numPrevArbitrationFailures + 1); + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts + 1); waitForAllTasksToBeDeleted(); } @@ -3876,11 +3969,16 @@ DEBUG_ONLY_TEST_F( } createDuckDbTable(vectors); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "arbitrationFromTableWriterWithNoMoreInput", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + const uint64_t numPrevReclaimedBytes = arbitrator->stats().numReclaimedBytes; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); std::atomic writerNoMoreInput{false}; SCOPED_TESTVALUE_SET( @@ -3905,8 +4003,8 @@ DEBUG_ONLY_TEST_F( if (!injectGetOutputOnce.exchange(false)) { return; } - const auto fakeAllocationSize = arbitrator->stats().maxCapacityBytes - - op->pool()->parent()->reservedBytes(); + const auto fakeAllocationSize = + kQueryMemoryCapacity - op->pool()->parent()->reservedBytes(); auto* buffer = op->pool()->allocate(fakeAllocationSize); op->pool()->free(buffer, fakeAllocationSize); }))); @@ -3944,9 +4042,11 @@ DEBUG_ONLY_TEST_F( .plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0); - 
ASSERT_EQ(arbitrator->stats().numFailures, 0); - ASSERT_GT(arbitrator->stats().numReclaimedBytes, 0); + ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts); + ASSERT_EQ(arbitrator->stats().numFailures, numPrevArbitrationFailures); + ASSERT_GT(arbitrator->stats().numReclaimedBytes, numPrevReclaimedBytes); waitForAllTasksToBeDeleted(); } @@ -3972,11 +4072,15 @@ DEBUG_ONLY_TEST_F( createDuckDbTable(vectors); - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "reclaimFromNonReclaimableSortTableWriter", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + const int numPrevArbitrationFailures = arbitrator->stats().numFailures; + const int numPrevNonReclaimableAttempts = + arbitrator->stats().numNonReclaimableAttempts; + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); std::atomic injectFakeAllocationOnce{true}; SCOPED_TESTVALUE_SET( @@ -3993,8 +4097,8 @@ DEBUG_ONLY_TEST_F( if (!injectFakeAllocationOnce.exchange(false)) { return; } - const auto fakeAllocationSize = arbitrator->stats().maxCapacityBytes - - pool->parent()->reservedBytes(); + const auto fakeAllocationSize = + kQueryMemoryCapacity - pool->parent()->reservedBytes(); VELOX_ASSERT_THROW( pool->allocate(fakeAllocationSize), "Exceeded memory pool"); }))); @@ -4042,8 +4146,10 @@ DEBUG_ONLY_TEST_F( .plan(std::move(writerPlan)) .assertResults(fmt::format("SELECT {}", numRows)); - ASSERT_EQ(arbitrator->stats().numFailures, 1); - ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 1); + ASSERT_EQ(arbitrator->stats().numFailures, numPrevArbitrationFailures + 1); + 
ASSERT_EQ( + arbitrator->stats().numNonReclaimableAttempts, + numPrevNonReclaimableAttempts + 1); const auto updatedSpillStats = common::globalSpillStats(); ASSERT_EQ(updatedSpillStats, spillStats); waitForAllTasksToBeDeleted(); @@ -4065,13 +4171,14 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableFileWriteError) { createDuckDbTable(vectors); - auto memoryManager = - createMemoryManager(memoryCapacity, kMemoryPoolInitCapacity); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), memoryCapacity); - ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity); - std::atomic injectWriterErrorOnce{true}; + auto queryPool = memory::memoryManager()->addRootPool( + "tableFileWriteError", kQueryMemoryCapacity); + auto* arbitrator = memory::memoryManager()->arbitrator(); + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); + + std::atomic_bool injectWriterErrorOnce{true}; SCOPED_TESTVALUE_SET( "facebook::velox::dwrf::Writer::write", std::function(([&](dwrf::Writer* writer) { @@ -4104,8 +4211,8 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableFileWriteError) { // Set 0 file writer flush threshold to always reclaim memory from // file writer. .config(core::QueryConfig::kWriterFlushThresholdBytes, 0) - // Set stripe size to extreme large to avoid writer internal triggered - // flush. + // Set stripe size to extreme large to avoid writer internal + // triggered flush. 
.connectorSessionProperty( kHiveConnectorId, connector::hive::HiveConfig::kOrcWriterMaxStripeSizeSession, @@ -4134,20 +4241,19 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteSpillUseMoreMemory) { vectors.push_back(fuzzer.fuzzInputRow(rowType_)); } - auto memoryManager = - createMemoryManager(memoryCapacity, kMemoryPoolInitCapacity); - auto arbitrator = memoryManager->arbitrator(); + auto queryPool = memory::memoryManager()->addRootPool( + "tableWriteSpillUseMoreMemory", kQueryMemoryCapacity / 4); + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity / 4); - std::shared_ptr queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), memoryCapacity / 8); - std::shared_ptr fakeQueryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), memoryCapacity); - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, FakeMemoryReclaimer::create()); + auto fakeLeafPool = queryCtx->pool()->addLeafChild( + "fakeLeaf", true, FakeMemoryReclaimer::create()); + const int fakeAllocationSize = kQueryMemoryCapacity * 3 / 16; TestAllocation injectedFakeAllocation{ - fakePool.get(), - fakePool->allocate(memoryCapacity * 3 / 4), - memoryCapacity * 3 / 4}; + fakeLeafPool.get(), + fakeLeafPool->allocate(fakeAllocationSize), + fakeAllocationSize}; void* allocatedBuffer; TestAllocation injectedWriterAllocation; @@ -4159,21 +4265,21 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteSpillUseMoreMemory) { auto& pool = writer->getContext().getMemoryPool( dwrf::MemoryUsageCategory::GENERAL); injectedWriterAllocation.pool = &pool; - injectedWriterAllocation.size = memoryCapacity / 8; + injectedWriterAllocation.size = kQueryMemoryCapacity / 8; injectedWriterAllocation.buffer = pool.allocate(injectedWriterAllocation.size); }))); - // Free the extra fake memory allocations to make memory pool state consistent - // at the end of test. 
- std::atomic clearAllocationOnce{true}; + // Free the extra fake memory allocations to make memory pool state + // consistent at the end of test. + std::atomic_bool clearAllocationOnce{true}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Task::setError", std::function(([&](Task* task) { if (!clearAllocationOnce.exchange(false)) { return; } - ASSERT_EQ(injectedWriterAllocation.size, memoryCapacity / 8); + ASSERT_EQ(injectedWriterAllocation.size, kQueryMemoryCapacity / 8); injectedWriterAllocation.free(); }))); @@ -4190,10 +4296,11 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteSpillUseMoreMemory) { .spillDirectory(spillDirectory->getPath()) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kWriterSpillEnabled, true) - // Set 0 file writer flush threshold to always trigger flush in test. + // Set 0 file writer flush threshold to always trigger flush in + // test. .config(core::QueryConfig::kWriterFlushThresholdBytes, 0) - // Set stripe size to extreme large to avoid writer internal triggered - // flush. + // Set stripe size to extreme large to avoid writer internal + // triggered flush. 
.connectorSessionProperty( kHiveConnectorId, connector::hive::HiveConfig::kOrcWriterMaxStripeSizeSession, @@ -4222,16 +4329,23 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteReclaimOnClose) { numRows += vectors.back()->size(); } - auto memoryManager = createMemoryManager(); - auto arbitrator = memoryManager->arbitrator(); - auto queryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - auto fakeQueryCtx = - newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity); - auto fakePool = fakeQueryCtx->pool()->addLeafChild( - "fakePool", true, FakeMemoryReclaimer::create()); + auto* arbitrator = memory::memoryManager()->arbitrator(); + auto queryPool = memory::memoryManager()->addRootPool( + "tableWriteSpillUseMoreMemory", kQueryMemoryCapacity); + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); - std::atomic writerNoMoreInput{false}; + auto fakeQueryPool = + memory::memoryManager()->addRootPool("fake", kQueryMemoryCapacity); + auto fakeQueryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(fakeQueryPool)); + ASSERT_EQ(fakeQueryCtx->pool()->capacity(), kQueryMemoryCapacity); + + auto fakeLeafPool = fakeQueryCtx->pool()->addLeafChild( + "fakeLeaf", true, FakeMemoryReclaimer::create()); + + std::atomic_bool writerNoMoreInput{false}; SCOPED_TESTVALUE_SET( "facebook::velox::exec::Driver::runInternal::noMoreInput", std::function(([&](Operator* op) { @@ -4252,14 +4366,13 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, tableWriteReclaimOnClose) { if (!maybeReserveInjectOnce.exchange(false)) { return; } - // The injection memory allocation to cause maybeReserve on writer close - // to trigger memory arbitration. The latter tries to reclaim memory - // from this file writer. 
- const size_t injectAllocationSize = - pool->freeBytes() + arbitrator->stats().freeCapacityBytes; + // The injection memory allocation to cause maybeReserve on writer + // close to trigger memory arbitration. The latter tries to reclaim + // memory from this file writer. + const size_t injectAllocationSize = kQueryMemoryCapacity; fakeAllocation = TestAllocation{ - .pool = fakePool.get(), - .buffer = fakePool->allocate(injectAllocationSize), + .pool = fakeLeafPool.get(), + .buffer = fakeLeafPool->allocate(injectAllocationSize), .size = injectAllocationSize}; })); @@ -4312,8 +4425,11 @@ DEBUG_ONLY_TEST_F( const auto expectedResult = runWriteTask(vectors, nullptr, false, 1, pool(), kHiveConnectorId, false) .data; - auto queryCtx = - newQueryCtx(memory::memoryManager(), executor_.get(), memoryCapacity); + auto queryPool = memory::memoryManager()->addRootPool( + "tableWriteSpillUseMoreMemory", kQueryMemoryCapacity); + auto queryCtx = core::QueryCtx::create( + executor_.get(), QueryConfig{{}}, {}, nullptr, std::move(queryPool)); + ASSERT_EQ(queryCtx->pool()->capacity(), kQueryMemoryCapacity); std::atomic_bool writerCloseWaitFlag{true}; folly::EventCount writerCloseWait; diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 27baada36c32..ae0a40b26019 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -434,7 +434,7 @@ PlanBuilder& PlanBuilder::tableWrite( connector::hive::LocationHandle::TableType::kNew, outputFileName); std::shared_ptr bucketProperty; - if (!partitionBy.empty() && bucketCount != 0) { + if (bucketCount != 0) { bucketProperty = buildHiveBucketProperty(rowType, bucketCount, bucketedBy, sortBy); } @@ -471,7 +471,8 @@ PlanBuilder& PlanBuilder::tableWrite( rowType->names(), aggregationNode, insertHandle, - false, + !partitionBy.empty(), + bucketProperty != nullptr, TableWriteTraits::outputType(aggregationNode), connector::CommitStrategy::kNoCommit, planNode_);