-
Notifications
You must be signed in to change notification settings - Fork 794
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deprecate MetadataLoader
#6474
Deprecate MetadataLoader
#6474
Changes from 4 commits
5843f45
cb1498a
5f07723
7e669cc
8068a31
388cf5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -212,16 +212,18 @@ impl ArrowReaderMetadata { | |
input: &mut T, | ||
options: ArrowReaderOptions, | ||
) -> Result<Self> { | ||
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not only is this akward, it is also a common source of confusion / bugs (namely that when someone supplies the ParquetMetaData to the arrow reader options to avoid a second object store request, if often turns out the second fetch happens anyways to read the page index (thus obviating the attempt at optimization) To avoid this they need to ensure when they read the metadata in the first place, they also read the page index. This is (in a roundabout way) what is happening to @progval in apache/datafusion#12593 I will try and file a ticket explaining the issue There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed #6476 |
||
// took an argument to fetch the page indexes. | ||
let mut metadata = input.get_metadata().await?; | ||
|
||
if options.page_index | ||
&& metadata.column_index().is_none() | ||
&& metadata.offset_index().is_none() | ||
{ | ||
let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone()); | ||
let mut loader = MetadataLoader::new(input, m); | ||
loader.load_page_index(true, true).await?; | ||
metadata = Arc::new(loader.finish()) | ||
let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); | ||
reader.load_page_index(input).await?; | ||
metadata = Arc::new(reader.finish()?) | ||
} | ||
Self::try_new(metadata, options) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt}; | |
|
||
use object_store::{ObjectMeta, ObjectStore}; | ||
|
||
use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; | ||
use crate::arrow::async_reader::AsyncFileReader; | ||
use crate::errors::Result; | ||
use crate::file::metadata::ParquetMetaData; | ||
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; | ||
|
||
/// Reads Parquet files in object storage using [`ObjectStore`]. | ||
/// | ||
|
@@ -124,15 +124,13 @@ impl AsyncFileReader for ParquetObjectReader { | |
|
||
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> { | ||
Box::pin(async move { | ||
let preload_column_index = self.preload_column_index; | ||
let preload_offset_index = self.preload_offset_index; | ||
let file_size = self.meta.size; | ||
let prefetch = self.metadata_size_hint; | ||
let mut loader = MetadataLoader::load(self, file_size, prefetch).await?; | ||
loader | ||
.load_page_index(preload_column_index, preload_offset_index) | ||
let metadata = ParquetMetaDataReader::new() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
.with_column_indexes(self.preload_column_index) | ||
.with_offset_indexes(self.preload_offset_index) | ||
.with_prefetch_hint(self.metadata_size_hint) | ||
.load_and_finish(self, self.meta.size) | ||
.await?; | ||
Ok(Arc::new(loader.finish())) | ||
Ok(Arc::new(metadata)) | ||
}) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -299,11 +299,14 @@ impl ParquetMetaDataReader { | |
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches | ||
/// performed by this function. | ||
#[cfg(feature = "async")] | ||
pub async fn load_and_finish<F: MetadataFetch>( | ||
pub async fn load_and_finish<F>( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since this API is new and has not yet been released, this is not a breaking API change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Specifically There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which is why I was rushing to get this into 53.1.0. 😄 But if we revert this change then this PR can merge after 53.1.0 is released. |
||
mut self, | ||
fetch: F, | ||
fetch: &mut F, | ||
file_size: usize, | ||
) -> Result<ParquetMetaData> { | ||
) -> Result<ParquetMetaData> | ||
where | ||
for<'a> &'a mut F: MetadataFetch, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
What don't you like about it? Is the idea that I think in general trying to get async functions to be happy with references is often much harder than with owned objects. So I guess I think we should keep the API as owned to make working with these APIs simpler, unless there is some compelling reason for the change (better performance, etc) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Some of it is my lack of Rust experience. I'm having a hard time tracking the ownership here, and am confusing traits with objects. Implementing the
Indeed, it took me days to get this to compile 😅.
I'll revert now that I understand the mechanics a little better. But one consequence is some things will get a little more verbose. For instance, when not taking ownership, fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, self.meta.size)
.await?;
Ok(Arc::new(metadata))
})
} But if fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let file_size = self.meta.size;
let metadata = ParquetMetaDataReader::new()
.with_column_indexes(self.preload_column_index)
.with_offset_indexes(self.preload_offset_index)
.with_prefetch_hint(self.metadata_size_hint)
.load_and_finish(self, file_size)
.await?;
Ok(Arc::new(metadata))
})
} A small price to pay, but one I was hoping to avoid. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I agree it would be nicer. While I don't fully understand all the implications here, this is in my mind related to how how the rust compiler generates async continuations and how it interacts with the borrow checker. Sometimes it has a hard time expressing that a borrow is ok for some reason |
||
{ | ||
self.try_load(fetch, file_size).await?; | ||
self.finish() | ||
} | ||
|
@@ -314,13 +317,12 @@ impl ParquetMetaDataReader { | |
/// See [`Self::with_prefetch_hint`] for a discussion of how to reduce the number of fetches | ||
/// performed by this function. | ||
#[cfg(feature = "async")] | ||
pub async fn try_load<F: MetadataFetch>( | ||
&mut self, | ||
mut fetch: F, | ||
file_size: usize, | ||
) -> Result<()> { | ||
pub async fn try_load<F>(&mut self, fetch: &mut F, file_size: usize) -> Result<()> | ||
where | ||
for<'a> &'a mut F: MetadataFetch, | ||
{ | ||
let (metadata, remainder) = | ||
Self::load_metadata(&mut fetch, file_size, self.get_prefetch_size()).await?; | ||
Self::load_metadata(fetch, file_size, self.get_prefetch_size()).await?; | ||
|
||
self.metadata = Some(metadata); | ||
|
||
|
@@ -329,17 +331,28 @@ impl ParquetMetaDataReader { | |
return Ok(()); | ||
} | ||
|
||
self.load_page_index(fetch, remainder).await | ||
self.load_page_index_with_remainder(fetch, remainder).await | ||
} | ||
|
||
/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already | ||
/// been obtained. See [`Self::new_with_metadata()`]. | ||
#[cfg(feature = "async")] | ||
pub async fn load_page_index<F: MetadataFetch>( | ||
pub async fn load_page_index<F>(&mut self, fetch: &mut F) -> Result<()> | ||
where | ||
for<'a> &'a mut F: MetadataFetch, | ||
{ | ||
self.load_page_index_with_remainder(fetch, None).await | ||
} | ||
|
||
#[cfg(feature = "async")] | ||
async fn load_page_index_with_remainder<F>( | ||
&mut self, | ||
mut fetch: F, | ||
mut fetch: &mut F, | ||
remainder: Option<(usize, Bytes)>, | ||
) -> Result<()> { | ||
) -> Result<()> | ||
where | ||
for<'a> &'a mut F: MetadataFetch, | ||
{ | ||
if self.metadata.is_none() { | ||
return Err(general_err!("Footer metadata is not present")); | ||
} | ||
|
@@ -494,11 +507,14 @@ impl ParquetMetaDataReader { | |
} | ||
|
||
#[cfg(feature = "async")] | ||
async fn load_metadata<F: MetadataFetch>( | ||
fetch: &mut F, | ||
async fn load_metadata<F>( | ||
mut fetch: &mut F, | ||
file_size: usize, | ||
prefetch: usize, | ||
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> { | ||
) -> Result<(ParquetMetaData, Option<(usize, Bytes)>)> | ||
where | ||
for<'a> &'a mut F: MetadataFetch, | ||
{ | ||
if file_size < FOOTER_SIZE { | ||
return Err(eof_err!("file size of {} is less than footer", file_size)); | ||
} | ||
|
@@ -836,7 +852,7 @@ mod async_tests { | |
|
||
struct MetadataFetchFn<F>(F); | ||
|
||
impl<F, Fut> MetadataFetch for MetadataFetchFn<F> | ||
impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think now that the API has been changed back, these test changes are also not needed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change allows me to wrap the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in #6484 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought reverting them was nice to demonstrate the same API can still be used I did notice that the metadata loader tests actually have a copy/paste |
||
where | ||
F: FnMut(Range<usize>) -> Fut + Send, | ||
Fut: Future<Output = Result<Bytes>> + Send, | ||
|
@@ -865,74 +881,68 @@ mod async_tests { | |
let expected = expected.file_metadata().schema(); | ||
let fetch_count = AtomicUsize::new(0); | ||
|
||
let mut fetch = |range| { | ||
let fetch = |range| { | ||
fetch_count.fetch_add(1, Ordering::SeqCst); | ||
futures::future::ready(read_range(&mut file, range)) | ||
}; | ||
|
||
let input = MetadataFetchFn(&mut fetch); | ||
let mut f = MetadataFetchFn(fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
|
||
// Metadata hint too small - below footer size | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let input = MetadataFetchFn(&mut fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.with_prefetch_hint(Some(7)) | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
|
||
// Metadata hint too small | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let input = MetadataFetchFn(&mut fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.with_prefetch_hint(Some(10)) | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
|
||
// Metadata hint too large | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let input = MetadataFetchFn(&mut fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.with_prefetch_hint(Some(500)) | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 1); | ||
|
||
// Metadata hint exactly correct | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let input = MetadataFetchFn(&mut fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.with_prefetch_hint(Some(428)) | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 1); | ||
|
||
let input = MetadataFetchFn(&mut fetch); | ||
let err = ParquetMetaDataReader::new() | ||
.load_and_finish(input, 4) | ||
.load_and_finish(&mut f, 4) | ||
.await | ||
.unwrap_err() | ||
.to_string(); | ||
assert_eq!(err, "EOF: file size of 4 is less than footer"); | ||
|
||
let input = MetadataFetchFn(&mut fetch); | ||
let err = ParquetMetaDataReader::new() | ||
.load_and_finish(input, 20) | ||
.load_and_finish(&mut f, 20) | ||
.await | ||
.unwrap_err() | ||
.to_string(); | ||
|
@@ -949,42 +959,39 @@ mod async_tests { | |
futures::future::ready(read_range(&mut file, range)) | ||
}; | ||
|
||
let f = MetadataFetchFn(&mut fetch); | ||
let mut f = MetadataFetchFn(&mut fetch); | ||
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true); | ||
loader.try_load(f, len).await.unwrap(); | ||
loader.try_load(&mut f, len).await.unwrap(); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 3); | ||
let metadata = loader.finish().unwrap(); | ||
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); | ||
|
||
// Prefetch just footer exactly | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let f = MetadataFetchFn(&mut fetch); | ||
let mut loader = ParquetMetaDataReader::new() | ||
.with_page_indexes(true) | ||
.with_prefetch_hint(Some(1729)); | ||
loader.try_load(f, len).await.unwrap(); | ||
loader.try_load(&mut f, len).await.unwrap(); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
let metadata = loader.finish().unwrap(); | ||
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); | ||
|
||
// Prefetch more than footer but not enough | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let f = MetadataFetchFn(&mut fetch); | ||
let mut loader = ParquetMetaDataReader::new() | ||
.with_page_indexes(true) | ||
.with_prefetch_hint(Some(130649)); | ||
loader.try_load(f, len).await.unwrap(); | ||
loader.try_load(&mut f, len).await.unwrap(); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
let metadata = loader.finish().unwrap(); | ||
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); | ||
|
||
// Prefetch exactly enough | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let f = MetadataFetchFn(&mut fetch); | ||
let metadata = ParquetMetaDataReader::new() | ||
.with_page_indexes(true) | ||
.with_prefetch_hint(Some(130650)) | ||
.load_and_finish(f, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 1); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍