Skip to content

Commit

Permalink
[Enhancement] Provide clear error message if skip_headers is set in s…
Browse files Browse the repository at this point in the history
…elect from files option (#49503)

Signed-off-by: tracymacding <[email protected]>
(cherry picked from commit 51ce589)

# Conflicts:
#	be/src/exec/csv_scanner.cpp
#	be/test/exec/csv_scanner_test.cpp
  • Loading branch information
tracymacding authored and mergify[bot] committed Aug 8, 2024
1 parent f9dbba7 commit e3562d7
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 0 deletions.
57 changes: 57 additions & 0 deletions be/src/exec/csv_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,63 @@ void CSVScanner::_materialize_src_chunk_adaptive_nullable_column(ChunkPtr& chunk
}
}

<<<<<<< HEAD
=======
// Makes sure a CSV reader is available for the current scan.
//
// If a reader is already open this is a no-op. Otherwise the file index is
// advanced and a reader is created for the next entry of |_scan_range.ranges|.
//
// Returns:
//   - OK when a reader is already active or a new one has been prepared;
//   - EndOfFile("CSVScanner") when no range is left to open;
//   - EndOfFile with a descriptive message when the file ends before
//     |_parse_options.skip_header| records could be skipped;
//   - any other failure reported while creating the file or reading records.
Status CSVScanner::_init_reader() {
    if (_curr_reader != nullptr) {
        // A reader is still active, keep using it.
        return Status::OK();
    }
    // The index is advanced even when it runs past the last range.
    if (++_curr_file_index >= _scan_range.ranges.size()) {
        return Status::EndOfFile("CSVScanner");
    }

    const TBrokerRangeDesc& range = _scan_range.ranges[_curr_file_index];
    std::shared_ptr<SequentialFile> input;
    Status create_status = create_sequential_file(range, _scan_range.broker_addresses[0], _scan_range.params, &input);
    if (!create_status.ok()) {
        LOG(WARNING) << "Failed to create sequential files. status: " << create_status.to_string();
        return create_status;
    }

    _curr_reader = std::make_unique<ScannerCSVReader>(input, _state, _parse_options);
    _curr_reader->set_counter(_counter);

    // Does not set limit for compressed file.
    const bool is_plain_csv = range.format_type == TFileFormatType::FORMAT_CSV_PLAIN;
    if (range.size > 0 && is_plain_csv) {
        _curr_reader->set_limit(range.size);
    }

    if (range.start_offset > 0) {
        // Skip the first record started from |start_offset|.
        // NOTE(review): only a time-out from skip() is handled here; any other
        // status it returns is not inspected before the next record is read.
        auto skip_status = input->skip(range.start_offset);
        if (skip_status.is_time_out()) {
            // Undo the index advance and drop the reader so that this file
            // is opened again on the next call.
            --_curr_file_index;
            _curr_reader.reset();
            return skip_status;
        }
        CSVReader::Record discarded;
        RETURN_IF_ERROR(_curr_reader->next_record(&discarded));
    }

    // Consume the requested number of header records. Hitting the end of the
    // file first is reported with a message naming how many rows were found.
    for (int64_t skipped = 0; skipped < _parse_options.skip_header; ++skipped) {
        CSVReader::Record header;
        auto header_status = _curr_reader->next_record(&header);
        if (header_status.ok()) {
            continue;
        }
        if (!header_status.is_end_of_file()) {
            return header_status;
        }
        auto err_msg =
                fmt::format("The parameter 'skip_header' is set to {}, but there are only {} rows in the csv file",
                            _parse_options.skip_header, skipped);
        return Status::EndOfFile(err_msg);
    }
    return Status::OK();
}

>>>>>>> 51ce589e6e ([Enhancement] Provide clear error message if skip_headers is set in select from files option (#49503))
StatusOr<ChunkPtr> CSVScanner::get_next() {
SCOPED_RAW_TIMER(&_counter->total_ns);

Expand Down
155 changes: 155 additions & 0 deletions be/test/exec/csv_scanner_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,161 @@ TEST_P(CSVScannerTest, test_column_count_inconsistent) {
(void)fs::remove(log_file_path);
}

<<<<<<< HEAD
=======
// Checks the column names and types returned by get_schema() for csv_file23
// under different sample row counts and parsing options.
TEST_P(CSVScannerTest, test_get_schema) {
    using ExpectedSchema = std::vector<std::pair<std::string, LogicalType>>;

    // Allocates scan params in _obj_pool with the delimiters shared by all cases.
    auto new_params = [&]() {
        TBrokerScanRangeParams* p = _obj_pool.add(new TBrokerScanRangeParams());
        p->__set_row_delimiter('\n');
        p->__set_column_separator(',');
        return p;
    };

    // Opens a scanner over csv_file23 with |p| and compares the schema it
    // reports against |expected|, column by column.
    auto expect_schema = [&](TBrokerScanRangeParams* p, const ExpectedSchema& expected) {
        TBrokerRangeDesc range;
        range.__set_path("./be/test/exec/test_data/csv_scanner/csv_file23");
        range.__set_num_of_columns_from_file(0);
        std::vector<TBrokerRangeDesc> ranges;
        ranges.push_back(range);

        auto scanner = create_csv_scanner({}, ranges, p);
        EXPECT_OK(scanner->open());
        std::vector<SlotDescriptor> actual;
        EXPECT_OK(scanner->get_schema(&actual));
        EXPECT_EQ(expected.size(), actual.size());

        for (size_t col = 0; col < actual.size(); ++col) {
            EXPECT_EQ(expected[col].first, actual[col].col_name());
            EXPECT_EQ(expected[col].second, actual[col].type().type) << actual[col].col_name();
        }
    };

    {
        // sample 1 row
        TBrokerScanRangeParams* params = new_params();
        params->__set_schema_sample_file_row_count(1);
        expect_schema(params,
                      {{"$1", TYPE_BIGINT}, {"$2", TYPE_DOUBLE}, {"$3", TYPE_DOUBLE}, {"$4", TYPE_BOOLEAN}});
    }

    {
        // sample 2 rows
        TBrokerScanRangeParams* params = new_params();
        params->__set_schema_sample_file_row_count(2);
        expect_schema(params, {{"$1", TYPE_BIGINT},
                               {"$2", TYPE_VARCHAR},
                               {"$3", TYPE_VARCHAR},
                               {"$4", TYPE_VARCHAR},
                               {"$5", TYPE_BOOLEAN}});
    }

    {
        // sample 1 row, skip header 1, enclose ", escape "\"
        TBrokerScanRangeParams* params = new_params();
        params->__set_skip_header(1);
        params->__set_enclose('"');
        params->__set_escape('\\');
        params->__set_schema_sample_file_row_count(1);
        expect_schema(params,
                      {{"$1", TYPE_BIGINT}, {"$2", TYPE_VARCHAR}, {"$3", TYPE_DOUBLE}, {"$4", TYPE_BOOLEAN}});
    }
}

// Reads csv_file1 with flexible column mapping enabled while declaring one
// more column than the expected rows carry values for; that extra column is
// expected to come back as NULL in every row.
TEST_P(CSVScannerTest, test_flexible_column_mapping) {
    // The trailing TYPE_INT entry is the not existing column.
    const LogicalType column_types[] = {TYPE_BIGINT,  TYPE_DOUBLE,  TYPE_VARCHAR,
                                        TYPE_VARCHAR, TYPE_VARCHAR, TYPE_INT};
    std::vector<TypeDescriptor> types;
    for (LogicalType column_type : column_types) {
        types.emplace_back(column_type);
    }

    TBrokerRangeDesc file_range;
    file_range.__set_start_offset(0);
    file_range.__set_path("./be/test/exec/test_data/csv_scanner/csv_file1");
    file_range.__set_num_of_columns_from_file(types.size());
    std::vector<TBrokerRangeDesc> ranges;
    ranges.push_back(file_range);

    TBrokerScanRangeParams* scan_params = _obj_pool.add(new TBrokerScanRangeParams());
    scan_params->__set_row_delimiter('\n');
    scan_params->__set_column_separator('|');
    scan_params->__set_flexible_column_mapping(true);

    auto scanner = create_csv_scanner(types, ranges, scan_params);
    scanner->use_v2(_use_v2);
    EXPECT_OK(scanner->open());

    auto next = scanner->get_next();
    EXPECT_OK(next.status());

    ChunkPtr rows = next.value();
    EXPECT_EQ(6, rows->num_columns());
    EXPECT_EQ(3, rows->num_rows());

    // The last column has no data in the file, so it reads back as NULL.
    EXPECT_EQ("[1, 1.1, 'apple', '2020-01-01', 'apple', NULL]", rows->debug_row(0));
    EXPECT_EQ("[-1, -0.1, 'banana', '1998-09-01', 'banana', NULL]", rows->debug_row(1));
    EXPECT_EQ("[10, NULL, 'grapefruit', '2021-02-19', 'grapefruit', NULL]", rows->debug_row(2));
}

// Asking to skip more header lines than the file contains must fail schema
// inference with a message that names both the requested and the actual count.
TEST_P(CSVScannerTest, test_skip_headers) {
    TBrokerRangeDesc file_range;
    file_range.__set_path("./be/test/exec/test_data/csv_scanner/small.csv");
    file_range.__set_num_of_columns_from_file(0);
    std::vector<TBrokerRangeDesc> ranges;
    ranges.push_back(file_range);

    TBrokerScanRangeParams* scan_params = _obj_pool.add(new TBrokerScanRangeParams());
    scan_params->__set_row_delimiter('\n');
    scan_params->__set_column_separator(',');
    scan_params->__set_enclose('"');
    scan_params->__set_escape('\\');
    // small.csv holds only 2 rows, so skipping the first 3 lines cannot
    // succeed and a clear error message is expected instead.
    scan_params->__set_skip_header(3);

    const std::string expected_error =
            "End of file: The parameter 'skip_header' is set to 3, but there are only 2 rows in the csv file";

    auto scanner = create_csv_scanner({}, ranges, scan_params);
    EXPECT_OK(scanner->open());

    std::vector<SlotDescriptor> inferred;
    auto schema_status = scanner->get_schema(&inferred);
    EXPECT_FALSE(schema_status.ok());
    EXPECT_EQ(0, inferred.size());
    EXPECT_EQ(schema_status.to_string(false), expected_error);
}

>>>>>>> 51ce589e6e ([Enhancement] Provide clear error message if skip_headers is set in select from files option (#49503))
// Instantiate the value-parameterized suites: every CSVScannerTest case runs
// once with the parameter set to true and once with false, while
// CSVScannerTrimSpaceTest runs with true only.
// NOTE(review): the parameter presumably selects the scanner's v2 code path
// (see use_v2(_use_v2) in the tests) — confirm against the fixture.
INSTANTIATE_TEST_CASE_P(CSVScannerTestParams, CSVScannerTest, Values(true, false));
INSTANTIATE_TEST_CASE_P(CSVScannerTestParams, CSVScannerTrimSpaceTest, Values(true));

Expand Down

0 comments on commit e3562d7

Please sign in to comment.