handle when not all row groups should be loaded from s3
Boruch Chalk committed May 23, 2024
1 parent c77cf72 commit 59aa55a
Showing 2 changed files with 22 additions and 30 deletions.
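In brief, the two diffs below move row-limiting out of the S3 row-group fetcher and into the streaming parquet source: the fetcher no longer pre-filters row groups by predicate, while ParquetSource now budgets the remaining row limit across prospective readers and tracks total_files_read so it knows when the paths are truly exhausted. Below is a minimal, self-contained sketch of that budgeting step; budget_rows is a hypothetical helper (the real logic is inline in ParquetSource::prefetch_files and additionally increments total_files_read).

// Hypothetical helper sketching the budgeting in ParquetSource::prefetch_files:
// each prospective reader consumes part of a shared row budget, and readers
// whose budget comes out to zero are dropped before they are ever constructed.
fn budget_rows(rows_per_reader: &[usize], mut rows_left: usize) -> Vec<(usize, usize)> {
    rows_per_reader
        .iter()
        .enumerate()
        .map(|(index, &rows)| {
            if rows_left == 0 {
                return (index, 0);
            }
            rows_left = rows_left.saturating_sub(rows);
            (index, rows)
        })
        // A zero budget means the row limit was already reached.
        .filter(|&(_, rows)| rows != 0)
        .collect()
}

For example, budget_rows(&[10, 10, 10], 15) yields [(0, 10), (1, 10)]: the budget is exhausted after the second reader, so the third is filtered out and never initialized.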
crates/polars-io/src/parquet/read/async_impl.rs — 20 changes: 1 addition & 19 deletions
@@ -286,25 +286,7 @@ impl FetchRowGroupsFromObjectStore {
 
         let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();
 
-        let mut row_groups = if let Some(pred) = predicate.as_deref() {
-            row_groups
-                .iter()
-                .enumerate()
-                .filter(|(i, rg)| {
-                    let should_be_read =
-                        matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));
-
-                    // Already add the row groups that will be skipped to the prefetched data.
-                    if !should_be_read {
-                        prefetched.insert(*i, Default::default());
-                    }
-                    should_be_read
-                })
-                .map(|(i, rg)| (i, rg.clone()))
-                .collect::<Vec<_>>()
-        } else {
-            row_groups.iter().cloned().enumerate().collect()
-        };
+        let mut row_groups = row_groups.iter().cloned().enumerate().collect::<Vec<_>>();
         let msg_limit = get_rg_prefetch_size();
 
         if verbose() {
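For context on the deletion above: the removed block implemented the pattern sketched below, gating each row group on a predicate check against its statistics (read_this_row_group in the real code) and registering every skipped index as an already-"downloaded" empty so downstream lookups by row-group index stay dense. The types here are hypothetical stand-ins, not the polars-io originals.

use std::collections::HashMap;

// Hypothetical stand-ins for the polars-io types in the deleted block.
#[derive(Clone)]
struct RowGroup;
#[derive(Default)]
struct Downloaded;

// Sketch of the behavior this commit removes: skip row groups the predicate
// rules out, but insert an empty placeholder for each skipped index so the
// consumer still finds an entry for every row group.
fn filter_row_groups(
    row_groups: &[RowGroup],
    should_be_read: impl Fn(&RowGroup) -> bool,
    prefetched: &mut HashMap<usize, Downloaded>,
) -> Vec<(usize, RowGroup)> {
    row_groups
        .iter()
        .enumerate()
        .filter(|(i, rg)| {
            let keep = should_be_read(rg);
            if !keep {
                prefetched.insert(*i, Downloaded::default());
            }
            keep
        })
        .map(|(i, rg)| (i, rg.clone()))
        .collect()
}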
crates/polars-pipe/src/executors/sources/parquet.rs — 32 changes: 21 additions & 11 deletions
@@ -33,6 +33,7 @@ pub struct ParquetSource {
     processed_paths: usize,
     iter: Range<usize>,
     paths: Arc<[PathBuf]>,
+    total_files_read: usize,
     options: ParquetOptions,
     file_options: FileScanOptions,
     #[allow(dead_code)]
@@ -173,15 +174,15 @@ impl ParquetSource {
         index: usize,
         n_rows: usize,
     ) -> PolarsResult<BatchedParquetReader> {
-        let metadata = self.metadata.clone();
         let predicate = self.predicate.clone();
         let cloud_options = self.cloud_options.clone();
         let (path, options, file_options, projection, chunk_size, reader_schema, hive_partitions) =
             self.prepare_init_reader(index)?;
 
+
         let batched_reader = {
             let uri = path.to_string_lossy();
-            ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, metadata)
+            ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, None)
                 .await?
                 .with_n_rows(Some(n_rows))
                 .with_row_index(file_options.row_index)
@@ -197,15 +198,14 @@ impl ParquetSource {
 
     #[cfg(feature = "async")]
     async fn num_rows_per_reader(&self, index: usize) -> PolarsResult<usize> {
-        let metadata = self.metadata.clone();
         let predicate = self.predicate.clone();
         let cloud_options = self.cloud_options.clone();
         let (path, options, _file_options, projection, _chunk_size, reader_schema, hive_partitions) =
             self.prepare_init_reader(index)?;
 
         let mut reader = {
             let uri = path.to_string_lossy();
-            ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, metadata)
+            ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, None)
                 .await?
                 .with_projection(projection)
                 .with_predicate(predicate.clone())
@@ -252,6 +252,7 @@ impl ParquetSource {
             file_options,
             iter,
             paths,
+            total_files_read: 0,
             cloud_options,
             metadata,
             file_info,
@@ -269,13 +270,17 @@
         Ok(source)
     }
 
+    fn try_read_more_files(self) -> bool {
+        false
+    }
+
     fn prefetch_files(&mut self) -> PolarsResult<()> {
         // We already start downloading the next file; we can only do that if we don't have a limit.
         // In the case of a limit we first must update the row count with the batch results.
         //
         // It is important we do this for a reasonable batch size; that's why we start this when we
         // have just 2 readers left.
-        if self.batched_readers.len() <= 2 && self.rows_left_to_read != 0
+        if self.batched_readers.len() <= 2 && self.rows_left_to_read != 0 && self.total_files_read != self.paths.len()
         {
             let range = 0..self.prefetch_size - self.batched_readers.len();
 
@@ -301,18 +306,21 @@
 
             let num_rows_to_read = num_rows_to_read
                 .into_iter()
-                .map(|rows_per_reader| {
+                .zip(range)
+                .map(|(rows_per_reader, index)| {
+                    self.total_files_read += 1;
                     if self.rows_left_to_read == 0 {
-                        return 0;
+                        return (index, 0);
                     }
                     self.rows_left_to_read = self.rows_left_to_read.saturating_sub(rows_per_reader);
-                    rows_per_reader
+                    eprintln!("Reading {} rows for index {}", rows_per_reader, index);
+                    (index, rows_per_reader)
                 })
+                .filter(|(index, rows_per_reader)| *rows_per_reader != 0)
                 .collect::<Vec<_>>();
 
-            let init_iter = range
+            let init_iter = num_rows_to_read
                 .into_iter()
-                .zip(num_rows_to_read)
                 .map(|(index, num_rows)| self.init_reader_async(index, num_rows));
 
             let batched_readers = polars_io::pl_async::get_runtime()
Expand All @@ -339,7 +347,9 @@ impl Source for ParquetSource {
self.prefetch_files()?;

let Some(mut reader) = self.batched_readers.pop_front() else {
// If there was no new reader, we depleted all of them and are finished.
if self.total_files_read != self.paths.len() && self.rows_left_to_read != 0 {
return self.get_batches(_context);
}
return Ok(SourceResult::Finished);
};

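Finally, a simplified, self-contained sketch of the new control flow in get_batches: an empty reader queue now only means "finished" once every path has been opened or the row budget is spent; otherwise the source prefetches again and retries (the real method recurses into itself rather than looping). All types below are hypothetical stand-ins, and prefetch_files is assumed to make progress whenever unread files remain.

use std::collections::VecDeque;

struct Reader; // stand-in for BatchedParquetReader

struct Source {
    batched_readers: VecDeque<Reader>,
    total_files_read: usize,
    num_paths: usize,
    rows_left_to_read: usize,
}

impl Source {
    fn prefetch_files(&mut self) {
        // Fill `batched_readers` from the remaining paths, bumping
        // `total_files_read`; assumed to make progress while files remain.
    }

    fn next_reader(&mut self) -> Option<Reader> {
        loop {
            self.prefetch_files();
            match self.batched_readers.pop_front() {
                Some(reader) => return Some(reader),
                // Readers ran dry, but unread files and row budget remain:
                // prefetch again and retry (the real code recurses here).
                None if self.total_files_read != self.num_paths
                    && self.rows_left_to_read != 0 => continue,
                // All files read or the row limit reached: truly finished.
                None => return None,
            }
        }
    }
}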
