-
Notifications
You must be signed in to change notification settings - Fork 794
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Deprecate MetadataLoader
#6474
Deprecate MetadataLoader
#6474
Changes from all commits
5843f45
cb1498a
5f07723
7e669cc
8068a31
388cf5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -212,16 +212,18 @@ impl ArrowReaderMetadata { | |
input: &mut T, | ||
options: ArrowReaderOptions, | ||
) -> Result<Self> { | ||
// TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not only is this akward, it is also a common source of confusion / bugs (namely that when someone supplies the ParquetMetaData to the arrow reader options to avoid a second object store request, if often turns out the second fetch happens anyways to read the page index (thus obviating the attempt at optimization) To avoid this they need to ensure when they read the metadata in the first place, they also read the page index. This is (in a roundabout way) what is happening to @progval in apache/datafusion#12593 I will try and file a ticket explaining the issue There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Filed #6476 |
||
// took an argument to fetch the page indexes. | ||
let mut metadata = input.get_metadata().await?; | ||
|
||
if options.page_index | ||
&& metadata.column_index().is_none() | ||
&& metadata.offset_index().is_none() | ||
{ | ||
let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone()); | ||
let mut loader = MetadataLoader::new(input, m); | ||
loader.load_page_index(true, true).await?; | ||
metadata = Arc::new(loader.finish()) | ||
let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); | ||
reader.load_page_index(input).await?; | ||
metadata = Arc::new(reader.finish()?) | ||
} | ||
Self::try_new(metadata, options) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt}; | |
|
||
use object_store::{ObjectMeta, ObjectStore}; | ||
|
||
use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; | ||
use crate::arrow::async_reader::AsyncFileReader; | ||
use crate::errors::Result; | ||
use crate::file::metadata::ParquetMetaData; | ||
use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; | ||
|
||
/// Reads Parquet files in object storage using [`ObjectStore`]. | ||
/// | ||
|
@@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader { | |
|
||
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> { | ||
Box::pin(async move { | ||
let preload_column_index = self.preload_column_index; | ||
let preload_offset_index = self.preload_offset_index; | ||
let file_size = self.meta.size; | ||
let prefetch = self.metadata_size_hint; | ||
let mut loader = MetadataLoader::load(self, file_size, prefetch).await?; | ||
loader | ||
.load_page_index(preload_column_index, preload_offset_index) | ||
let metadata = ParquetMetaDataReader::new() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
.with_column_indexes(self.preload_column_index) | ||
.with_offset_indexes(self.preload_offset_index) | ||
.with_prefetch_hint(self.metadata_size_hint) | ||
.load_and_finish(self, file_size) | ||
.await?; | ||
Ok(Arc::new(loader.finish())) | ||
Ok(Arc::new(metadata)) | ||
}) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -329,13 +329,18 @@ impl ParquetMetaDataReader { | |
return Ok(()); | ||
} | ||
|
||
self.load_page_index(fetch, remainder).await | ||
self.load_page_index_with_remainder(fetch, remainder).await | ||
} | ||
|
||
/// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already | ||
/// been obtained. See [`Self::new_with_metadata()`]. | ||
#[cfg(feature = "async")] | ||
pub async fn load_page_index<F: MetadataFetch>( | ||
pub async fn load_page_index<F: MetadataFetch>(&mut self, fetch: F) -> Result<()> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change will be breaking if in 52.2.0. I'll revert this if need be. I found having |
||
self.load_page_index_with_remainder(fetch, None).await | ||
} | ||
|
||
#[cfg(feature = "async")] | ||
async fn load_page_index_with_remainder<F: MetadataFetch>( | ||
&mut self, | ||
mut fetch: F, | ||
remainder: Option<(usize, Bytes)>, | ||
|
@@ -836,7 +841,7 @@ mod async_tests { | |
|
||
struct MetadataFetchFn<F>(F); | ||
|
||
impl<F, Fut> MetadataFetch for MetadataFetchFn<F> | ||
impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn<F> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think now that the API has been changed back, these test changes are also not needed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change allows me to wrap the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in #6484 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought reverting them was nice to demonstrate the same API can still be used I did notice that the metadata loader tests actually have a copy/paste |
||
where | ||
F: FnMut(Range<usize>) -> Fut + Send, | ||
Fut: Future<Output = Result<Bytes>> + Send, | ||
|
@@ -865,74 +870,68 @@ mod async_tests { | |
let expected = expected.file_metadata().schema(); | ||
let fetch_count = AtomicUsize::new(0); | ||
|
||
let mut fetch = |range| { | ||
let fetch = |range| { | ||
fetch_count.fetch_add(1, Ordering::SeqCst); | ||
futures::future::ready(read_range(&mut file, range)) | ||
}; | ||
|
||
let input = MetadataFetchFn(&mut fetch); | ||
let mut f = MetadataFetchFn(fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
|
||
// Metadata hint too small - below footer size | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let input = MetadataFetchFn(&mut fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.with_prefetch_hint(Some(7)) | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
|
||
// Metadata hint too small | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let input = MetadataFetchFn(&mut fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.with_prefetch_hint(Some(10)) | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
|
||
// Metadata hint too large | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let input = MetadataFetchFn(&mut fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.with_prefetch_hint(Some(500)) | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 1); | ||
|
||
// Metadata hint exactly correct | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let input = MetadataFetchFn(&mut fetch); | ||
let actual = ParquetMetaDataReader::new() | ||
.with_prefetch_hint(Some(428)) | ||
.load_and_finish(input, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(actual.file_metadata().schema(), expected); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 1); | ||
|
||
let input = MetadataFetchFn(&mut fetch); | ||
let err = ParquetMetaDataReader::new() | ||
.load_and_finish(input, 4) | ||
.load_and_finish(&mut f, 4) | ||
.await | ||
.unwrap_err() | ||
.to_string(); | ||
assert_eq!(err, "EOF: file size of 4 is less than footer"); | ||
|
||
let input = MetadataFetchFn(&mut fetch); | ||
let err = ParquetMetaDataReader::new() | ||
.load_and_finish(input, 20) | ||
.load_and_finish(&mut f, 20) | ||
.await | ||
.unwrap_err() | ||
.to_string(); | ||
|
@@ -949,42 +948,39 @@ mod async_tests { | |
futures::future::ready(read_range(&mut file, range)) | ||
}; | ||
|
||
let f = MetadataFetchFn(&mut fetch); | ||
let mut f = MetadataFetchFn(&mut fetch); | ||
let mut loader = ParquetMetaDataReader::new().with_page_indexes(true); | ||
loader.try_load(f, len).await.unwrap(); | ||
loader.try_load(&mut f, len).await.unwrap(); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 3); | ||
let metadata = loader.finish().unwrap(); | ||
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); | ||
|
||
// Prefetch just footer exactly | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let f = MetadataFetchFn(&mut fetch); | ||
let mut loader = ParquetMetaDataReader::new() | ||
.with_page_indexes(true) | ||
.with_prefetch_hint(Some(1729)); | ||
loader.try_load(f, len).await.unwrap(); | ||
loader.try_load(&mut f, len).await.unwrap(); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
let metadata = loader.finish().unwrap(); | ||
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); | ||
|
||
// Prefetch more than footer but not enough | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let f = MetadataFetchFn(&mut fetch); | ||
let mut loader = ParquetMetaDataReader::new() | ||
.with_page_indexes(true) | ||
.with_prefetch_hint(Some(130649)); | ||
loader.try_load(f, len).await.unwrap(); | ||
loader.try_load(&mut f, len).await.unwrap(); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 2); | ||
let metadata = loader.finish().unwrap(); | ||
assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); | ||
|
||
// Prefetch exactly enough | ||
fetch_count.store(0, Ordering::SeqCst); | ||
let f = MetadataFetchFn(&mut fetch); | ||
let metadata = ParquetMetaDataReader::new() | ||
.with_page_indexes(true) | ||
.with_prefetch_hint(Some(130650)) | ||
.load_and_finish(f, len) | ||
.load_and_finish(&mut f, len) | ||
.await | ||
.unwrap(); | ||
assert_eq!(fetch_count.load(Ordering::SeqCst), 1); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍