diff --git a/src/batch/src/executor/s3_file_scan.rs b/src/batch/src/executor/s3_file_scan.rs
index a7b0d1bacd79..38907c63f841 100644
--- a/src/batch/src/executor/s3_file_scan.rs
+++ b/src/batch/src/executor/s3_file_scan.rs
@@ -12,13 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use anyhow::anyhow;
 use futures_async_stream::try_stream;
 use futures_util::stream::StreamExt;
-use parquet::arrow::ProjectionMask;
-use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::catalog::{Field, Schema};
-use risingwave_connector::source::iceberg::parquet_file_reader::create_parquet_stream_builder;
+use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file};
 use risingwave_pb::batch_plan::file_scan_node;
 use risingwave_pb::batch_plan::file_scan_node::StorageType;
 use risingwave_pb::batch_plan::plan_node::NodeBody;
@@ -85,34 +82,18 @@ impl S3FileScanExecutor {
     async fn do_execute(self: Box<Self>) {
         assert_eq!(self.file_format, FileFormat::Parquet);
         for file in self.file_location {
-            let mut batch_stream_builder = create_parquet_stream_builder(
+            let op = new_s3_operator(
                 self.s3_region.clone(),
                 self.s3_access_key.clone(),
                 self.s3_secret_key.clone(),
-                file,
-            )
-            .await?;
-
-            let arrow_schema = batch_stream_builder.schema();
-            assert_eq!(arrow_schema.fields.len(), self.schema.fields.len());
-            for (field, arrow_field) in self.schema.fields.iter().zip(arrow_schema.fields.iter()) {
-                assert_eq!(*field.name, *arrow_field.name());
-            }
-
-            batch_stream_builder = batch_stream_builder.with_projection(ProjectionMask::all());
-
-            batch_stream_builder = batch_stream_builder.with_batch_size(self.batch_size);
-
-            let record_batch_stream = batch_stream_builder
-                .build()
-                .map_err(|e| anyhow!(e).context("fail to build arrow stream builder"))?;
-
+                file.clone(),
+            )?;
+            let chunk_stream = read_parquet_file(op, file, None, None, self.batch_size, 0).await?;
             #[for_await]
-            for record_batch in record_batch_stream {
-                let record_batch = record_batch?;
-                let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
-                debug_assert_eq!(chunk.data_types(), self.schema.data_types());
-                yield chunk;
+            for stream_chunk in chunk_stream {
+                let stream_chunk = stream_chunk?;
+                let (data_chunk, _) = stream_chunk.into_parts();
+                yield data_chunk;
             }
         }
     }
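
The executor body above now reduces each file to two calls: build a bucket-scoped S3 operator, then let `read_parquet_file` yield `StreamChunk`s whose data part becomes the batch `DataChunk`. A minimal sketch of that call pattern outside the executor, using only the API added in this diff (the function name and the fixed batch size of 1024 are illustrative, not part of the change):

use futures::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::iceberg::{new_s3_operator, read_parquet_file};

/// Scan one Parquet file on S3 and collect its rows as `DataChunk`s,
/// mirroring what `S3FileScanExecutor::do_execute` now does per file.
async fn scan_one_file(
    region: String,
    access_key: String,
    secret_key: String,
    file: String,
) -> ConnectorResult<Vec<DataChunk>> {
    // The operator is scoped to the bucket parsed out of the `s3://` location.
    let op = new_s3_operator(region, access_key, secret_key, file.clone())?;
    // No requested columns, no parser columns: read every column, starting at row 0.
    let mut stream = read_parquet_file(op, file, None, None, 1024, 0).await?;
    let mut chunks = Vec::new();
    while let Some(chunk) = stream.next().await {
        // The stream yields `StreamChunk`s; the batch side only needs the data part.
        let (data_chunk, _ops) = chunk?.into_parts();
        chunks.push(data_chunk);
    }
    Ok(chunks)
}
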
diff --git a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
index 14258d892465..ca8ee1ae486b 100644
--- a/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
+++ b/src/connector/src/source/filesystem/opendal_source/opendal_reader.rs
@@ -19,24 +19,19 @@
 use async_compression::tokio::bufread::GzipDecoder;
 use async_trait::async_trait;
 use futures::TryStreamExt;
 use futures_async_stream::try_stream;
-use itertools::Itertools;
 use opendal::Operator;
-use parquet::arrow::async_reader::AsyncFileReader;
-use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
-use parquet::file::metadata::FileMetaData;
-use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::array::StreamChunk;
-use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
 use tokio::io::{AsyncRead, BufReader};
 use tokio_util::io::{ReaderStream, StreamReader};
 use super::opendal_enumerator::OpendalEnumerator;
 use super::OpendalSource;
 use crate::error::ConnectorResult;
-use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParquetParser, ParserConfig};
+use crate::parser::{ByteStreamSourceParserImpl, EncodingProperties, ParserConfig};
 use crate::source::filesystem::file_common::CompressionFormat;
 use crate::source::filesystem::nd_streaming::need_nd_streaming;
 use crate::source::filesystem::{nd_streaming, OpendalFsSplit};
+use crate::source::iceberg::read_parquet_file;
 use crate::source::{
     BoxChunkSourceStream, Column, SourceContextRef, SourceMessage, SourceMeta, SplitMetaData,
     SplitReader,
@@ -91,38 +86,15 @@ impl OpendalReader {
         let msg_stream;
 
         if let EncodingProperties::Parquet = &self.parser_config.specific.encoding_config {
-            // // If the format is "parquet", use `ParquetParser` to convert `record_batch` into stream chunk.
-            let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = self
-                .connector
-                .op
-                .reader_with(&object_name)
-                .into_future() // Unlike `rustc`, `try_stream` seems require manual `into_future`.
-                .await?
-                .into_futures_async_read(..)
-                .await?
-                .compat();
-            let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;
-
-            let file_metadata = parquet_metadata.file_metadata();
-            let column_indices =
-                extract_valid_column_indices(self.columns.clone(), file_metadata)?;
-            let projection_mask =
-                ProjectionMask::leaves(file_metadata.schema_descr(), column_indices);
-            // For the Parquet format, we directly convert from a record batch to a stream chunk.
-            // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file.
-            let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
-                .await?
-                .with_batch_size(self.source_ctx.source_ctrl_opts.chunk_size)
-                .with_projection(projection_mask)
-                .with_offset(split.offset)
-                .build()?;
-
-            let parquet_parser = ParquetParser::new(
-                self.parser_config.common.rw_columns.clone(),
+            msg_stream = read_parquet_file(
+                self.connector.op.clone(),
                 object_name,
+                self.columns.clone(),
+                Some(self.parser_config.common.rw_columns.clone()),
+                self.source_ctx.source_ctrl_opts.chunk_size,
                 split.offset,
-            )?;
-            msg_stream = parquet_parser.into_stream(record_batch_stream);
+            )
+            .await?;
         } else {
             let data_stream = Self::stream_read_object(
                 self.connector.op.clone(),
@@ -229,61 +201,3 @@ impl OpendalReader {
         }
     }
 }
-
-/// Extracts valid column indices from a Parquet file schema based on the user's requested schema.
-///
-/// This function is used for column pruning of Parquet files. It calculates the intersection
-/// between the columns in the currently read Parquet file and the schema provided by the user.
-/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that
-/// only the necessary columns are read.
-///
-/// # Parameters
-/// - `columns`: A vector of `Column` representing the user's requested schema.
-/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
-///
-/// # Returns
-/// - A `ConnectorResult<Vec<usize>>`, which contains the indices of the valid columns in the
-/// Parquet file schema that match the requested schema. If an error occurs during processing,
-/// it returns an appropriate error.
-pub fn extract_valid_column_indices(
-    columns: Option<Vec<Column>>,
-    metadata: &FileMetaData,
-) -> ConnectorResult<Vec<usize>> {
-    match columns {
-        Some(rw_columns) => {
-            let parquet_column_names = metadata
-                .schema_descr()
-                .columns()
-                .iter()
-                .map(|c| c.name())
-                .collect_vec();
-
-            let converted_arrow_schema =
-                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
-                    .map_err(anyhow::Error::from)?;
-
-            let valid_column_indices: Vec<usize> = rw_columns
-                .iter()
-                .filter_map(|column| {
-                    parquet_column_names
-                        .iter()
-                        .position(|&name| name == column.name)
-                        .and_then(|pos| {
-                            // We should convert Arrow field to the rw data type instead of converting the rw data type to the Arrow data type for comparison.
-                            // The reason is that for the timestamp type, the different time units in Arrow need to match with the timestamp and timestamptz in rw.
-                            let arrow_filed_to_rw_data_type = IcebergArrowConvert
-                                .type_from_field(converted_arrow_schema.field(pos))
-                                .ok()?;
-                            if arrow_filed_to_rw_data_type == column.data_type {
-                                Some(pos)
-                            } else {
-                                None
-                            }
-                        })
-                })
-                .collect();
-            Ok(valid_column_indices)
-        }
-        None => Ok(vec![]),
-    }
-}
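
The source reader now delegates its whole Parquet branch to `read_parquet_file`, passing the user's requested columns for pruning, the parser's column descriptions for the output schema, and `split.offset`, which for Parquet counts rows already emitted. A hedged sketch of the same call shape in isolation (the helper name and the fixed chunk size of 256 are illustrative; the real reader takes its chunk size from `source_ctrl_opts`):

use futures::TryStreamExt;
use opendal::Operator;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::iceberg::read_parquet_file;
use risingwave_connector::source::{Column, SourceColumnDesc};

/// Count the rows a Parquet object would still yield past `offset`,
/// using the same argument shape as the reader's Parquet branch above.
async fn count_remaining_rows(
    op: Operator,
    object_name: String,
    requested: Option<Vec<Column>>,        // user schema, drives column pruning
    parser_columns: Vec<SourceColumnDesc>, // output schema for the `ParquetParser`
    offset: usize,                         // rows already emitted from this file
) -> ConnectorResult<usize> {
    let mut stream = read_parquet_file(
        op,
        object_name,
        requested,
        Some(parser_columns),
        256, // chunk size; the real reader passes `source_ctrl_opts.chunk_size`
        offset,
    )
    .await?;
    let mut rows = 0;
    while let Some(chunk) = stream.try_next().await? {
        rows += chunk.cardinality();
    }
    Ok(rows)
}
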
diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs
index aeb642c80a01..4f99797525ab 100644
--- a/src/connector/src/source/iceberg/mod.rs
+++ b/src/connector/src/source/iceberg/mod.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub mod parquet_file_reader;
+pub mod parquet_file_handler;
 
 use std::collections::HashMap;
 
@@ -24,7 +24,7 @@ use iceberg::scan::FileScanTask;
 use iceberg::spec::TableMetadata;
 use iceberg::table::Table;
 use itertools::Itertools;
-pub use parquet_file_reader::*;
+pub use parquet_file_handler::*;
 use risingwave_common::bail;
 use risingwave_common::catalog::Schema;
 use risingwave_common::types::JsonbVal;
diff --git a/src/connector/src/source/iceberg/parquet_file_handler.rs b/src/connector/src/source/iceberg/parquet_file_handler.rs
new file mode 100644
index 000000000000..146348545fbc
--- /dev/null
+++ b/src/connector/src/source/iceberg/parquet_file_handler.rs
@@ -0,0 +1,320 @@
+// Copyright 2024 RisingWave Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::future::IntoFuture;
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+
+use anyhow::anyhow;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream, TryFutureExt};
+use iceberg::io::{
+    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
+};
+use iceberg::{Error, ErrorKind};
+use itertools::Itertools;
+use opendal::layers::{LoggingLayer, RetryLayer};
+use opendal::services::S3;
+use opendal::Operator;
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{parquet_to_arrow_schema, ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::file::metadata::{FileMetaData, ParquetMetaData, ParquetMetaDataReader};
+use risingwave_common::array::arrow::IcebergArrowConvert;
+use risingwave_common::array::StreamChunk;
+use risingwave_common::catalog::ColumnId;
+use risingwave_common::util::tokio_util::compat::FuturesAsyncReadCompatExt;
+use url::Url;
+
+use crate::error::ConnectorResult;
+use crate::parser::ParquetParser;
+use crate::source::{Column, SourceColumnDesc};
+
+pub struct ParquetFileReader<R: FileRead> {
+    meta: FileMetadata,
+    r: R,
+}
+
+impl<R: FileRead> ParquetFileReader<R> {
+    pub fn new(meta: FileMetadata, r: R) -> Self {
+        Self { meta, r }
+    }
+}
+
+impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
+    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+        Box::pin(
+            self.r
+                .read(range.start as _..range.end as _)
+                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
+        )
+    }
+
+    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        async move {
+            let reader = ParquetMetaDataReader::new();
+            let size = self.meta.size as usize;
+            let meta = reader.load_and_finish(self, size).await?;
+
+            Ok(Arc::new(meta))
+        }
+        .boxed()
+    }
+}
+
+pub async fn create_parquet_stream_builder(
+    s3_region: String,
+    s3_access_key: String,
+    s3_secret_key: String,
+    location: String,
+) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
+    let mut props = HashMap::new();
+    props.insert(S3_REGION, s3_region.clone());
+    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
+    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());
+
+    let file_io_builder = FileIOBuilder::new("s3");
+    let file_io = file_io_builder
+        .with_props(props.into_iter())
+        .build()
+        .map_err(|e| anyhow!(e))?;
+    let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?;
+
+    let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
+    let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
+    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);
+
+    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
+        .await
+        .map_err(|e| anyhow!(e))
+}
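
`create_parquet_stream_builder` is carried over unchanged into the new module: it goes through iceberg's `FileIO` rather than an opendal `Operator` and hands back a raw Arrow record-batch stream builder. A hedged usage sketch, assuming placeholder region, credentials, and location:

use anyhow::Result;
use futures::TryStreamExt;
use risingwave_connector::source::iceberg::create_parquet_stream_builder;

/// Read raw Arrow `RecordBatch`es through the iceberg `FileIO` path.
async fn print_batch_sizes() -> Result<()> {
    let builder = create_parquet_stream_builder(
        "us-east-1".to_owned(),                          // placeholder region
        "my-access-key".to_owned(),                      // placeholder credentials
        "my-secret-key".to_owned(),
        "s3://my-bucket/data/part-0.parquet".to_owned(), // placeholder location
    )
    .await?;

    // The Parquet-derived Arrow schema is available before building the stream.
    println!("schema: {:?}", builder.schema());

    let mut stream = builder.with_batch_size(1024).build()?;
    while let Some(batch) = stream.try_next().await? {
        println!("batch with {} rows", batch.num_rows());
    }
    Ok(())
}
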
+
+pub fn new_s3_operator(
+    s3_region: String,
+    s3_access_key: String,
+    s3_secret_key: String,
+    location: String,
+) -> ConnectorResult<Operator> {
+    // Create s3 builder.
+    let bucket = extract_bucket(&location);
+    let mut builder = S3::default().bucket(&bucket).region(&s3_region);
+    builder = builder.access_key_id(&s3_access_key);
+    builder = builder.secret_access_key(&s3_secret_key);
+    builder = builder.endpoint(&format!(
+        "https://{}.s3.{}.amazonaws.com",
+        bucket, s3_region
+    ));
+
+    builder = builder.disable_config_load();
+
+    let op: Operator = Operator::new(builder)?
+        .layer(LoggingLayer::default())
+        .layer(RetryLayer::default())
+        .finish();
+
+    Ok(op)
+}
+
+fn extract_bucket(location: &str) -> String {
+    let prefix = "s3://";
+    let start = prefix.len();
+    let end = location[start..]
+        .find('/')
+        .unwrap_or(location.len() - start);
+    location[start..start + end].to_string()
+}
+
+pub async fn list_s3_directory(
+    s3_region: String,
+    s3_access_key: String,
+    s3_secret_key: String,
+    dir: String,
+) -> Result<Vec<String>, anyhow::Error> {
+    let url = Url::parse(&dir)?;
+    let bucket = url.host_str().ok_or_else(|| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!("Invalid s3 url: {}, missing bucket", dir),
+        )
+    })?;
+
+    let prefix = format!("s3://{}/", bucket);
+    if dir.starts_with(&prefix) {
+        let mut builder = S3::default();
+        builder = builder
+            .region(&s3_region)
+            .access_key_id(&s3_access_key)
+            .secret_access_key(&s3_secret_key)
+            .bucket(bucket);
+        let op = Operator::new(builder)?
+            .layer(RetryLayer::default())
+            .finish();
+
+        op.list(&dir[prefix.len()..])
+            .await
+            .map_err(|e| anyhow!(e))
+            .map(|list| {
+                list.into_iter()
+                    .map(|entry| prefix.to_string() + entry.path())
+                    .collect()
+            })
+    } else {
+        Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Invalid s3 url: {}, should start with {}", dir, prefix),
+        ))?
+    }
+}
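
The operator above is keyed to the bucket embedded in the `s3://` location, and the endpoint is always derived as `https://{bucket}.s3.{region}.amazonaws.com`, so S3-compatible stores with custom endpoints are outside this helper's scope. A unit-test-style sketch of what `extract_bucket` is expected to return; since the function is private, such a test would have to live in this module (it is not part of the PR):

// A sketch-only test module for the private `extract_bucket` helper.
#[cfg(test)]
mod extract_bucket_sketch {
    use super::*;

    #[test]
    fn bucket_is_taken_from_the_s3_url() {
        // Everything after the bucket is ignored...
        assert_eq!(
            extract_bucket("s3://my-bucket/dir/part-0.parquet"),
            "my-bucket"
        );
        // ...and a bare bucket URL without a trailing slash also works.
        assert_eq!(extract_bucket("s3://my-bucket"), "my-bucket");
    }
}
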
+
+/// Extracts valid column indices from a Parquet file schema based on the user's requested schema.
+///
+/// This function is used for column pruning of Parquet files. It calculates the intersection
+/// between the columns in the currently read Parquet file and the schema provided by the user.
+/// This is useful for reading a `RecordBatch` with the appropriate `ProjectionMask`, ensuring that
+/// only the necessary columns are read.
+///
+/// # Parameters
+/// - `columns`: A vector of `Column` representing the user's requested schema.
+/// - `metadata`: A reference to `FileMetaData` containing the schema and metadata of the Parquet file.
+///
+/// # Returns
+/// - A `ConnectorResult<Vec<usize>>`, which contains the indices of the valid columns in the
+/// Parquet file schema that match the requested schema. If an error occurs during processing,
+/// it returns an appropriate error.
+pub fn extract_valid_column_indices(
+    columns: Option<Vec<Column>>,
+    metadata: &FileMetaData,
+) -> ConnectorResult<Vec<usize>> {
+    match columns {
+        Some(rw_columns) => {
+            let parquet_column_names = metadata
+                .schema_descr()
+                .columns()
+                .iter()
+                .map(|c| c.name())
+                .collect_vec();
+
+            let converted_arrow_schema =
+                parquet_to_arrow_schema(metadata.schema_descr(), metadata.key_value_metadata())
+                    .map_err(anyhow::Error::from)?;
+
+            let valid_column_indices: Vec<usize> = rw_columns
+                .iter()
+                .filter_map(|column| {
+                    parquet_column_names
+                        .iter()
+                        .position(|&name| name == column.name)
+                        .and_then(|pos| {
+                            let arrow_field = IcebergArrowConvert
+                                .to_arrow_field(&column.name, &column.data_type)
+                                .ok()?;
+                            if &arrow_field == converted_arrow_schema.field(pos) {
+                                Some(pos)
+                            } else {
+                                None
+                            }
+                        })
+                })
+                .collect();
+            Ok(valid_column_indices)
+        }
+        None => Ok(vec![]),
+    }
+}
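
The returned values are leaf indices into the Parquet schema, which is exactly what `ProjectionMask::leaves` consumes in `read_parquet_file` below. A hedged sketch of that pairing in isolation; the helper name is illustrative and the `FileMetaData` is assumed to come from an already-loaded footer:

use parquet::arrow::ProjectionMask;
use parquet::file::metadata::FileMetaData;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::iceberg::extract_valid_column_indices;
use risingwave_connector::source::Column;

/// Build a projection that keeps only the requested columns whose name and
/// type both match the Parquet file; unmatched columns are silently dropped.
fn projection_for(
    requested: Vec<Column>,
    file_metadata: &FileMetaData,
) -> ConnectorResult<ProjectionMask> {
    let indices = extract_valid_column_indices(Some(requested), file_metadata)?;
    Ok(ProjectionMask::leaves(file_metadata.schema_descr(), indices))
}
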
+
+/// Reads a specified Parquet file and converts its content into a stream of chunks.
+pub async fn read_parquet_file(
+    op: Operator,
+    file_name: String,
+    rw_columns: Option<Vec<Column>>,
+    parser_columns: Option<Vec<SourceColumnDesc>>,
+    batch_size: usize,
+    offset: usize,
+) -> ConnectorResult<
+    Pin<Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>>,
+> {
+    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
+        .reader_with(&file_name)
+        .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future`.
+        .await?
+        .into_futures_async_read(..)
+        .await?
+        .compat();
+    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;
+
+    let file_metadata = parquet_metadata.file_metadata();
+    let column_indices = extract_valid_column_indices(rw_columns, file_metadata)?;
+    let projection_mask = ProjectionMask::leaves(file_metadata.schema_descr(), column_indices);
+    // For the Parquet format, we directly convert from a record batch to a stream chunk.
+    // Therefore, the offset of the Parquet file represents the current position in terms of the number of rows read from the file.
+    let record_batch_stream = ParquetRecordBatchStreamBuilder::new(reader)
+        .await?
+        .with_batch_size(batch_size)
+        .with_projection(projection_mask)
+        .with_offset(offset)
+        .build()?;
+    let converted_arrow_schema = parquet_to_arrow_schema(
+        file_metadata.schema_descr(),
+        file_metadata.key_value_metadata(),
+    )
+    .map_err(anyhow::Error::from)?;
+    let columns = match parser_columns {
+        Some(columns) => columns,
+        None => converted_arrow_schema
+            .fields
+            .iter()
+            .enumerate()
+            .map(|(index, field_ref)| {
+                let data_type = IcebergArrowConvert.type_from_field(field_ref).unwrap();
+                SourceColumnDesc::simple(
+                    field_ref.name().clone(),
+                    data_type,
+                    ColumnId::new(index as i32),
+                )
+            })
+            .collect(),
+    };
+
+    let parquet_parser = ParquetParser::new(columns, file_name, offset)?;
+    let msg_stream: Pin<
+        Box<dyn Stream<Item = Result<StreamChunk, crate::error::ConnectorError>> + Send>,
+    > = parquet_parser.into_stream(record_batch_stream);
+    Ok(msg_stream)
+}
+
+pub async fn get_parquet_fields(
+    op: Operator,
+    file_name: String,
+) -> ConnectorResult<risingwave_common::array::arrow::arrow_schema_udf::Fields> {
+    let mut reader: tokio_util::compat::Compat<opendal::FuturesAsyncReader> = op
+        .reader_with(&file_name)
+        .into_future() // Unlike `rustc`, `try_stream` seems to require a manual `into_future`.
+        .await?
+        .into_futures_async_read(..)
+        .await?
+        .compat();
+    let parquet_metadata = reader.get_metadata().await.map_err(anyhow::Error::from)?;
+
+    let file_metadata = parquet_metadata.file_metadata();
+    let converted_arrow_schema = parquet_to_arrow_schema(
+        file_metadata.schema_descr(),
+        file_metadata.key_value_metadata(),
+    )
+    .map_err(anyhow::Error::from)?;
+    let fields: risingwave_common::array::arrow::arrow_schema_udf::Fields =
+        converted_arrow_schema.fields;
+    Ok(fields)
+}
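
`get_parquet_fields` stops at Arrow fields; mapping them to RisingWave types is the caller's job, as the frontend change below does with `IcebergArrowConvert`. A hedged sketch of that mapping (the helper name is illustrative; the conversion calls mirror the diff):

use opendal::Operator;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::types::DataType;
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::iceberg::get_parquet_fields;

/// Derive `(column name, RisingWave type)` pairs for a Parquet file,
/// the same mapping the `file_scan` table function performs.
async fn parquet_rw_schema(
    op: Operator,
    file_name: String,
) -> ConnectorResult<Vec<(String, DataType)>> {
    let fields = get_parquet_fields(op, file_name).await?;
    let mut rw_types = Vec::with_capacity(fields.len());
    for field in &fields {
        // Timestamp units and timezones follow the Iceberg-flavoured
        // Arrow-to-RisingWave conversion, matching what the parser emits.
        let data_type = IcebergArrowConvert
            .type_from_field(field)
            .map_err(anyhow::Error::from)?;
        rw_types.push((field.name().to_string(), data_type));
    }
    Ok(rw_types)
}
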
diff --git a/src/connector/src/source/iceberg/parquet_file_reader.rs b/src/connector/src/source/iceberg/parquet_file_reader.rs
deleted file mode 100644
index 5a01f2b0ed84..000000000000
--- a/src/connector/src/source/iceberg/parquet_file_reader.rs
+++ /dev/null
@@ -1,134 +0,0 @@
-// Copyright 2024 RisingWave Labs
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::HashMap;
-use std::ops::Range;
-use std::sync::Arc;
-
-use anyhow::anyhow;
-use bytes::Bytes;
-use futures::future::BoxFuture;
-use futures::{FutureExt, TryFutureExt};
-use iceberg::io::{
-    FileIOBuilder, FileMetadata, FileRead, S3_ACCESS_KEY_ID, S3_REGION, S3_SECRET_ACCESS_KEY,
-};
-use iceberg::{Error, ErrorKind};
-use opendal::layers::RetryLayer;
-use opendal::services::S3;
-use opendal::Operator;
-use parquet::arrow::async_reader::AsyncFileReader;
-use parquet::arrow::ParquetRecordBatchStreamBuilder;
-use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
-use url::Url;
-
-pub struct ParquetFileReader<R: FileRead> {
-    meta: FileMetadata,
-    r: R,
-}
-
-impl<R: FileRead> ParquetFileReader<R> {
-    pub fn new(meta: FileMetadata, r: R) -> Self {
-        Self { meta, r }
-    }
-}
-
-impl<R: FileRead> AsyncFileReader for ParquetFileReader<R> {
-    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
-        Box::pin(
-            self.r
-                .read(range.start as _..range.end as _)
-                .map_err(|err| parquet::errors::ParquetError::External(Box::new(err))),
-        )
-    }
-
-    fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        async move {
-            let reader = ParquetMetaDataReader::new();
-            let size = self.meta.size as usize;
-            let meta = reader.load_and_finish(self, size).await?;
-
-            Ok(Arc::new(meta))
-        }
-        .boxed()
-    }
-}
-
-pub async fn create_parquet_stream_builder(
-    s3_region: String,
-    s3_access_key: String,
-    s3_secret_key: String,
-    location: String,
-) -> Result<ParquetRecordBatchStreamBuilder<ParquetFileReader<impl FileRead>>, anyhow::Error> {
-    let mut props = HashMap::new();
-    props.insert(S3_REGION, s3_region.clone());
-    props.insert(S3_ACCESS_KEY_ID, s3_access_key.clone());
-    props.insert(S3_SECRET_ACCESS_KEY, s3_secret_key.clone());
-
-    let file_io_builder = FileIOBuilder::new("s3");
-    let file_io = file_io_builder
-        .with_props(props.into_iter())
-        .build()
-        .map_err(|e| anyhow!(e))?;
-    let parquet_file = file_io.new_input(&location).map_err(|e| anyhow!(e))?;
-
-    let parquet_metadata = parquet_file.metadata().await.map_err(|e| anyhow!(e))?;
-    let parquet_reader = parquet_file.reader().await.map_err(|e| anyhow!(e))?;
-    let parquet_file_reader = ParquetFileReader::new(parquet_metadata, parquet_reader);
-
-    ParquetRecordBatchStreamBuilder::new(parquet_file_reader)
-        .await
-        .map_err(|e| anyhow!(e))
-}
-
-pub async fn list_s3_directory(
-    s3_region: String,
-    s3_access_key: String,
-    s3_secret_key: String,
-    dir: String,
-) -> Result<Vec<String>, anyhow::Error> {
-    let url = Url::parse(&dir)?;
-    let bucket = url.host_str().ok_or_else(|| {
-        Error::new(
-            ErrorKind::DataInvalid,
-            format!("Invalid s3 url: {}, missing bucket", dir),
-        )
-    })?;
-
-    let prefix = format!("s3://{}/", bucket);
-    if dir.starts_with(&prefix) {
-        let mut builder = S3::default();
-        builder = builder
-            .region(&s3_region)
-            .access_key_id(&s3_access_key)
-            .secret_access_key(&s3_secret_key)
-            .bucket(bucket);
-        let op = Operator::new(builder)?
-            .layer(RetryLayer::default())
-            .finish();
-
-        op.list(&dir[prefix.len()..])
-            .await
-            .map_err(|e| anyhow!(e))
-            .map(|list| {
-                list.into_iter()
-                    .map(|entry| prefix.to_string() + entry.path())
-                    .collect()
-            })
-    } else {
-        Err(Error::new(
-            ErrorKind::DataInvalid,
-            format!("Invalid s3 url: {}, should start with {}", dir, prefix),
-        ))?
-    }
-}
diff --git a/src/frontend/src/expr/table_function.rs b/src/frontend/src/expr/table_function.rs
index cee4188e7579..c04b52832621 100644
--- a/src/frontend/src/expr/table_function.rs
+++ b/src/frontend/src/expr/table_function.rs
@@ -20,7 +20,9 @@ use mysql_async::consts::ColumnType as MySqlColumnType;
 use mysql_async::prelude::*;
 use risingwave_common::array::arrow::IcebergArrowConvert;
 use risingwave_common::types::{DataType, ScalarImpl, StructType};
-use risingwave_connector::source::iceberg::{create_parquet_stream_builder, list_s3_directory};
+use risingwave_connector::source::iceberg::{
+    get_parquet_fields, list_s3_directory, new_s3_operator,
+};
 pub use risingwave_pb::expr::table_function::PbType as TableFunctionType;
 use risingwave_pb::expr::PbTableFunction;
 use thiserror_ext::AsReport;
@@ -184,7 +186,7 @@ impl TableFunction {
 
             let schema = tokio::task::block_in_place(|| {
                 RUNTIME.block_on(async {
-                    let parquet_stream_builder = create_parquet_stream_builder(
+                    let op = new_s3_operator(
                         eval_args[2].clone(),
                         eval_args[3].clone(),
                         eval_args[4].clone(),
@@ -192,11 +194,18 @@ impl TableFunction {
                         match files.as_ref() {
                             Some(files) => files[0].clone(),
                             None => eval_args[5].clone(),
                         },
+                    )?;
+                    let fields = get_parquet_fields(
+                        op,
+                        match files.as_ref() {
+                            Some(files) => files[0].clone(),
+                            None => eval_args[5].clone(),
+                        },
                     )
                     .await?;
                     let mut rw_types = vec![];
-                    for field in parquet_stream_builder.schema().fields() {
+                    for field in &fields {
                         rw_types.push((
                             field.name().to_string(),
                             IcebergArrowConvert.type_from_field(field)?,
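
Taken together, the frontend resolves a directory argument by listing it and inferring the schema from the first file only, on the assumption that the remaining files share it. A hedged end-to-end sketch of that flow outside the planner; the trailing-slash check stands in for the planner's own directory detection and the function name is illustrative:

use anyhow::{bail, Result};
use risingwave_connector::source::iceberg::{
    get_parquet_fields, list_s3_directory, new_s3_operator,
};

/// Infer the column names for a `file_scan(...)` call: a single file is used
/// as-is, while a directory is listed and its first entry provides the schema.
async fn infer_file_scan_columns(
    region: &str,
    access_key: &str,
    secret_key: &str,
    location: &str,
) -> Result<Vec<String>> {
    // The planner decides "directory vs. file" before this point; the
    // trailing-slash check is only a stand-in for that decision.
    let file = if location.ends_with('/') {
        let files = list_s3_directory(
            region.to_owned(),
            access_key.to_owned(),
            secret_key.to_owned(),
            location.to_owned(),
        )
        .await?;
        match files.first() {
            Some(first) => first.clone(),
            None => bail!("no files found under {location}"),
        }
    } else {
        location.to_owned()
    };
    let op = new_s3_operator(
        region.to_owned(),
        access_key.to_owned(),
        secret_key.to_owned(),
        file.clone(),
    )?;
    // Only this one file's footer is consulted; the remaining files are
    // assumed to share its schema, mirroring `files[0].clone()` above.
    let fields = get_parquet_fields(op, file).await?;
    Ok(fields.iter().map(|f| f.name().to_string()).collect())
}
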