From 30f2776f6ec171076a1047df68a145b55c94db6e Mon Sep 17 00:00:00 2001 From: Jimmy Lu Date: Mon, 3 Jun 2024 15:28:02 -0700 Subject: [PATCH] Move Velox batch reader and writer to OSS nimble repo Summary: Just move files around, no functionality change. Differential Revision: D58109796 --- dwio/nimble/velox/NimbleConfig.cpp | 162 +++++++++ dwio/nimble/velox/NimbleConfig.h | 51 +++ dwio/nimble/velox/NimbleReader.cpp | 178 ++++++++++ dwio/nimble/velox/NimbleReader.h | 36 ++ dwio/nimble/velox/NimbleWriter.cpp | 77 +++++ dwio/nimble/velox/NimbleWriter.h | 24 ++ .../velox/NimbleWriterOptionBuilder.cpp | 133 ++++++++ dwio/nimble/velox/NimbleWriterOptionBuilder.h | 106 ++++++ dwio/nimble/velox/VeloxUtil.cpp | 165 +++++++++ dwio/nimble/velox/VeloxUtil.h | 33 ++ dwio/nimble/velox/tests/NimbleReaderTest.cpp | 317 ++++++++++++++++++ dwio/nimble/velox/tests/NimbleWriterTest.cpp | 99 ++++++ 12 files changed, 1381 insertions(+) create mode 100644 dwio/nimble/velox/NimbleConfig.cpp create mode 100644 dwio/nimble/velox/NimbleConfig.h create mode 100644 dwio/nimble/velox/NimbleReader.cpp create mode 100644 dwio/nimble/velox/NimbleReader.h create mode 100644 dwio/nimble/velox/NimbleWriter.cpp create mode 100644 dwio/nimble/velox/NimbleWriter.h create mode 100644 dwio/nimble/velox/NimbleWriterOptionBuilder.cpp create mode 100644 dwio/nimble/velox/NimbleWriterOptionBuilder.h create mode 100644 dwio/nimble/velox/VeloxUtil.cpp create mode 100644 dwio/nimble/velox/VeloxUtil.h create mode 100644 dwio/nimble/velox/tests/NimbleReaderTest.cpp create mode 100644 dwio/nimble/velox/tests/NimbleWriterTest.cpp diff --git a/dwio/nimble/velox/NimbleConfig.cpp b/dwio/nimble/velox/NimbleConfig.cpp new file mode 100644 index 0000000..66aa9a0 --- /dev/null +++ b/dwio/nimble/velox/NimbleConfig.cpp @@ -0,0 +1,162 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "dwio/nimble/velox/NimbleConfig.h" + +#include "dwio/nimble/encodings/EncodingSelectionPolicy.h" + +#include +#include + +DEFINE_string( + nimble_selection_read_factors, + "Constant=1.0;Trivial=0.5;FixedBitWidth=0.9;MainlyConstant=1.0;SparseBool=1.0;Dictionary=1.0;RLE=1.0;Varint=1.0", + "Encoding selection read factors, in the format: " + "=;=;..."); + +DEFINE_double( + nimble_selection_compression_accept_ratio, + 0.97, + "Encoding selection compression accept ratio."); + +DEFINE_bool( + nimble_zstrong_enable_variable_bit_width_compressor, + false, + "Enable zstrong variable bit width compressor at write time. Transparent at read time."); + +DEFINE_string( + nimble_writer_input_buffer_default_growth_config, + "{\"32\":4.0,\"512\":1.414,\"4096\":1.189}", + "Default growth config for writer input buffers, each entry in the format of {range_start,growth_factor}"); + +namespace facebook::nimble { +namespace { +template +std::vector parseVector(const std::string& str) { + std::vector result; + if (!str.empty()) { + std::vector pieces; + folly::split(',', str, pieces, true); + for (auto& p : pieces) { + const auto& trimmedCol = folly::trimWhitespace(p); + if (!trimmedCol.empty()) { + result.push_back(folly::to(trimmedCol)); + } + } + } + return result; +} + +std::map parseGrowthConfigMap(const std::string& str) { + std::map ret; + NIMBLE_CHECK(!str.empty(), "Can't supply an empty growth config."); + folly::dynamic json = folly::parseJson(str); + for (const auto& pair : json.items()) { + auto [_, inserted] = ret.emplace( + folly::to(pair.first.asString()), pair.second.asDouble()); + NIMBLE_CHECK( + inserted, fmt::format("Duplicate key: {}.", pair.first.asString())); + } + return ret; +} +} // namespace + +/* static */ Config::Entry Config::FLATTEN_MAP("orc.flatten.map", false); + +/* static */ Config::Entry> Config::MAP_FLAT_COLS( + "orc.map.flat.cols", + {}, + [](const std::vector& val) { return folly::join(",", val); }, + [](const std::string& /* key */, const std::string& val) { + return parseVector(val); + }); + +/* static */ Config::Entry> + Config::DEDUPLICATED_COLS( + "alpha.map.deduplicated.cols", + {}, + [](const std::vector& val) { return folly::join(",", val); }, + [](const std::string& /* key */, const std::string& val) { + return parseVector(val); + }); + +/* static */ Config::Entry> + Config::BATCH_REUSE_COLS( + "alpha.dictionaryarray.cols", + {}, + [](const std::vector& val) { return folly::join(",", val); }, + [](const std::string& /* key */, const std::string& val) { + return parseVector(val); + }); + +/* static */ Config::Entry Config::RAW_STRIPE_SIZE( + "alpha.raw.stripe.size", + 512L * 1024L * 1024L); + +/* static */ Config::Entry>> + Config::MANUAL_ENCODING_SELECTION_READ_FACTORS( + "alpha.encodingselection.read.factors", + ManualEncodingSelectionPolicyFactory::parseReadFactors( + FLAGS_nimble_selection_read_factors), + [](const std::vector>& val) { + std::vector encodingFactorStrings; + std::transform( + val.cbegin(), + val.cend(), + std::back_inserter(encodingFactorStrings), + [](const auto& readFactor) { + return fmt::format( + "{}={}", toString(readFactor.first), readFactor.second); + }); + return folly::join(";", encodingFactorStrings); + }, + [](const std::string& /* key */, const std::string& val) { + return ManualEncodingSelectionPolicyFactory::parseReadFactors(val); + }); + +/* static */ Config::Entry + Config::ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO( + "alpha.encodingselection.compression.accept.ratio", + FLAGS_nimble_selection_compression_accept_ratio); + +/* static */ Config::Entry Config::ZSTRONG_COMPRESSION_LEVEL( + "alpha.zstrong.compression.level", + 4); + +/* static */ Config::Entry Config::ZSTRONG_DECOMPRESSION_LEVEL( + "alpha.zstrong.decompression.level", + 2); + +/* static */ Config::Entry + Config::ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR( + "alpha.zstrong.enable.variable.bit.width.compressor", + FLAGS_nimble_zstrong_enable_variable_bit_width_compressor); + +/* static */ Config::Entry> + Config::INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS( + "alpha.writer.input.buffer.default.growth.configs", + parseGrowthConfigMap( + FLAGS_nimble_writer_input_buffer_default_growth_config), + [](const std::map& val) { + folly::dynamic obj = folly::dynamic::object; + for (const auto& [rangeStart, growthFactor] : val) { + obj[folly::to(rangeStart)] = growthFactor; + } + return folly::toJson(obj); + }, + [](const std::string& /* key */, const std::string& val) { + return parseGrowthConfigMap(val); + }); +} // namespace facebook::nimble diff --git a/dwio/nimble/velox/NimbleConfig.h b/dwio/nimble/velox/NimbleConfig.h new file mode 100644 index 0000000..4381d6a --- /dev/null +++ b/dwio/nimble/velox/NimbleConfig.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "dwio/nimble/common/Types.h" + +#include "velox/common/config/Config.h" + +namespace facebook::nimble { + +class Config : public velox::common::ConfigBase { + public: + template + using Entry = velox::common::ConfigBase::Entry; + + static Entry FLATTEN_MAP; + static Entry> MAP_FLAT_COLS; + static Entry> BATCH_REUSE_COLS; + static Entry> DEDUPLICATED_COLS; + static Entry RAW_STRIPE_SIZE; + static Entry>> + MANUAL_ENCODING_SELECTION_READ_FACTORS; + static Entry ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO; + static Entry ZSTRONG_COMPRESSION_LEVEL; + static Entry ZSTRONG_DECOMPRESSION_LEVEL; + static Entry ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR; + static Entry> + INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS; + + static std::shared_ptr fromMap( + const std::map& map) { + auto ret = std::make_shared(); + ret->configs_.insert(map.cbegin(), map.cend()); + return ret; + } +}; + +} // namespace facebook::nimble diff --git a/dwio/nimble/velox/NimbleReader.cpp b/dwio/nimble/velox/NimbleReader.cpp new file mode 100644 index 0000000..0339686 --- /dev/null +++ b/dwio/nimble/velox/NimbleReader.cpp @@ -0,0 +1,178 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "dwio/nimble/velox/NimbleReader.h" + +#include "dwio/nimble/tablet/Constants.h" +#include "dwio/nimble/velox/VeloxUtil.h" + +namespace facebook::velox::nimble { + +namespace { + +const std::vector kPreloadOptionalSections = { + std::string(facebook::nimble::kSchemaSection)}; + +class NimbleRowReader : public dwio::common::RowReader { + public: + NimbleRowReader( + std::unique_ptr reader, + const std::shared_ptr& scanSpec) + : reader_(std::move(reader)), scanSpec_(scanSpec) { + reader_->loadStripeIfAny(); + } + + int64_t nextRowNumber() override { + VELOX_NYI(); + } + + int64_t nextReadSize(uint64_t /*size*/) override { + VELOX_NYI(); + } + + uint64_t next( + uint64_t size, + VectorPtr& result, + const dwio::common::Mutation* mutation) override { + TypePtr resultType; + VectorPtr rawResult; + if (result) { + resultType = result->type(); + rawResult = std::move(rawVectorForBatchReader(*result)); + result.reset(); + } + if (!reader_->next(size, rawResult)) { + if (rawResult) { + result = BaseVector::create(resultType, 0, &reader_->memoryPool()); + rawVectorForBatchReader(*result) = std::move(rawResult); + } + return 0; + } + auto scanned = rawResult->size(); + result = projectColumns(rawResult, *scanSpec_, mutation); + rawVectorForBatchReader(*result) = std::move(rawResult); + return scanned; + } + + void updateRuntimeStats( + dwio::common::RuntimeStatistics& /*stats*/) const override { + // No-op for non-selective reader. + } + + void resetFilterCaches() override { + // No-op for non-selective reader. + } + + std::optional estimatedRowSize() const override { + return std::optional(reader_->estimatedRowSize()); + } + + private: + std::unique_ptr reader_; + std::shared_ptr scanSpec_; + + static VectorPtr& rawVectorForBatchReader(BaseVector& vector) { + auto* rowVector = vector.as(); + VELOX_CHECK_NOT_NULL(rowVector); + return rowVector->rawVectorForBatchReader(); + } +}; + +class NimbleReader : public dwio::common::Reader { + public: + NimbleReader( + const dwio::common::ReaderOptions& options, + const std::shared_ptr& readFile) + : options_(options), + readFile_(readFile), + tabletReader_(std::make_shared( + options.getMemoryPool(), + readFile_.get(), + kPreloadOptionalSections)) { + if (!options_.getFileSchema()) { + facebook::nimble::VeloxReader tmpReader( + options.getMemoryPool(), tabletReader_); + options_.setFileSchema(tmpReader.type()); + } + } + + std::optional numberOfRows() const override { + return tabletReader_->tabletRowCount(); + } + + std::unique_ptr columnStatistics( + uint32_t /*index*/) const override { + // TODO + return nullptr; + } + + const RowTypePtr& rowType() const override { + return options_.getFileSchema(); + } + + const std::shared_ptr& typeWithId() + const override { + if (!typeWithId_) { + typeWithId_ = dwio::common::TypeWithId::create(rowType()); + } + return typeWithId_; + } + + std::unique_ptr createRowReader( + const dwio::common::RowReaderOptions& options) const override { + facebook::nimble::VeloxReadParams params; + params.fileRangeStartOffset = options.getOffset(); + params.fileRangeEndOffset = options.getLimit(); + params.decodingExecutor = options.getDecodingExecutor(); + auto selector = options.getSelector(); + if (!selector) { + selector = std::make_shared(rowType()); + } + facebook::dwio::api::populateFeatureSelector( + *selector, options.getMapColumnIdAsStruct(), params); + auto reader = std::make_unique( + options_.getMemoryPool(), + tabletReader_, + std::move(selector), + std::move(params)); + return std::make_unique( + std::move(reader), options.getScanSpec()); + } + + private: + dwio::common::ReaderOptions options_; + std::shared_ptr readFile_; + std::shared_ptr tabletReader_; + mutable std::shared_ptr typeWithId_; +}; + +} // namespace + +std::unique_ptr NimbleReaderFactory::createReader( + std::unique_ptr input, + const dwio::common::ReaderOptions& options) { + return std::make_unique(options, input->getReadFile()); +} + +void registerNimbleReaderFactory() { + dwio::common::registerReaderFactory(std::make_shared()); +} + +void unregisterNimbleReaderFactory() { + dwio::common::unregisterReaderFactory(dwio::common::FileFormat::NIMBLE); +} + +} // namespace facebook::velox::nimble diff --git a/dwio/nimble/velox/NimbleReader.h b/dwio/nimble/velox/NimbleReader.h new file mode 100644 index 0000000..3c9e83a --- /dev/null +++ b/dwio/nimble/velox/NimbleReader.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/dwio/common/ReaderFactory.h" + +namespace facebook::velox::nimble { + +class NimbleReaderFactory : public dwio::common::ReaderFactory { + public: + NimbleReaderFactory() : ReaderFactory(dwio::common::FileFormat::NIMBLE) {} + + std::unique_ptr createReader( + std::unique_ptr, + const dwio::common::ReaderOptions&) override; +}; + +void registerNimbleReaderFactory(); + +void unregisterNimbleReaderFactory(); + +} // namespace facebook::velox::nimble diff --git a/dwio/nimble/velox/NimbleWriter.cpp b/dwio/nimble/velox/NimbleWriter.cpp new file mode 100644 index 0000000..411f355 --- /dev/null +++ b/dwio/nimble/velox/NimbleWriter.cpp @@ -0,0 +1,77 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "dwio/nimble/velox/NimbleWriter.h" + +#include "dwio/nimble/velox/NimbleWriterOptionBuilder.h" +#include "dwio/nimble/velox/VeloxWriter.h" + +#include "velox/exec/MemoryReclaimer.h" + +namespace facebook::velox::nimble { + +namespace { + +class NimbleWriterAdapter : public dwio::common::Writer { + public: + NimbleWriterAdapter( + memory::MemoryPool& memoryPool, + const velox::TypePtr& schema, + std::unique_ptr file, + facebook::nimble::VeloxWriterOptions options) + : writer_(memoryPool, schema, std::move(file), std::move(options)) {} + + void write(const VectorPtr& data) override { + writer_.write(data); + } + + void flush() override { + writer_.flush(); + } + + void close() override { + writer_.close(); + } + + void abort() override {} + + private: + facebook::nimble::VeloxWriter writer_; +}; + +} // namespace + +std::unique_ptr NimbleWriterFactory::createWriter( + std::unique_ptr sink, + const dwio::common::WriterOptions& options) { + // TODO: Pass the sink directly to writer. + auto* sinkWrapper = dynamic_cast(sink.get()); + VELOX_CHECK_NOT_NULL( + sinkWrapper, "Expected WriteFileSink, got {}", typeid(*sink).name()); + + return std::make_unique( + *options.memoryPool, + options.schema, + sinkWrapper->toWriteFile(), + facebook::dwio::api::NimbleWriterOptionBuilder() + .withSerdeParams(asRowType(options.schema), options.serdeParameters) + .withReclaimerFactory( + []() { return exec::MemoryReclaimer::create(); }) + .withSpillConfig(options.spillConfig) + .build()); +} + +} // namespace facebook::velox::nimble diff --git a/dwio/nimble/velox/NimbleWriter.h b/dwio/nimble/velox/NimbleWriter.h new file mode 100644 index 0000000..ad25dde --- /dev/null +++ b/dwio/nimble/velox/NimbleWriter.h @@ -0,0 +1,24 @@ +#pragma once + +#include "velox/dwio/common/WriterFactory.h" + +namespace facebook::velox::nimble { + +class NimbleWriterFactory : public dwio::common::WriterFactory { + public: + NimbleWriterFactory() : WriterFactory(dwio::common::FileFormat::NIMBLE) {} + + std::unique_ptr createWriter( + std::unique_ptr sink, + const dwio::common::WriterOptions& options) override; +}; + +inline void registerNimbleWriterFactory() { + dwio::common::registerWriterFactory(std::make_shared()); +} + +inline void unregisterNimbleWriterFactory() { + dwio::common::unregisterWriterFactory(dwio::common::FileFormat::NIMBLE); +} + +} // namespace facebook::velox::nimble diff --git a/dwio/nimble/velox/NimbleWriterOptionBuilder.cpp b/dwio/nimble/velox/NimbleWriterOptionBuilder.cpp new file mode 100644 index 0000000..bcba8b0 --- /dev/null +++ b/dwio/nimble/velox/NimbleWriterOptionBuilder.cpp @@ -0,0 +1,133 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "dwio/nimble/velox/NimbleWriterOptionBuilder.h" + +#include "dwio/nimble/common/Exceptions.h" +#include "dwio/nimble/velox/NimbleConfig.h" + +#include + +#include +#include +#include + +namespace facebook::dwio::api { +NimbleWriterOptionBuilder& NimbleWriterOptionBuilder::withSerdeParams( + const std::shared_ptr& schema, + const std::map& serdeParams) { + auto config = nimble::Config::fromMap(serdeParams); + if (config->get(nimble::Config::FLATTEN_MAP)) { + auto flatMapCols = config->get(nimble::Config::MAP_FLAT_COLS); + options_.flatMapColumns.clear(); + options_.flatMapColumns.reserve(flatMapCols.size()); + for (auto col : flatMapCols) { + LOG(INFO) << fmt::format( + "adding flat map column {} with schema width {}", + col, + schema->children().size()); + options_.flatMapColumns.insert(schema->nameOf(col)); + } + } + withDefaultFlushPolicy(config->get(nimble::Config::RAW_STRIPE_SIZE)); + + auto batchReuseCols = config->get(nimble::Config::BATCH_REUSE_COLS); + options_.dictionaryArrayColumns.clear(); + options_.dictionaryArrayColumns.reserve(batchReuseCols.size()); + for (const auto& colIdx : batchReuseCols) { + options_.dictionaryArrayColumns.insert(schema->nameOf(colIdx)); + } + + auto deduplicatedMapCols = config->get(nimble::Config::DEDUPLICATED_COLS); + options_.deduplicatedMapColumns.clear(); + options_.deduplicatedMapColumns.reserve(deduplicatedMapCols.size()); + for (const auto& colIdx : deduplicatedMapCols) { + options_.deduplicatedMapColumns.insert(schema->nameOf(colIdx)); + } + + withDefaultInputGrowthPolicy( + config->get(nimble::Config::INPUT_BUFFER_DEFAULT_GROWTH_CONFIGS)); + + options_.compressionOptions.compressionAcceptRatio = + config->get(nimble::Config::ENCODING_SELECTION_COMPRESSION_ACCEPT_RATIO); + options_.compressionOptions.useVariableBitWidthCompressor = + config->get(nimble::Config::ENABLE_ZSTRONG_VARIABLE_BITWIDTH_COMPRESSOR); + options_.compressionOptions.internalCompressionLevel = + config->get(nimble::Config::ZSTRONG_COMPRESSION_LEVEL); + options_.compressionOptions.internalDecompressionLevel = + config->get(nimble::Config::ZSTRONG_DECOMPRESSION_LEVEL); + + return withDefaultEncodingSelectionPolicy( + config->get(nimble::Config::MANUAL_ENCODING_SELECTION_READ_FACTORS), + options_.compressionOptions); +} + +NimbleWriterOptionBuilder& +NimbleWriterOptionBuilder::withDefaultEncodingSelectionPolicy( + std::vector> readFactors, + nimble::CompressionOptions compressionOptions) { + options_.encodingSelectionPolicyFactory = + [encodingFactory = + nimble::ManualEncodingSelectionPolicyFactory{ + std::move(readFactors), std::move(compressionOptions)}]( + nimble::DataType dataType) + -> std::unique_ptr { + return encodingFactory.createPolicy(dataType); + }; + return *this; +} + +NimbleWriterOptionBuilder& +NimbleWriterOptionBuilder::withDefaultEncodingSelectionPolicy( + const std::string& readFactorConfig, + nimble::CompressionOptions compressionOptions) { + return withDefaultEncodingSelectionPolicy( + nimble::ManualEncodingSelectionPolicyFactory::parseReadFactors( + readFactorConfig), + std::move(compressionOptions)); +} + +NimbleWriterOptionBuilder& +NimbleWriterOptionBuilder::withDefaultInputGrowthPolicy( + std::map rangedConfigs) { + options_.inputGrowthPolicyFactory = [rangedConfigs = + std::move(rangedConfigs)]() mutable + -> std::unique_ptr { + return std::make_unique( + std::move(rangedConfigs)); + }; + return *this; +} + +NimbleWriterOptionBuilder& NimbleWriterOptionBuilder::withFlatMapColumns( + const std::vector& flatMapCols) { + options_.flatMapColumns.clear(); + for (const auto& col : flatMapCols) { + options_.flatMapColumns.insert(col); + } + return *this; +} + +NimbleWriterOptionBuilder& +NimbleWriterOptionBuilder::withDictionaryArrayColumns( + const std::vector& dictionaryArrayCols) { + options_.dictionaryArrayColumns.clear(); + for (const auto& col : dictionaryArrayCols) { + options_.dictionaryArrayColumns.insert(col); + } + return *this; +} +} // namespace facebook::dwio::api diff --git a/dwio/nimble/velox/NimbleWriterOptionBuilder.h b/dwio/nimble/velox/NimbleWriterOptionBuilder.h new file mode 100644 index 0000000..0037370 --- /dev/null +++ b/dwio/nimble/velox/NimbleWriterOptionBuilder.h @@ -0,0 +1,106 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "dwio/nimble/encodings/EncodingSelectionPolicy.h" +#include "dwio/nimble/velox/EncodingLayoutTree.h" +#include "dwio/nimble/velox/VeloxWriterOptions.h" + +#include "velox/common/memory/MemoryArbitrator.h" + +#include + +namespace facebook::dwio::api { +class NimbleWriterOptionBuilder { + public: + NimbleWriterOptionBuilder& withDefaultFlushPolicy(uint64_t rawStripeSize) { + options_.flushPolicyFactory = [rawStripeSize]() { + // The threshold of raw stripe size buffered before encoding stripes. + return std::make_unique(rawStripeSize); + }; + return *this; + } + + NimbleWriterOptionBuilder& withFlushPolicyFactory( + std::function()> + flushPolicyFactory) { + options_.flushPolicyFactory = flushPolicyFactory; + return *this; + } + + NimbleWriterOptionBuilder& withDefaultEncodingSelectionPolicy( + std::vector> readFactors, + nimble::CompressionOptions compressionOptions); + + NimbleWriterOptionBuilder& withDefaultEncodingSelectionPolicy( + const std::string& readFactorConfig, + nimble::CompressionOptions compressionOptions); + + NimbleWriterOptionBuilder& withDefaultInputGrowthPolicy( + std::map rangedConfigs); + + // TODO: This is the integration point for nimble config. + NimbleWriterOptionBuilder& withSerdeParams( + const std::shared_ptr& schema, + const std::map& serdeParams); + + NimbleWriterOptionBuilder& withFlatMapColumns( + const std::vector& flatMapCols); + + NimbleWriterOptionBuilder& withDictionaryArrayColumns( + const std::vector& dictionaryArrayCols); + + NimbleWriterOptionBuilder& withFeatureReordering( + std::vector>> featureReordering) { + options_.featureReordering.emplace(std::move(featureReordering)); + return *this; + } + + NimbleWriterOptionBuilder& withMetricsLogger( + const std::shared_ptr& metricsLogger) { + options_.metricsLogger = metricsLogger; + return *this; + } + + NimbleWriterOptionBuilder& withEncodingLayoutTree( + nimble::EncodingLayoutTree encodingLayoutTree) { + options_.encodingLayoutTree.emplace(std::move(encodingLayoutTree)); + return *this; + } + + NimbleWriterOptionBuilder& withReclaimerFactory( + std::function()> + reclaimerFactory) { + options_.reclaimerFactory = std::move(reclaimerFactory); + return *this; + } + + NimbleWriterOptionBuilder& withSpillConfig( + const velox::common::SpillConfig* spillConfig) { + options_.spillConfig = spillConfig; + return *this; + } + + nimble::VeloxWriterOptions build() { + return options_; + } + + private: + nimble::VeloxWriterOptions options_; +}; + +} // namespace facebook::dwio::api diff --git a/dwio/nimble/velox/VeloxUtil.cpp b/dwio/nimble/velox/VeloxUtil.cpp new file mode 100644 index 0000000..0e651f4 --- /dev/null +++ b/dwio/nimble/velox/VeloxUtil.cpp @@ -0,0 +1,165 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "dwio/nimble/velox/VeloxUtil.h" + +#include "velox/dwio/common/exception/Exception.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" + +namespace facebook::dwio::api { + +namespace { +template +void resetVectorImpl(const velox::VectorPtr&) {} + +void resetStringBuffer(const velox::VectorPtr& vector) { + auto flat = vector->asFlatVector(); + DWIO_ENSURE(flat != nullptr); + flat->clearStringBuffers(); +} + +template <> +void resetVectorImpl(const velox::VectorPtr& vector) { + resetStringBuffer(vector); +} + +template <> +void resetVectorImpl( + const velox::VectorPtr& vector) { + resetStringBuffer(vector); +} + +template <> +void resetVectorImpl(const velox::VectorPtr& vector) { + velox::ArrayVector* arrayVector = vector->as(); + DWIO_ENSURE(arrayVector != nullptr); + resetVector(arrayVector->elements()); +} + +template <> +void resetVectorImpl(const velox::VectorPtr& vector) { + velox::MapVector* mapVector = vector->as(); + DWIO_ENSURE(mapVector != nullptr); + resetVector(mapVector->mapKeys()); + resetVector(mapVector->mapValues()); +} + +template <> +void resetVectorImpl(const velox::VectorPtr& vector) { + velox::RowVector* rowVector = vector->as(); + DWIO_ENSURE(rowVector != nullptr); + std::for_each( + rowVector->children().begin(), rowVector->children().end(), resetVector); +} +} // namespace + +void resetVector(const velox::VectorPtr& vector) { + if (vector == nullptr) { + return; + } + vector->resetNulls(); + vector->resize(0); + VELOX_DYNAMIC_TYPE_DISPATCH(resetVectorImpl, vector->typeKind(), vector); +} + +namespace { + +void populateFeatureSelectorForFlatMapAsStruct( + const velox::dwio::common::ColumnSelector& selector, + const std::unordered_map>& asStructMap, + nimble::VeloxReadParams& params) { + // asStructMap is using TypeWithId identifiers as the map key. + // Nimble understands column names. Therefore, we perform id to name + // conversion here. + const auto typeWithId = + velox::dwio::common::TypeWithId::create(selector.getSchema()); + const auto& names = typeWithId->type()->as().names(); + std::unordered_map lookup; + for (auto i = 0; i < names.size(); ++i) { + lookup[typeWithId->childAt(i)->id()] = names[i]; + } + for (const auto& pair : asStructMap) { + auto it = lookup.find(pair.first); + DWIO_ENSURE( + it != lookup.end(), "Unable to resolve column name from node id."); + params.readFlatMapFieldAsStruct.emplace(it->second); + auto& featureSelector = params.flatMapFeatureSelector[it->second]; + featureSelector.mode = nimble::SelectionMode::Include; + featureSelector.features.reserve(pair.second.size()); + for (const auto& feature : pair.second) { + featureSelector.features.push_back(feature); + } + } +} + +void populateOtherFeatureSelector( + const velox::dwio::common::ColumnSelector& selector, + nimble::VeloxReadParams& params) { + if (selector.getProjection().empty()) { + return; + } + constexpr char rejectPrefix = '!'; + for (const auto& filterNode : selector.getProjection()) { + if (filterNode.expression.empty() || + params.readFlatMapFieldAsStruct.count(filterNode.name) != 0) { + continue; + } + auto expressionJson = folly::parseJson(filterNode.expression); + if (expressionJson.empty()) { + continue; + } + auto& featureSelector = params.flatMapFeatureSelector[filterNode.name]; + featureSelector.features.reserve(expressionJson.size()); + auto exp = expressionJson[0].asString(); + DWIO_ENSURE( + !exp.empty(), + fmt::format("First feature in {} is empty", filterNode.name)); + bool isInclude = exp.front() != rejectPrefix; + if (isInclude) { + featureSelector.mode = nimble::SelectionMode::Include; + } else { + featureSelector.mode = nimble::SelectionMode::Exclude; + } + for (auto itor = expressionJson.begin(); itor != expressionJson.end(); + ++itor) { + auto feature = itor->asString(); + DWIO_ENSURE( + !feature.empty() && (feature.front() != rejectPrefix) == isInclude, + fmt::format( + "Empty or mixed included/excluded feature '{}' found " + "for flatmap: {}.", + feature, + filterNode.name)); + featureSelector.features.emplace_back( + isInclude ? feature : feature.substr(1)); + } + } +} + +} // namespace + +void populateFeatureSelector( + const velox::dwio::common::ColumnSelector& selector, + const std::unordered_map>& asStructMap, + nimble::VeloxReadParams& params) { + if (!asStructMap.empty()) { + populateFeatureSelectorForFlatMapAsStruct(selector, asStructMap, params); + } + populateOtherFeatureSelector(selector, params); +} + +} // namespace facebook::dwio::api diff --git a/dwio/nimble/velox/VeloxUtil.h b/dwio/nimble/velox/VeloxUtil.h new file mode 100644 index 0000000..87c7193 --- /dev/null +++ b/dwio/nimble/velox/VeloxUtil.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "dwio/nimble/velox/VeloxReader.h" +#include "velox/vector/BaseVector.h" + +namespace facebook::dwio::api { + +// Make the vector as a clean reusable state, by resetting children size +// recursively for complex types. +void resetVector(const velox::VectorPtr& vector); + +void populateFeatureSelector( + const velox::dwio::common::ColumnSelector&, + const std::unordered_map>& asStructMap, + nimble::VeloxReadParams& outParams); + +} // namespace facebook::dwio::api diff --git a/dwio/nimble/velox/tests/NimbleReaderTest.cpp b/dwio/nimble/velox/tests/NimbleReaderTest.cpp new file mode 100644 index 0000000..252bd90 --- /dev/null +++ b/dwio/nimble/velox/tests/NimbleReaderTest.cpp @@ -0,0 +1,317 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "dwio/nimble/velox/NimbleReader.h" +#include "dwio/nimble/common/tests/NimbleFileWriter.h" +#include "dwio/nimble/velox/VeloxReader.h" + +#include "velox/dwio/common/tests/utils/BatchMaker.h" +#include "velox/vector/tests/utils/VectorMaker.h" + +#include + +namespace facebook::velox::nimble { +namespace { + +class NimbleReaderTest : public testing::Test { + protected: + void SetUp() override { + registerNimbleReaderFactory(); + } + + void TearDown() override { + unregisterNimbleReaderFactory(); + } + + memory::MemoryPool& pool() { + return *pool_; + } + + memory::MemoryPool& rootPool() { + return *rootPool_; + } + + private: + std::shared_ptr rootPool_ = + memory::deprecatedDefaultMemoryManager().addRootPool(); + std::shared_ptr pool_ = rootPool_->addLeafChild("leaf"); +}; + +void setScanSpec(const Type& type, dwio::common::RowReaderOptions& options) { + auto spec = std::make_shared("root"); + spec->addAllChildFields(type); + options.setScanSpec(spec); +} + +TEST_F(NimbleReaderTest, basic) { + auto type = ROW({{"i32", INTEGER()}}); + const auto numRows = 100; + auto vec = test::BatchMaker::createBatch(type, numRows, pool()); + auto file = facebook::nimble::test::createNimbleFile(rootPool(), vec); + auto readFile = std::make_shared(file); + auto factory = + dwio::common::getReaderFactory(dwio::common::FileFormat::NIMBLE); + auto input = std::make_unique(readFile, pool()); + dwio::common::ReaderOptions options(&pool()); + auto reader = factory->createReader(std::move(input), options); + EXPECT_EQ(reader->numberOfRows(), numRows); + EXPECT_EQ(*reader->rowType(), *type); + + dwio::common::RowReaderOptions rowOptions; + setScanSpec(*type, rowOptions); + auto rowReader = reader->createRowReader(rowOptions); + EXPECT_TRUE(rowReader->estimatedRowSize().has_value()); + EXPECT_EQ(rowReader->estimatedRowSize().value(), 4); + + VectorPtr result; + ASSERT_EQ(rowReader->next(30, result), 30); + EXPECT_EQ(rowReader->estimatedRowSize().value(), 4); + for (int i = 0; i < 30; ++i) { + EXPECT_TRUE(result->equalValueAt(vec.get(), i, i)); + } + ASSERT_EQ(rowReader->next(71, result), 70); + EXPECT_EQ( + rowReader->estimatedRowSize().value(), + ::facebook::nimble::VeloxReader::kConservativeEstimatedRowSize); + for (int i = 0; i < 70; ++i) { + EXPECT_TRUE(result->equalValueAt(vec.get(), i, 30 + i)); + } +} + +TEST_F(NimbleReaderTest, columnSelector) { + auto type = + ROW({"int_col", "double_col", "string_col"}, + {INTEGER(), DOUBLE(), VARCHAR()}); + auto vec = test::BatchMaker::createBatch(type, 100, pool()); + auto file = facebook::nimble::test::createNimbleFile(rootPool(), vec); + auto readFile = std::make_shared(file); + auto factory = + dwio::common::getReaderFactory(dwio::common::FileFormat::NIMBLE); + auto input = std::make_unique(readFile, pool()); + dwio::common::ReaderOptions options(&pool()); + auto reader = factory->createReader(std::move(input), options); + EXPECT_EQ(reader->numberOfRows(), 100); + EXPECT_EQ(*reader->rowType(), *type); + + dwio::common::RowReaderOptions rowOptions; + auto spec = std::make_shared(""); + spec->addField("string_col", 0); + spec->addField("double_col", 1); + rowOptions.setScanSpec(spec); + + auto columnSelector = std::make_shared( + type, std::vector{"string_col", "double_col"}); + rowOptions.select(columnSelector); + + auto rowReader = reader->createRowReader(rowOptions); + VectorPtr result; + + ASSERT_EQ(rowReader->next(50, result), 50); + ASSERT_EQ(result->type()->size(), 2); + auto* rowVec = result->as(); + ASSERT_TRUE(rowVec); + ASSERT_EQ(rowVec->children().size(), 2); + auto* stringCol = rowVec->childAt(0).get(); + auto* doubleCol = rowVec->childAt(1).get(); + ASSERT_EQ(rowVec->childAt(0)->type(), VARCHAR()); + ASSERT_EQ(rowVec->childAt(0)->size(), 50); + ASSERT_EQ(rowVec->childAt(1)->type(), DOUBLE()); + ASSERT_EQ(rowVec->childAt(1)->size(), 50); + + ASSERT_EQ(rowReader->next(50, result), 50); + ASSERT_EQ(result->type()->size(), 2); + rowVec = result->as(); + ASSERT_TRUE(rowVec); + ASSERT_EQ(rowVec->children().size(), 2); + ASSERT_EQ(rowVec->childAt(0).get(), stringCol); + ASSERT_EQ(rowVec->childAt(1).get(), doubleCol); + ASSERT_EQ(rowVec->childAt(0)->type(), VARCHAR()); + ASSERT_EQ(rowVec->childAt(0)->size(), 50); + ASSERT_EQ(rowVec->childAt(1)->type(), DOUBLE()); + ASSERT_EQ(rowVec->childAt(1)->size(), 50); +} + +TEST_F(NimbleReaderTest, filter) { + test::VectorMaker maker(&pool()); + auto data = maker.flatVector(10, folly::identity); + auto vec = maker.rowVector({data, data}); + auto file = facebook::nimble::test::createNimbleFile(rootPool(), vec); + auto readFile = std::make_shared(file); + auto factory = + dwio::common::getReaderFactory(dwio::common::FileFormat::NIMBLE); + auto input = std::make_unique(readFile, pool()); + dwio::common::ReaderOptions options(&pool()); + auto reader = factory->createReader(std::move(input), options); + dwio::common::RowReaderOptions rowOptions; + auto spec = std::make_shared(""); + spec->getOrCreateChild(common::Subfield("c0")) + ->setFilter(common::createBigintValues({2, 3, 5, 7}, false)); + spec->addField("c1", 0); + rowOptions.setScanSpec(spec); + auto rowReader = reader->createRowReader(rowOptions); + VectorPtr result; + ASSERT_EQ(rowReader->next(6, result), 6); + ASSERT_EQ(result->size(), 3); + auto* rowVec = result->as(); + ASSERT_TRUE(rowVec); + ASSERT_EQ(rowVec->children().size(), 1); + auto* c1 = rowVec->childAt(0).get(); + ASSERT_EQ(c1->encoding(), VectorEncoding::Simple::DICTIONARY); + c1 = c1->valueVector().get(); + ASSERT_EQ(rowReader->next(1, result), 1); + ASSERT_EQ(result->size(), 0); + ASSERT_EQ(rowReader->next(4, result), 3); + ASSERT_EQ(result->size(), 1); + rowVec = result->as(); + ASSERT_TRUE(rowVec); + ASSERT_EQ(rowVec->children().size(), 1); + auto* c1Prime = rowVec->childAt(0).get(); + ASSERT_EQ(c1Prime->encoding(), VectorEncoding::Simple::DICTIONARY); + ASSERT_EQ(c1Prime->valueVector().get(), c1); +} + +TEST_F(NimbleReaderTest, flatMap) { + test::VectorMaker maker(&pool()); + auto mapVec = maker.mapVector( + 20, + [](vector_size_t) { return 5; }, + [](vector_size_t row) { return row % 5; }, + [](vector_size_t row) { return row; }); + auto vec = maker.rowVector({"float_features"}, {mapVec}); + facebook::nimble::VeloxWriterOptions writerOptions; + writerOptions.flatMapColumns.insert("float_features"); + auto file = + facebook::nimble::test::createNimbleFile(rootPool(), vec, writerOptions); + auto readFile = std::make_shared(file); + auto factory = + dwio::common::getReaderFactory(dwio::common::FileFormat::NIMBLE); + auto input = std::make_unique(readFile, pool()); + dwio::common::ReaderOptions options(&pool()); + auto reader = factory->createReader(std::move(input), options); + ASSERT_EQ(*reader->rowType(), *vec->type()); + { + dwio::common::RowReaderOptions rowOptions; + setScanSpec(*vec->type(), rowOptions); + auto rowReader = reader->createRowReader(rowOptions); + VectorPtr result; + ASSERT_EQ(rowReader->next(20, result), 20); + ASSERT_EQ(*result->type(), *vec->type()); + for (int i = 0; i < 20; ++i) { + EXPECT_TRUE(result->equalValueAt(vec.get(), i, i)); + } + } + { + auto floatFeaturesType = ROW( + {{"0", REAL()}, + {"1", REAL()}, + {"2", REAL()}, + {"3", REAL()}, + {"4", REAL()}}); + dwio::common::RowReaderOptions rowOptions; + setScanSpec(*ROW({"float_features"}, {floatFeaturesType}), rowOptions); + rowOptions.setFlatmapNodeIdsAsStruct({{1, {"0", "1", "2", "3", "4"}}}); + auto rowReader = reader->createRowReader(rowOptions); + VectorPtr result; + ASSERT_EQ(rowReader->next(20, result), 20); + ASSERT_EQ(result->type()->size(), 1); + ASSERT_EQ(result->type()->childAt(0)->kind(), TypeKind::ROW); + auto* floatFeatures = result->as()->childAt(0)->as(); + ASSERT_EQ(*floatFeatures->type(), *floatFeaturesType); + for (int i = 0; i < 20; ++i) { + for (int j = 0; j < 5; ++j) { + EXPECT_EQ( + floatFeatures->childAt(j)->asFlatVector()->valueAt(i), + j + 5 * i); + } + } + } +} + +void testSubfieldPruningUsingFeatureSelector( + bool flatmap, + memory::MemoryPool& leafPool, + memory::MemoryPool& rootPool) { + test::VectorMaker maker(&leafPool); + auto vec = maker.rowVector( + {"float_features"}, + { + maker.mapVector( + 5, + [](vector_size_t) { return 5; }, + [](vector_size_t row) { return row % 5; }, + [](vector_size_t row) { return row; }), + }); + auto rowType = asRowType(vec->type()); + facebook::nimble::VeloxWriterOptions writerOptions; + if (flatmap) { + writerOptions.flatMapColumns.insert("float_features"); + } + auto file = + facebook::nimble::test::createNimbleFile(rootPool, vec, writerOptions); + auto readFile = std::make_shared(file); + auto factory = + dwio::common::getReaderFactory(dwio::common::FileFormat::NIMBLE); + auto input = + std::make_unique(readFile, leafPool); + dwio::common::ReaderOptions options(&leafPool); + auto reader = factory->createReader(std::move(input), options); + auto spec = std::make_shared("root"); + spec->addAllChildFields(*vec->type()); + // No effect on bulk reader but will be used by selective reader. + spec->childByName("float_features") + ->childByName(common::ScanSpec::kMapKeysFieldName) + ->setFilter(common::createBigintValues({1, 3}, false)); + dwio::common::RowReaderOptions rowOptions; + rowOptions.setScanSpec(spec); + rowOptions.select(std::make_shared( + rowType, std::vector({"float_features#[1,3]"}))); + auto rowReader = reader->createRowReader(rowOptions); + VectorPtr result; + ASSERT_EQ(rowReader->next(vec->size(), result), vec->size()); + VectorPtr expected; + if (flatmap) { + expected = maker.rowVector( + {"float_features"}, + { + maker.mapVector( + vec->size(), + [](vector_size_t) { return 2; }, + [](vector_size_t i, vector_size_t j) { return 1 + 2 * j; }, + [](vector_size_t i, vector_size_t j) { + return 5 * i + 1 + 2 * j; + }), + }); + } else { + expected = vec; + } + ASSERT_EQ(result->size(), expected->size()); + for (int i = 0; i < result->size(); ++i) { + ASSERT_TRUE(result->equalValueAt(expected.get(), i, i)) + << result->toString(0, result->size()); + } + ASSERT_EQ(rowReader->next(vec->size(), result), 0); +} + +TEST_F(NimbleReaderTest, subfieldPruningFlatMap) { + testSubfieldPruningUsingFeatureSelector(true, pool(), rootPool()); +} + +TEST_F(NimbleReaderTest, subfieldPruningMap) { + testSubfieldPruningUsingFeatureSelector(false, pool(), rootPool()); +} + +} // namespace +} // namespace facebook::velox::nimble diff --git a/dwio/nimble/velox/tests/NimbleWriterTest.cpp b/dwio/nimble/velox/tests/NimbleWriterTest.cpp new file mode 100644 index 0000000..7369f3c --- /dev/null +++ b/dwio/nimble/velox/tests/NimbleWriterTest.cpp @@ -0,0 +1,99 @@ +/* + * Copyright (c) Meta Platforms, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "dwio/nimble/velox/NimbleWriter.h" +#include "dwio/nimble/velox/VeloxReader.h" + +#include "velox/common/memory/MemoryPool.h" +#include "velox/dwio/common/tests/utils/BatchMaker.h" +#include "velox/exec/MemoryReclaimer.h" + +#include + +namespace facebook::velox::nimble { +namespace { + +class NimbleWriterTest : public testing::Test { + protected: + void SetUp() override { + registerNimbleWriterFactory(); + } + + void TearDown() override { + unregisterNimbleWriterFactory(); + } + + memory::MemoryPool& pool() { + return *pool_; + } + + memory::MemoryPool& rootPool() { + return *aggrPool_; + } + + auto factory() const { + return dwio::common::getWriterFactory(dwio::common::FileFormat::NIMBLE); + } + + private: + std::shared_ptr pool_ = + memory::deprecatedAddDefaultLeafMemoryPool(); + std::shared_ptr aggrPool_ = + memory::deprecatedDefaultMemoryManager().addRootPool( + "root_pool", + memory::kMaxMemory, + exec::MemoryReclaimer::create()); +}; + +TEST_F(NimbleWriterTest, basic) { + auto schema = ROW({ + {"i32", INTEGER()}, + {"string", VARCHAR()}, + {"struct", ROW({{"i32", INTEGER()}})}, + }); + auto expected = test::BatchMaker::createBatch(schema, 100, pool()); + std::string buf; + dwio::common::WriterOptions options = { + .schema = schema, + .memoryPool = &rootPool(), + }; + auto writer = factory()->createWriter( + std::make_unique( + std::make_unique(&buf), ""), + options); + writer->write(expected); + writer->close(); + InMemoryReadFile readFile(buf); + facebook::nimble::VeloxReader reader( + pool(), + &readFile, + std::make_shared(schema)); + VectorPtr actual; + ASSERT_TRUE(reader.next(60, actual)); + ASSERT_EQ(actual->size(), 60); + for (int i = 0; i < actual->size(); ++i) { + ASSERT_TRUE(actual->equalValueAt(expected.get(), i, i)); + } + ASSERT_TRUE(reader.next(60, actual)); + ASSERT_EQ(actual->size(), 40); + for (int i = 0; i < actual->size(); ++i) { + ASSERT_TRUE(actual->equalValueAt(expected.get(), i, 60 + i)); + } + ASSERT_FALSE(reader.next(60, actual)); +} + +} // namespace +} // namespace facebook::velox::nimble