
parquet: Add support for user-provided metadata loaders #12593

Open

wants to merge 3 commits into main

Conversation

@progval commented Sep 23, 2024

Which issue does this PR close?

Closes #12592.

Rationale for this change

This allows users to, for example, cache the Page Index so it does not need to be parsed every time we open the file.

I have a demo here: https://gitlab.softwareheritage.org/swh/devel/swh-provenance/-/merge_requests/182 . The key piece is a CachingParquetFormatFactory/CachingParquetFormat pair that acts like ParquetFormatFactory/ParquetFormat, but calls ParquetExecBuilder::with_parquet_file_reader_factory with a file reader factory that keeps a pool of readers (keyed by file path). It gives a significant improvement on the time_elapsed_opening metric:

statistics_eval_time=775.283531ms, row_pushdown_eval_time=191ns, time_elapsed_scanning_until_data=16.463873ms, time_elapsed_processing=401.16043928s, time_elapsed_opening=473.912060225s, page_index_eval_time=24.19564ms, metadata_load_time=410.282903954s, bloom_filter_eval_time=62.038167937s, time_elapsed_scanning_total=16.516948ms

to

row_pushdown_eval_time=191ns, time_elapsed_scanning_until_data=1.143893882s, time_elapsed_processing=34.501308377s, time_elapsed_opening=67.214927741s, page_index_eval_time=32.002383ms, metadata_load_time=3.411246ms, bloom_filter_eval_time=66.495163165s, statistics_eval_time=1.255058963s, time_elapsed_scanning_total=1.143978104s
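
For reference, a minimal sketch of the pooling idea, assuming a simple map from file path to parsed metadata (the ReaderPool name and its API are hypothetical; the real code is in the merge request linked above):

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use datafusion::parquet::file::metadata::ParquetMetaData;

/// Hypothetical pool keeping parsed metadata (page index included) per
/// file path, so a second open of the same file skips parsing entirely.
#[derive(Debug, Default)]
pub struct ReaderPool {
    metadata: Mutex<HashMap<String, Arc<ParquetMetaData>>>,
}

impl ReaderPool {
    /// Returns the cached metadata for `path`, computing it on a cache miss.
    pub fn get_or_load(
        &self,
        path: &str,
        load: impl FnOnce() -> Arc<ParquetMetaData>,
    ) -> Arc<ParquetMetaData> {
        let mut map = self.metadata.lock().unwrap();
        Arc::clone(map.entry(path.to_owned()).or_insert_with(load))
    }
}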

What changes are included in this PR?

  • Renamed the ParquetFileReader struct to DefaultParquetFileReader
  • Added a new ParquetFileReader trait that extends AsyncFileReader with a load_metadata method (see the sketch after this list)
  • Called it from <ParquetOpener as FileOpener>::open
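
Roughly, the new trait has this shape (a simplified sketch, not the exact PR code; the load_metadata signature is my reconstruction, and the import paths assume DataFusion's parquet re-export):

use datafusion::parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use futures::future::BoxFuture;

/// [`AsyncFileReader`] augmented with a method to customize how file
/// metadata is loaded (simplified sketch).
pub trait ParquetFileReader: AsyncFileReader + Send + 'static {
    /// Loads [`ArrowReaderMetadata`] for this file; implementations may
    /// return a cached value (page index included) instead of re-reading it.
    fn load_metadata(
        &mut self,
        options: ArrowReaderOptions,
    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<ArrowReaderMetadata>>;

    /// Returns an [`AsyncFileReader`] trait object
    ///
    /// This can usually be implemented as `Box::new(*self)`
    fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static>;
}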

Are these changes tested?

Not within the repo. Should I add a new module under datafusion-examples/ adapted from my demo above?

Are there any user-facing changes?

Breaking change for any user who implements ParquetFileReaderFactory.

@alamb commented Sep 25, 2024

I plan to review this later today

@alamb added the api change label on Sep 25, 2024
@alamb left a comment

Thank you for this contribution @progval

I went through this change and I am not sure what cannot be done with the existing APIs. The example you cite:

> This allows users to, for example, cache the Page Index so it does not need to be parsed every time we open the file.

I believe the existing AsyncReader::get_metadata already permits this, and it is in fact already done by the advanced_parquet_index.rs example:

let metadata = self
    .metadata
    .get(&filename)
    .expect("metadata for file not found: {filename}");
Ok(Box::new(ParquetReaderWithCache {
    filename,
    metadata: Arc::clone(metadata),
    inner,
}))

One thing that I found very unclear when working with this code at first is that, if the ParquetMetaData returned by AsyncReader::get_metadata does not already have the page and offset index loaded, the ArrowReader will in fact try to read the index information with new requests (doc).

When building the cache in the example, it was important to ensure that the page indexes were read as part of the initial load here:

let options = ArrowReaderOptions::new()
    // Load the page index when reading metadata to cache
    // so it is available to interpret row selections
    .with_page_index(true);
let reader =
    ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
let metadata = reader.metadata().clone();
let schema = reader.schema().clone();

Perhaps your system needs to do something similar?

@alamb commented Sep 25, 2024

I looked a bit at https://gitlab.softwareheritage.org/swh/devel/swh-provenance/-/merge_requests/182/diffs

I wonder if you could cache the Arc rather than the ArrowReaderMetadata 🤔 (I find the maze of similarly named "Reader/loader/Arrow/metadata" structures hard to keep straight sometimes).

@progval commented Sep 25, 2024

I agree, it's confusing.

I actually started from advanced_parquet_index.rs, but it appeared to be insufficient.

But reading this again, I agree with you that it should be sufficient. I'll give it another shot.

@progval commented Sep 25, 2024

I remembered / rediscovered the issue. It's in this code:

https://github.com/apache/arrow-rs/blob/62825b27e98e6719cb66258535c75c7490ddba44/parquet/src/arrow/async_reader/mod.rs#L212-L228

ArrowReaderMetadata::load_async does two expensive things:

  1. It calls AsyncFileReader::get_metadata() to get a ParquetMetaData, which we can cache as per advanced_parquet_index.rs.
  2. It instantiates and uses parquet::MetadataLoader to build another ParquetMetaData from the first one, and this new ParquetMetaData isn't cacheable without my patch.

In essence, it enriches the metadata struct by doing: let metadata = MetadataLoader::new(..., metadata).load_page_index(...).finish()
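
Paraphrasing the linked arrow-rs code (simplified, not verbatim):

// The metadata returned by get_metadata is enriched with the page index
// by a second round of I/O, and that enriched copy is what my factory
// cannot cache today.
let mut metadata = input.get_metadata().await?;
if options.page_index
    && metadata.column_index().is_none()
    && metadata.offset_index().is_none()
{
    let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
    let mut loader = MetadataLoader::new(input, m);
    loader.load_page_index(true, true).await?;
    metadata = Arc::new(loader.finish());
}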

@progval
Copy link
Contributor Author

progval commented Sep 25, 2024

To give an idea of the difference, here are metrics with the new load_metadata cached:

time_elapsed_opening=165.253279952s, time_elapsed_processing=36.211909412s, time_elapsed_scanning_total=605.765021ms, time_elapsed_scanning_until_data=605.726082ms

Metrics with only get_metadata cached:

time_elapsed_opening=272.811692949s, time_elapsed_processing=118.675115374s, time_elapsed_scanning_total=2.530368545s, time_elapsed_scanning_until_data=2.530322168s

@alamb marked this pull request as draft on September 25, 2024 21:01
@alamb marked this pull request as ready for review on September 25, 2024 21:01
@alamb left a comment

> I remembered / rediscovered the issue. It's in this code:

Right -- I agree this is confusing.

I think you can work around it by ensuring that the ParquetMetaData that is passed back already has the page index loaded -- so that MetadataLoader::new(...).load_page_index(true).finish() is a no-op.

However, given that I hit basically the exact same thing when writing the advanced parquet index, and now you have hit it as well, I think changing this behavior / making it clearer how to handle it is in order.

I actually like the idea of a PR / change in behavior to stop this automatic fetching from happening (as presumably the reason for using a custom loader is to avoid having to fetch the metadata!)

Maybe we can just change the code to not fetch the metadata (and just use what is passed back)

}

/// [`AsyncFileReader`] augmented with a method to customize how file metadata is loaded.
@alamb commented on this diff:

I wonder if we can use one of the (many!) already existing APIs / traits for loading the metadata. The number of layers of abstraction is already pretty mind-boggling, and adding yet another one seems unwieldy. Or maybe we could just change the code to not call the loader if it already has metadata 🤔

/// Returns a [`AsyncFileReader`] trait object
///
/// This can usually be implemented as `Box::new(*self)`
fn upcast(self: Box<Self>) -> Box<dyn AsyncFileReader + 'static>;
@alamb commented on this diff:

This method is unfortunate, but I also could not figure out a way to do without it.

@alamb commented Sep 25, 2024

I wonder if we could change the "automatically load page index if needed" to "error if page index is needed but it is not loaded" 🤔 That might be a less surprising behavior
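
For instance, the check in load_async sketched above could fail fast instead of fetching (illustrative only, not actual arrow-rs code):

// Fail instead of silently issuing extra reads when the page index is
// required but missing from the provided metadata.
if options.page_index
    && metadata.column_index().is_none()
    && metadata.offset_index().is_none()
{
    return Err(ParquetError::General(
        "page index required but not loaded in the provided ParquetMetaData".to_string(),
    ));
}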

@progval commented Sep 26, 2024

> I wonder if we could change the "automatically load page index if needed" to "error if page index is needed but it is not loaded" 🤔 That might be a less surprising behavior

I agree, and it would work for my use case because I always want the page index. I did it like this:

     fn get_metadata(&mut self) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        self.inner.get_metadata()
+        Box::pin(self.get_metadata_async())
     }
 
     fn get_byte_ranges(
@@ -117,6 +118,55 @@ impl AsyncFileReader for CachingParquetFileReader {
     }
 }
 
+impl CachingParquetFileReader {
+    /// Implementation of [`AsyncFileReader::get_metadata`] using new-style async,
+    /// so it can pass the borrow checker
+    async fn get_metadata_async(&mut self) -> parquet::errors::Result<Arc<ParquetMetaData>> {
+        match &self.metadata {
+            Some(metadata) => Ok(Arc::clone(metadata)),
+            None => match self.inner.get_metadata().await {
+                Ok(metadata) => {
+                    // This function is called by `ArrowReaderMetadata::load_async`.
+                    // Then, `load_async` may enrich the `ParquetMetaData` we return with
+                    // the page index, using `MetadataLoader`; and this enriched
+                    // `ParquetMetaData` reader would not be cached.
+                    //
+                    // Datafusion does not (currently) support caching the enriched
+                    // `ParquetMetaData`, so we unconditionally enrich it here with
+                    // the page index, so we can cache it.
+                    //
+                    // See:
+                    // * discussion on https://github.com/apache/datafusion/pull/12593
+                    // * https://github.com/apache/arrow-rs/blob/62825b27e98e6719cb66258535c75c7490ddba44/parquet/src/arrow/async_reader/mod.rs#L212-L228
+                    let metadata = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
+                    let mut loader = MetadataLoader::new(
+                        CachingParquetFileReaderMetadataFetch(self),
+                        metadata,
+                    );
+                    loader.load_page_index(true, true).await?;
+                    let metadata = Arc::new(loader.finish());
+                    self.metadata = Some(Arc::clone(&metadata));
+                    Ok(metadata)
+                }
+                Err(e) => Err(e),
+            },
+        }
+    }
+}
+
+struct CachingParquetFileReaderMetadataFetch<'a>(&'a mut CachingParquetFileReader);
+
+impl<'a> MetadataFetch for CachingParquetFileReaderMetadataFetch<'a> {
+    fn fetch(
+        &mut self,
+        range: Range<usize>,
+    ) -> BoxFuture<'_, parquet::errors::Result<bytes::Bytes>> {
+        println!("fetch");
+        self.0.fetch(range)
+    }
+}

(For some reason it's a bit slower even though it does cache as it should. I'll investigate in two weeks)

But for a generic solution, the FileOpener would need to tell AsyncFileReader::get_metadata (through ArrowReaderMetadata) whether the page index is needed, so this needs an API change (or at least an addition) in arrow-rs. But since the parquet crate is being refactored, this may be the right time.
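
For example, something along these lines (purely hypothetical; no such method exists in the parquet crate today):

use std::sync::Arc;

use datafusion::parquet::arrow::arrow_reader::ArrowReaderOptions;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::file::metadata::ParquetMetaData;
use futures::future::BoxFuture;

/// Hypothetical extension: a variant of `get_metadata` that is told whether
/// the page index is needed, so a caching reader can load and cache it once.
pub trait AsyncFileReaderExt: AsyncFileReader {
    fn get_metadata_with_options(
        &mut self,
        options: &ArrowReaderOptions,
    ) -> BoxFuture<'_, datafusion::parquet::errors::Result<Arc<ParquetMetaData>>>;
}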

@alamb
Copy link
Contributor

alamb commented Sep 26, 2024

Thanks @progval -- I will find time to review this more carefully tomorrow.

My primary concerns are with the API being even more complicated to use correctly than the current one (the upcast thing is confusing and "feels" wrong to me, as does the need for another API in DataFusion that wraps an already complicated API in parquet).

I am hopeful we can come up with something more elegant

@progval commented Oct 1, 2024

> (For some reason it's a bit slower even though it does cache as it should. I'll investigate in two weeks)

Actually, it's not slower. I must have had a noisy benchmark or bad luck eyeballing the mean request time.
