refactor(connector): extract the common logic of file_scan tvf and parquet source (#19398)
wcy-fdu authored Nov 19, 2024
1 parent a05cf34 commit 72e5ad3
Showing 6 changed files with 352 additions and 262 deletions.
37 changes: 9 additions & 28 deletions src/batch/src/executor/s3_file_scan.rs
@@ -12,13 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use parquet::arrow::ProjectionMask;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::{Field, Schema};
use risingwave_connector::source::iceberg::parquet_file_reader::create_parquet_stream_builder;
use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file};
use risingwave_pb::batch_plan::file_scan_node;
use risingwave_pb::batch_plan::file_scan_node::StorageType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
@@ -85,34 +82,18 @@ impl S3FileScanExecutor {
async fn do_execute(self: Box<Self>) {
assert_eq!(self.file_format, FileFormat::Parquet);
for file in self.file_location {
let mut batch_stream_builder = create_parquet_stream_builder(
let op = new_s3_operator(
self.s3_region.clone(),
self.s3_access_key.clone(),
self.s3_secret_key.clone(),
file,
)
.await?;

let arrow_schema = batch_stream_builder.schema();
assert_eq!(arrow_schema.fields.len(), self.schema.fields.len());
for (field, arrow_field) in self.schema.fields.iter().zip(arrow_schema.fields.iter()) {
assert_eq!(*field.name, *arrow_field.name());
}

batch_stream_builder = batch_stream_builder.with_projection(ProjectionMask::all());

batch_stream_builder = batch_stream_builder.with_batch_size(self.batch_size);

let record_batch_stream = batch_stream_builder
.build()
.map_err(|e| anyhow!(e).context("fail to build arrow stream builder"))?;

file.clone(),
)?;
let chunk_stream = read_parquet_file(op, file, None, None, self.batch_size, 0).await?;
#[for_await]
for record_batch in record_batch_stream {
let record_batch = record_batch?;
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
debug_assert_eq!(chunk.data_types(), self.schema.data_types());
yield chunk;
for stream_chunk in chunk_stream {
let stream_chunk = stream_chunk?;
let (data_chunk, _) = stream_chunk.into_parts();
yield data_chunk;
}
}
}
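The two call sites in this commit (the batch executor above and `opendal_reader.rs` below) suggest roughly the following shapes for the extracted helpers. This is a sketch inferred from the usage only — the parameter names, the `SourceColumnDesc`/`BoxStream` choices, and the `todo!()` bodies are assumptions, not the committed definitions in `parquet_file_handler.rs`:

```rust
// Sketch only: signatures inferred from the call sites in this commit.
use futures::stream::BoxStream;
use opendal::Operator;
use risingwave_common::array::StreamChunk;

use crate::error::ConnectorResult;
use crate::source::{Column, SourceColumnDesc};

/// Builds an OpenDAL S3 operator for the bucket that contains `file_path`.
pub fn new_s3_operator(
    s3_region: String,
    s3_access_key: String,
    s3_secret_key: String,
    file_path: String,
) -> ConnectorResult<Operator> {
    // Assumed body: construct opendal::services::S3 with the given credentials.
    todo!()
}

/// Reads a single parquet object via `op` and yields `StreamChunk`s.
/// `columns` / `rw_columns` drive column pruning; `offset` skips rows already
/// read so a source split can resume; `batch_size` bounds each chunk.
pub async fn read_parquet_file(
    op: Operator,
    file_path: String,
    columns: Option<Vec<Column>>,
    rw_columns: Option<Vec<SourceColumnDesc>>,
    batch_size: usize,
    offset: usize,
) -> ConnectorResult<BoxStream<'static, ConnectorResult<StreamChunk>>> {
    // Assumed body: open a ParquetRecordBatchStream and convert batches to chunks.
    todo!()
}
```

Passing `None, None, …, 0` from the batch executor and `Some(rw_columns)` plus `split.offset` from the streaming source is what lets a single helper serve both the `file_scan` TVF and the parquet source.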
104 changes: 9 additions & 95 deletions src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -19,24 +19,19 @@ use async_compression::tokio::bufread::GzipDecoder;
use async_trait::async_trait;
use futures::TryStreamExt;
use futures_async_stream::try_stream;
use itertools::Itertools;
use opendal::Operator;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::FileMetaData;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::StreamChunk;
use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::{ReaderStream, StreamReader};

use super::opendal_enumerator::OpendalEnumerator;
use super::OpendalSource;
use crate::error::ConnectorResult;
use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParquetParser, ParserConfig};
use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
use crate::source::filesystem::file_common::CompressionFormat;
use crate::source::filesystem::nd_streaming::need_nd_streaming;
use crate::source::filesystem::{nd_streaming, OpendalFsSplit};
use crate::source::iceberg::read_parquet_file;
use crate::source::{
BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
SplitReader,
@@ -91,38 +86,15 @@ impl<Src: OpendalSource> OpendalReader<Src> {
let msg_stream;

if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
// // If the format is "parquet", use `ParquetParser` to convert `record_batch` into stream chunk.
let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = self
.connector
.op
.reader_with(&object_name)
.into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`.
.await?
.into_futures_async_read(..)
.await?
.compat();
let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;

let file_metadata = parquet_metadata.file_metadata();
let column_indices =
extract_valid_column_indices(self.columns.clone(), file_metadata)?;
let projection_mask =
ProjectionMask::leaves(file_metadata.schema_descr(), column_indices);
// For the Parquet format, we directly convert from a record batch to a stream chunk.
// Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file.
let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
.await?
.with_batch_size(self.source_ctx.source_ctrl_opts.chunk_size)
.with_projection(projection_mask)
.with_offset(split.offset)
.build()?;

let parquet_parser = ParquetParser::new(
self.parser_config.common.rw_columns.clone(),
msg_stream = read_parquet_file(
self.connector.op.clone(),
object_name,
self.columns.clone(),
Some(self.parser_config.common.rw_columns.clone()),
self.source_ctx.source_ctrl_opts.chunk_size,
split.offset,
)?;
msg_stream = parquet_parser.into_stream(record_batch_stream);
)
.await?;
} else {
let data_stream = Self::stream_read_object(
self.connector.op.clone(),
@@ -229,61 +201,3 @@ impl<Src: OpendalSource> OpendalReader<Src> {
}
}
}

/// Extracts valid column indices from a Parquet file schema based on the user's requested schema.
///
/// This function is used for column pruning of Parquet files. It calculates the intersection
/// between the columns in the currently read Parquet file and the schema provided by the user.
/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that
/// only the necessary columns are read.
///
/// # Parameters
/// - `columns`: A vector of `Column` representing the user's requested schema.
/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
///
/// # Returns
/// - A `ConnectorResult<Vec<usize>>`, which contains the indices of the valid columns in the
/// Parquet file schema that match the requested schema. If an error occurs during processing,
/// it returns an appropriate error.
pub fn extract_valid_column_indices(
columns: Option<Vec<Column>>,
metadata: &FileMetaData,
) -> ConnectorResult<Vec<usize>> {
match columns {
Some(rw_columns) => {
let parquet_column_names = metadata
.schema_descr()
.columns()
.iter()
.map(|c| c.name())
.collect_vec();

let converted_arrow_schema =
parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
.map_err(anyhow::Error::from)?;

let valid_column_indices: Vec<usize> = rw_columns
.iter()
.filter_map(|column| {
parquet_column_names
.iter()
.position(|&name| name == column.name)
.and_then(|pos| {
// We should convert Arrow field to the rw data type instead of converting the rw data type to the Arrow data type for comparison.
// The reason is that for the timestamp type, the different time units in Arrow need to match with the timestamp and timestamptz in rw.
let arrow_filed_to_rw_data_type = IcebergArrowConvert
.type_from_field(converted_arrow_schema.field(pos))
.ok()?;
if arrow_filed_to_rw_data_type == column.data_type {
Some(pos)
} else {
None
}
})
})
.collect();
Ok(valid_column_indices)
}
None => Ok(vec![]),
}
}
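For reference, the pruning helper deleted above was used together with `ProjectionMask::leaves` when the record-batch stream was built (see the removed reader code earlier in this file); presumably that pairing now lives inside `parquet_file_handler`. A minimal sketch of it, wrapped in a hypothetical function whose name and signature are not from this commit:

```rust
// Sketch mirroring the removed reader code above: column pruning via
// extract_valid_column_indices feeding a ProjectionMask, plus batch size
// and row offset, when building the parquet record-batch stream.
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

use crate::error::ConnectorResult;
use crate::source::Column;

async fn build_pruned_record_batch_stream<R: AsyncFileReader + Unpin + Send + 'static>(
    mut reader: R,
    columns: Option<Vec<Column>>,
    chunk_size: usize,
    offset: usize,
) -> ConnectorResult<ParquetRecordBatchStream<R>> {
    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;
    let file_metadata = parquet_metadata.file_metadata();

    // Keep only requested columns whose name *and* RisingWave data type match the file.
    let column_indices = extract_valid_column_indices(columns, file_metadata)?;
    let projection_mask = ProjectionMask::leaves(file_metadata.schema_descr(), column_indices);

    // Read only those leaves, `chunk_size` rows per batch, skipping `offset` rows already consumed.
    let stream = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .map_err(anyhow::Error::from)?
        .with_batch_size(chunk_size)
        .with_projection(projection_mask)
        .with_offset(offset)
        .build()
        .map_err(anyhow::Error::from)?;
    Ok(stream)
}
```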
4 changes: 2 additions & 2 deletions src/connector/src/source/iceberg/mod.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod parquet_file_reader;
pub mod parquet_file_handler;

use std::collections::HashMap;

@@ -24,7 +24,7 @@ use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use iceberg::table::Table;
use itertools::Itertools;
pub use parquet_file_reader::*;
pub use parquet_file_handler::*;
use risingwave_common::bail;
use risingwave_common::catalog::Schema;
use risingwave_common::types::JsonbVal;
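Since the renamed module is still glob-re-exported (`pub use parquet_file_handler::*;`), existing import paths keep working; callers name the iceberg source module rather than the file, as in the diffs above:

```rust
// From the batch crate (src/batch/src/executor/s3_file_scan.rs):
use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file};

// From inside the connector crate (opendal_reader.rs):
use crate::source::iceberg::read_parquet_file;
```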