Revert "Allow HiveSplit info columns like '$file_size' and '$file_mod…
Browse files Browse the repository at this point in the history
…ified_time' to be queried in SQL (facebookincubator#8800)"

This reverts commit b9afa14.
PHILO-HE committed Mar 7, 2024
1 parent 275a8a3 commit d3dc172
Showing 13 changed files with 24 additions and 235 deletions.
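
For context, the reverted change (facebookincubator#8800) let a HiveConnectorSplit carry synthesized "info" columns such as $file_size and $file_modified_time as a string map, which SplitReader then exposed to SQL as constant columns. Below is a minimal sketch of the usage this revert removes, based on the constructor signature in the HiveConnectorSplit.h diff; the connector id, file path, and values are illustrative assumptions, not taken from the source:

  #include <memory>
  #include <optional>
  #include "velox/connectors/hive/HiveConnectorSplit.h"

  using facebook::velox::connector::hive::HiveConnectorSplit;
  using facebook::velox::dwio::common::FileFormat;

  // Build a split whose infoColumns map carries file-level metadata as
  // strings; pre-revert, SplitReader::adaptColumns() turned each entry into
  // a one-row constant vector of the column's declared type.
  auto split = std::make_shared<HiveConnectorSplit>(
      "test-hive",                  // connectorId (illustrative)
      "/data/table/part-00000.orc", // _filePath (illustrative)
      FileFormat::ORC,
      /*_start=*/0,
      /*_length=*/1024,
      /*_partitionKeys=*/{},
      /*_tableBucketNumber=*/std::nullopt,
      /*_customSplitInfo=*/{},
      /*_extraFileInfo=*/{},
      /*_serdeParameters=*/{},
      /*_splitWeight=*/0,
      /*_infoColumns=*/
      {{"$file_size", "1024"}, {"$file_modified_time", "1709769600"}});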
10 changes: 2 additions & 8 deletions velox/connectors/hive/HiveConnectorSplit.h
@@ -39,10 +39,6 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
   std::shared_ptr<std::string> extraFileInfo;
   std::unordered_map<std::string, std::string> serdeParameters;
 
-  /// These represent columns like $file_size, $file_modified_time that are
-  /// associated with the HiveSplit.
-  std::unordered_map<std::string, std::string> infoColumns;
-
   HiveConnectorSplit(
       const std::string& connectorId,
       const std::string& _filePath,
@@ -55,8 +51,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
       const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
       const std::shared_ptr<std::string>& _extraFileInfo = {},
      const std::unordered_map<std::string, std::string>& _serdeParameters = {},
-      int64_t _splitWeight = 0,
-      const std::unordered_map<std::string, std::string>& _infoColumns = {})
+      int64_t _splitWeight = 0)
       : ConnectorSplit(connectorId, _splitWeight),
         filePath(_filePath),
         fileFormat(_fileFormat),
@@ -66,8 +61,7 @@ struct HiveConnectorSplit : public connector::ConnectorSplit {
         tableBucketNumber(_tableBucketNumber),
         customSplitInfo(_customSplitInfo),
         extraFileInfo(_extraFileInfo),
-        serdeParameters(_serdeParameters),
-        infoColumns(_infoColumns) {}
+        serdeParameters(_serdeParameters) {}
 
   std::string toString() const override {
     if (tableBucketNumber.has_value()) {
28 changes: 6 additions & 22 deletions velox/connectors/hive/HiveConnectorUtil.cpp
@@ -239,13 +239,6 @@ inline uint8_t parseDelimiter(const std::string& delim) {
   return stoi(delim);
 }
 
-inline bool isSynthesizedColumn(
-    const std::string& name,
-    const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
-        infoColumns) {
-  return name == kPath || name == kBucket || infoColumns.count(name) != 0;
-}
-
 } // namespace
 
 const std::string& getColumnName(const common::Subfield& subfield) {
@@ -280,13 +273,9 @@ void checkColumnNameLowerCase(const std::shared_ptr<const Type>& type) {
   }
 }
 
-void checkColumnNameLowerCase(
-    const SubfieldFilters& filters,
-    const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
-        infoColumns) {
+void checkColumnNameLowerCase(const SubfieldFilters& filters) {
   for (auto& pair : filters) {
-    if (auto name = pair.first.toString();
-        isSynthesizedColumn(name, infoColumns)) {
+    if (auto name = pair.first.toString(); name == kPath || name == kBucket) {
       continue;
     }
     auto& path = pair.first.path();
@@ -321,17 +310,14 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
     const RowTypePtr& dataColumns,
     const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
         partitionKeys,
-    const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
-        infoColumns,
     memory::MemoryPool* pool) {
   auto spec = std::make_shared<common::ScanSpec>("root");
   folly::F14FastMap<std::string, std::vector<const common::Subfield*>>
       filterSubfields;
   std::vector<SubfieldSpec> subfieldSpecs;
   for (auto& [subfield, _] : filters) {
     if (auto name = subfield.toString();
-        !isSynthesizedColumn(name, infoColumns) &&
-        partitionKeys.count(name) == 0) {
+        name != kPath && name != kBucket && partitionKeys.count(name) == 0) {
       filterSubfields[getColumnName(subfield)].push_back(&subfield);
     }
   }
@@ -378,13 +364,11 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
     // SelectiveColumnReader doesn't support constant columns with filters,
     // hence, we can't have a filter for a $path or $bucket column.
     //
-    // Unfortunately, Presto happens to specify a filter for $path, $file_size,
-    // $file_modified_time or $bucket column. This filter is redundant and needs
-    // to be removed.
+    // Unfortunately, Presto happens to specify a filter for $path or
+    // $bucket column. This filter is redundant and needs to be removed.
     // TODO Remove this check when Presto is fixed to not specify a filter
     // on $path and $bucket column.
-    if (auto name = pair.first.toString();
-        isSynthesizedColumn(name, infoColumns)) {
+    if (auto name = pair.first.toString(); name == kPath || name == kBucket) {
       continue;
     }
     auto fieldSpec = spec->getOrCreateChild(pair.first);
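
The net effect of the changes in this file is that the set of synthesized columns shrinks back to $path and $bucket: filters on those two names are still dropped when building the ScanSpec, while $file_size and $file_modified_time once again behave as ordinary column names. A condensed sketch of the check before and after the revert (kPath and kBucket are the connector's existing constants; the helper name below is illustrative, not from the source):

  // Post-revert: only $path and $bucket are special-cased when deciding
  // whether a filter may be pushed down to the file reader.
  bool isSpecialHiveColumn(const std::string& name) {
    return name == kPath || name == kBucket;
  }

  // Pre-revert, isSynthesizedColumn() additionally consulted the map of
  // info-column handles:
  //   return name == kPath || name == kBucket || infoColumns.count(name) != 0;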
7 changes: 1 addition & 6 deletions velox/connectors/hive/HiveConnectorUtil.h
@@ -40,10 +40,7 @@ const std::string& getColumnName(const common::Subfield& subfield);
 
 void checkColumnNameLowerCase(const std::shared_ptr<const Type>& type);
 
-void checkColumnNameLowerCase(
-    const SubfieldFilters& filters,
-    const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
-        infoColumns);
+void checkColumnNameLowerCase(const SubfieldFilters& filters);
 
 void checkColumnNameLowerCase(const core::TypedExprPtr& typeExpr);
 
@@ -55,8 +52,6 @@ std::shared_ptr<common::ScanSpec> makeScanSpec(
     const RowTypePtr& dataColumns,
     const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
         partitionKeys,
-    const std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>&
-        infoColumns,
     memory::MemoryPool* pool);
 
 void configureReaderOptions(
7 changes: 1 addition & 6 deletions velox/connectors/hive/HiveDataSource.cpp
@@ -57,10 +57,6 @@ HiveDataSource::HiveDataSource(
     if (handle->columnType() == HiveColumnHandle::ColumnType::kPartitionKey) {
       partitionKeys_.emplace(handle->name(), handle);
     }
-
-    if (handle->columnType() == HiveColumnHandle::ColumnType::kSynthesized) {
-      infoColumns_.emplace(handle->name(), handle);
-    }
   }
 
   std::vector<std::string> readerRowNames;
@@ -92,7 +88,7 @@ HiveDataSource::HiveDataSource(
   if (hiveConfig_->isFileColumnNamesReadAsLowerCase(
           connectorQueryCtx->sessionProperties())) {
     checkColumnNameLowerCase(outputType_);
-    checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters(), infoColumns_);
+    checkColumnNameLowerCase(hiveTableHandle_->subfieldFilters());
     checkColumnNameLowerCase(hiveTableHandle_->remainingFilter());
   }
 
@@ -156,7 +152,6 @@ HiveDataSource::HiveDataSource(
       filters,
       hiveTableHandle_->dataColumns(),
       partitionKeys_,
-      infoColumns_,
       pool_);
   if (remainingFilter) {
     metadataFilter_ = std::make_shared<common::MetadataFilter>(
4 changes: 0 additions & 4 deletions velox/connectors/hive/HiveDataSource.h
@@ -119,10 +119,6 @@ class HiveDataSource : public DataSource {
 
   // The row type for the data source output, not including filter-only columns
   const RowTypePtr outputType_;
-
-  // Column handles for the Split info columns keyed on their column names.
-  std::unordered_map<std::string, std::shared_ptr<HiveColumnHandle>>
-      infoColumns_;
   std::shared_ptr<common::MetadataFilter> metadataFilter_;
   std::unique_ptr<exec::ExprSet> remainingFilterExprSet_;
   RowVectorPtr emptyOutput_;
18 changes: 3 additions & 15 deletions velox/connectors/hive/SplitReader.cpp
@@ -218,9 +218,9 @@ std::vector<TypePtr> SplitReader::adaptColumns(
     auto* childSpec = childrenSpecs[i].get();
     const std::string& fieldName = childSpec->fieldName();
 
-    if (auto it = hiveSplit_->partitionKeys.find(fieldName);
-        it != hiveSplit_->partitionKeys.end()) {
-      setPartitionValue(childSpec, fieldName, it->second);
+    auto iter = hiveSplit_->partitionKeys.find(fieldName);
+    if (iter != hiveSplit_->partitionKeys.end()) {
+      setPartitionValue(childSpec, fieldName, iter->second);
     } else if (fieldName == kPath) {
       auto constantVec = std::make_shared<ConstantVector<StringView>>(
           connectorQueryCtx_->memoryPool(),
@@ -240,18 +240,6 @@ std::vector<TypePtr> SplitReader::adaptColumns(
             std::move(bucket));
         childSpec->setConstantValue(constantVec);
       }
-    } else if (auto iter = hiveSplit_->infoColumns.find(fieldName);
-               iter != hiveSplit_->infoColumns.end()) {
-      auto infoColumnType =
-          readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName));
-      auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
-          newConstantFromString,
-          infoColumnType->kind(),
-          infoColumnType,
-          iter->second,
-          1,
-          connectorQueryCtx_->memoryPool());
-      childSpec->setConstantValue(constant);
     } else {
       auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
       if (!fileTypeIdx.has_value()) {
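
For context, the branch removed above is what materialized an info column at scan time: the split's string value was parsed into a one-row constant vector of the column's declared type and pinned on the child ScanSpec, so the value never had to be read from the file. The removed code restated with explanatory comments (identifiers exactly as in the diff):

  } else if (auto iter = hiveSplit_->infoColumns.find(fieldName);
             iter != hiveSplit_->infoColumns.end()) {
    // Resolve the declared type of the info column from the reader's
    // output row type.
    auto infoColumnType =
        readerOutputType_->childAt(readerOutputType_->getChildIdx(fieldName));
    // Parse the split's string value (e.g. "1024" for $file_size) into a
    // constant vector of size 1, dispatching on the column's type kind.
    auto constant = VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH_ALL(
        newConstantFromString,
        infoColumnType->kind(),
        infoColumnType,
        iter->second,
        1,
        connectorQueryCtx_->memoryPool());
    // Pin the constant on the ScanSpec so the column reader never touches
    // the file for this column.
    childSpec->setConstantValue(constant);
  }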
18 changes: 4 additions & 14 deletions velox/connectors/hive/iceberg/IcebergSplit.cpp
@@ -30,21 +30,15 @@ HiveIcebergSplit::HiveIcebergSplit(
         _partitionKeys,
     std::optional<int32_t> _tableBucketNumber,
     const std::unordered_map<std::string, std::string>& _customSplitInfo,
-    const std::shared_ptr<std::string>& _extraFileInfo,
-    const std::unordered_map<std::string, std::string>& _infoColumns)
+    const std::shared_ptr<std::string>& _extraFileInfo)
     : HiveConnectorSplit(
          _connectorId,
          _filePath,
          _fileFormat,
          _start,
          _length,
          _partitionKeys,
-         _tableBucketNumber,
-         _customSplitInfo,
-         _extraFileInfo,
-         {},
-         0,
-         _infoColumns) {
+         _tableBucketNumber) {
   // TODO: Deserialize _extraFileInfo to get deleteFiles;
 }
 
@@ -60,8 +54,7 @@ HiveIcebergSplit::HiveIcebergSplit(
     std::optional<int32_t> _tableBucketNumber,
     const std::unordered_map<std::string, std::string>& _customSplitInfo,
     const std::shared_ptr<std::string>& _extraFileInfo,
-    std::vector<IcebergDeleteFile> _deletes,
-    const std::unordered_map<std::string, std::string>& _infoColumns)
+    std::vector<IcebergDeleteFile> _deletes)
     : HiveConnectorSplit(
          _connectorId,
          _filePath,
@@ -71,9 +64,6 @@ HiveIcebergSplit::HiveIcebergSplit(
          _partitionKeys,
          _tableBucketNumber,
          _customSplitInfo,
-         _extraFileInfo,
-         {},
-         0,
-         _infoColumns),
+         _extraFileInfo),
       deleteFiles(_deletes) {}
 } // namespace facebook::velox::connector::hive::iceberg
6 changes: 2 additions & 4 deletions velox/connectors/hive/iceberg/IcebergSplit.h
@@ -36,8 +36,7 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
          _partitionKeys = {},
      std::optional<int32_t> _tableBucketNumber = std::nullopt,
      const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
-     const std::shared_ptr<std::string>& _extraFileInfo = {},
-     const std::unordered_map<std::string, std::string>& _infoColumns = {});
+     const std::shared_ptr<std::string>& _extraFileInfo = {});
 
   // For tests only
   HiveIcebergSplit(
@@ -51,8 +50,7 @@ struct HiveIcebergSplit : public connector::hive::HiveConnectorSplit {
      std::optional<int32_t> _tableBucketNumber = std::nullopt,
      const std::unordered_map<std::string, std::string>& _customSplitInfo = {},
      const std::shared_ptr<std::string>& _extraFileInfo = {},
-     std::vector<IcebergDeleteFile> deletes = {},
-     const std::unordered_map<std::string, std::string>& _infoColumns = {});
+     std::vector<IcebergDeleteFile> deletes = {});
 };
 
 } // namespace facebook::velox::connector::hive::iceberg
17 changes: 3 additions & 14 deletions velox/connectors/hive/tests/HiveConnectorTest.cpp
@@ -87,7 +87,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_multilevel) {
   auto rowType = ROW({{"c0", columnType}});
   auto subfields = makeSubfields({"c0.c0c1[3][\"foo\"].c0c1c0"});
   auto scanSpec = makeScanSpec(
-      rowType, groupSubfields(subfields), {}, nullptr, {}, {}, pool_.get());
+      rowType, groupSubfields(subfields), {}, nullptr, {}, pool_.get());
   auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
   validateNullConstant(*c0c0, *BIGINT());
   auto* c0c1 = scanSpec->childByName("c0")->childByName("c0c1");
@@ -122,7 +122,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeFields) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   auto* c0c0 = scanSpec->childByName("c0")->childByName("c0c0");
   ASSERT_FALSE(c0c0->childByName("c0c0c0")->isConstant());
@@ -145,7 +144,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArray) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   auto* c0 = scanSpec->childByName("c0");
   ASSERT_EQ(c0->maxArrayElementsCount(), 2);
@@ -162,7 +160,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeArrayNegative) {
   auto subfields = makeSubfields({"c0[1].c0c0", "c0[-1].c0c2"});
   auto groupedSubfields = groupSubfields(subfields);
   VELOX_ASSERT_USER_THROW(
-      makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, {}, pool_.get()),
+      makeScanSpec(rowType, groupedSubfields, {}, nullptr, {}, pool_.get()),
       "Non-positive array subscript cannot be push down");
 }
 
@@ -177,7 +175,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_mergeMap) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   auto* c0 = scanSpec->childByName("c0");
   auto* keysFilter = c0->childByName(ScanSpec::kMapKeysFieldName)->filter();
@@ -203,7 +200,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   auto* c0 = scanSpec->childByName("c0");
   ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
@@ -222,7 +218,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_allSubscripts) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   auto* c0 = scanSpec->childByName("c0");
   ASSERT_FALSE(c0->childByName(ScanSpec::kMapKeysFieldName)->filter());
@@ -245,7 +240,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   auto* keysFilter = scanSpec->childByName("c0")
                          ->childByName(ScanSpec::kMapKeysFieldName)
@@ -273,7 +267,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   keysFilter = scanSpec->childByName("c0")
                    ->childByName(ScanSpec::kMapKeysFieldName)
@@ -292,7 +285,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   keysFilter = scanSpec->childByName("c0")
                    ->childByName(ScanSpec::kMapKeysFieldName)
@@ -308,7 +300,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_requiredSubfields_doubleMapKey) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   keysFilter = scanSpec->childByName("c0")
                    ->childByName(ScanSpec::kMapKeysFieldName)
@@ -344,7 +335,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_filtersNotInRequiredSubfields) {
       filters,
       ROW({{"c0", c0Type}, {"c1", c1Type}}),
       {},
-      {},
       pool_.get());
   auto c0 = scanSpec->childByName("c0");
   ASSERT_FALSE(c0->isConstant());
@@ -389,7 +379,6 @@ TEST_F(HiveConnectorTest, makeScanSpec_duplicateSubfields) {
       {},
       nullptr,
       {},
-      {},
       pool_.get());
   auto* c0 = scanSpec->childByName("c0");
   ASSERT_EQ(c0->children().size(), 2);
@@ -403,7 +392,7 @@ TEST_F(HiveConnectorTest, makeScanSpec_filterPartitionKey) {
   SubfieldFilters filters;
   filters.emplace(Subfield("ds"), exec::equal("2023-10-13"));
   auto scanSpec = makeScanSpec(
-      rowType, {}, filters, rowType, {{"ds", nullptr}}, {}, pool_.get());
+      rowType, {}, filters, rowType, {{"ds", nullptr}}, pool_.get());
   ASSERT_TRUE(scanSpec->childByName("c0")->projectOut());
   ASSERT_FALSE(scanSpec->childByName("ds")->projectOut());
 }
(Diff of the remaining four changed files not shown.)
