From 0a1944fc23af7bc02d039808205043d02c207781 Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Mon, 10 Jun 2024 11:57:19 -0700
Subject: [PATCH] feat: pickle the manifest when pickling a dataset (#2459)

This allows us to send a pickled dataset across IPC without requiring a
reload of the manifest on the remote side. It should be safe since
manifests are read-only. It uses the existing protobuf serialization for
manifests.

If the manifest contains a dictionary field then we still need to load the
dictionary information on the remote. Perhaps we can pickle the dictionary
data at some point (or, with lv2, the dictionary may no longer need to live
in the manifest).
---
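Illustrative usage, not part of the patch itself: a minimal sketch of the
round trip this change enables, assuming a Lance dataset already exists at
the example path below.

    import pickle

    import lance

    # Open a dataset and pickle it; with this change the pickled payload
    # carries the protobuf-encoded manifest alongside the URI and version.
    ds = lance.dataset("/tmp/example_dataset.lance")
    payload = pickle.dumps(ds)

    # On the other side of the IPC channel, unpickling rebuilds the dataset
    # from the embedded manifest instead of re-reading it from storage
    # (dictionary data, if present, is still fetched from the object store).
    remote_ds = pickle.loads(payload)
    assert remote_ds.version == ds.version
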
 python/python/lance/dataset.py          | 18 +++++--
 python/src/dataset.rs                   | 13 ++++-
 rust/lance-core/src/datatypes/field.rs  |  5 ++
 rust/lance-core/src/datatypes/schema.rs |  4 ++
 rust/lance-table/src/format/manifest.rs |  8 +++
 rust/lance/src/dataset.rs               | 31 +++++++----
 rust/lance/src/dataset/builder.rs       | 70 ++++++++++++++++++-------
 7 files changed, 115 insertions(+), 34 deletions(-)

diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index 0cd862854c..9b5f4f0857 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -161,6 +161,7 @@ def __init__(
         metadata_cache_size: Optional[int] = None,
         commit_lock: Optional[CommitLock] = None,
         storage_options: Optional[Dict[str, str]] = None,
+        serialized_manifest: Optional[bytes] = None,
     ):
         uri = os.fspath(uri) if isinstance(uri, Path) else uri
         self._uri = uri
@@ -172,17 +173,26 @@ def __init__(
             metadata_cache_size,
             commit_lock,
             storage_options,
+            serialized_manifest,
         )
 
+    @classmethod
+    def __deserialize__(cls, uri: str, version: int, manifest: bytes):
+        return cls(uri, version, serialized_manifest=manifest)
+
     def __reduce__(self):
-        return LanceDataset, (self.uri, self._ds.version())
+        return type(self).__deserialize__, (
+            self.uri,
+            self._ds.version(),
+            self._ds.serialized_manifest(),
+        )
 
     def __getstate__(self):
-        return self.uri, self._ds.version()
+        return self.uri, self._ds.version(), self._ds.serialized_manifest()
 
     def __setstate__(self, state):
-        self._uri, version = state
-        self._ds = _Dataset(self._uri, version)
+        self._uri, version, manifest = state
+        self._ds = _Dataset(self._uri, version, manifest=manifest)
 
     def __copy__(self):
         ds = LanceDataset.__new__(LanceDataset)
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index f075685bbb..ab3b0dca9d 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -54,7 +54,7 @@ use lance_table::io::commit::CommitHandler;
 use object_store::path::Path;
 use pyo3::exceptions::{PyStopIteration, PyTypeError};
 use pyo3::prelude::*;
-use pyo3::types::{PyList, PySet, PyString};
+use pyo3::types::{PyBytes, PyList, PySet, PyString};
 use pyo3::{
     exceptions::{PyIOError, PyKeyError, PyValueError},
     pyclass,
@@ -63,6 +63,7 @@
 };
 use snafu::{location, Location};
 
+use crate::error::PythonErrorExt;
 use crate::fragment::{FileFragment, FragmentMetadata};
 use crate::schema::LanceSchema;
 use crate::session::Session;
@@ -276,6 +277,7 @@ pub struct Dataset {
 
 #[pymethods]
 impl Dataset {
+    #[allow(clippy::too_many_arguments)]
     #[new]
     fn new(
         uri: String,
@@ -285,6 +287,7 @@ impl Dataset {
         metadata_cache_size: Option<usize>,
         commit_handler: Option<PyObject>,
         storage_options: Option<HashMap<String, String>>,
+        manifest: Option<&[u8]>,
     ) -> PyResult<Self> {
         let mut params = ReadParams {
             index_cache_size: index_cache_size.unwrap_or(DEFAULT_INDEX_CACHE_SIZE),
@@ -308,6 +311,9 @@ impl Dataset {
         if let Some(storage_options) = storage_options {
             builder = builder.with_storage_options(storage_options);
         }
+        if let Some(manifest) = manifest {
+            builder = builder.with_serialized_manifest(manifest).infer_error()?;
+        }
 
         let dataset = RT.runtime.block_on(builder.load());
@@ -351,6 +357,11 @@ impl Dataset {
         })
     }
 
+    fn serialized_manifest(&self, py: Python) -> PyObject {
+        let manifest_bytes = self.ds.manifest().serialized();
+        PyBytes::new(py, &manifest_bytes).into()
+    }
+
     /// Load index metadata
     fn load_indices(self_: PyRef<'_, Self>) -> PyResult> {
         let index_metadata = RT
diff --git a/rust/lance-core/src/datatypes/field.rs b/rust/lance-core/src/datatypes/field.rs
index a9517eb97e..c94041a9e6 100644
--- a/rust/lance-core/src/datatypes/field.rs
+++ b/rust/lance-core/src/datatypes/field.rs
@@ -81,6 +81,11 @@ impl Field {
         }
     }
 
+    pub fn has_dictionary_types(&self) -> bool {
+        matches!(self.data_type(), DataType::Dictionary(_, _))
+            || self.children.iter().any(Field::has_dictionary_types)
+    }
+
     fn explain_differences(
         &self,
         expected: &Self,
diff --git a/rust/lance-core/src/datatypes/schema.rs b/rust/lance-core/src/datatypes/schema.rs
index 9c47028bac..7990d32d6a 100644
--- a/rust/lance-core/src/datatypes/schema.rs
+++ b/rust/lance-core/src/datatypes/schema.rs
@@ -142,6 +142,10 @@ impl Schema {
         }
     }
 
+    pub fn has_dictionary_types(&self) -> bool {
+        self.fields.iter().any(|f| f.has_dictionary_types())
+    }
+
     pub fn check_compatible(&self, expected: &Self, options: &SchemaCompareOptions) -> Result<()> {
         if !self.compare_with_options(expected, options) {
             let difference = self.explain_difference(expected, options);
diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs
index 3406cbe2e2..88ff30e14f 100644
--- a/rust/lance-table/src/format/manifest.rs
+++ b/rust/lance-table/src/format/manifest.rs
@@ -10,6 +10,7 @@ use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
 use lance_file::reader::FileReader;
 use lance_io::traits::{ProtoStruct, Reader};
 use object_store::path::Path;
+use prost::Message;
 use prost_types::Timestamp;
 
 use super::Fragment;
@@ -257,6 +258,13 @@ impl Manifest {
     pub fn uses_move_stable_row_ids(&self) -> bool {
         self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
     }
+
+    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage,
+    /// which can be used to create a dataset
+    pub fn serialized(&self) -> Vec<u8> {
+        let pb_manifest: pb::Manifest = self.into();
+        pb_manifest.encode_to_vec()
+    }
 }
 
 #[derive(Debug, Clone, PartialEq)]
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index d3c30f9f80..7b1cf500a2 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -287,25 +287,22 @@ impl Dataset {
             path: manifest_file,
             size: None,
         };
+        let manifest = Self::load_manifest(self.object_store.as_ref(), &manifest_location).await?;
         Self::checkout_manifest(
             self.object_store.clone(),
             base_path,
             self.uri.clone(),
-            &manifest_location,
+            manifest,
             self.session.clone(),
             self.commit_handler.clone(),
         )
         .await
     }
 
-    async fn checkout_manifest(
-        object_store: Arc<ObjectStore>,
-        base_path: Path,
-        uri: String,
+    async fn load_manifest(
+        object_store: &ObjectStore,
         manifest_location: &ManifestLocation,
-        session: Arc<Session>,
-        commit_handler: Arc<dyn CommitHandler>,
-    ) -> Result<Self> {
+    ) -> Result<Manifest> {
         let object_reader = if let Some(size) = manifest_location.size {
             object_store
                 .open_with_size(&manifest_location.path, size as usize)
@@ -343,10 +340,10 @@ impl Dataset {
         let mut manifest = if manifest_size - offset <= last_block.len() {
             let message_len = LittleEndian::read_u32(&last_block[offset..offset + 4]) as usize;
             let message_data = &last_block[offset + 4..offset + 4 + message_len];
-            Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)?
+            Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)
         } else {
-            read_struct(object_reader.as_ref(), offset).await?
-        };
+            read_struct(object_reader.as_ref(), offset).await
+        }?;
 
         if !can_read_dataset(manifest.reader_feature_flags) {
             let message = format!(
@@ -361,6 +358,18 @@
         }
 
         populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;
+
+        Ok(manifest)
+    }
+
+    async fn checkout_manifest(
+        object_store: Arc<ObjectStore>,
+        base_path: Path,
+        uri: String,
+        manifest: Manifest,
+        session: Arc<Session>,
+        commit_handler: Arc<dyn CommitHandler>,
+    ) -> Result<Self> {
         Ok(Self {
             object_store,
             base: base_path,
diff --git a/rust/lance/src/dataset/builder.rs b/rust/lance/src/dataset/builder.rs
index 777ed2dd0b..7c3073c078 100644
--- a/rust/lance/src/dataset/builder.rs
+++ b/rust/lance/src/dataset/builder.rs
@@ -2,9 +2,14 @@
 // SPDX-FileCopyrightText: Copyright The Lance Authors
 
 use std::{collections::HashMap, sync::Arc, time::Duration};
+use lance_file::datatypes::populate_schema_dictionary;
 use lance_io::object_store::{ObjectStore, ObjectStoreParams};
-use lance_table::io::commit::{commit_handler_from_url, CommitHandler, ManifestLocation};
+use lance_table::{
+    format::Manifest,
+    io::commit::{commit_handler_from_url, CommitHandler, ManifestLocation},
+};
 use object_store::{aws::AwsCredentialProvider, path::Path, DynObjectStore};
+use prost::Message;
 use snafu::{location, Location};
 use tracing::instrument;
 use url::Url;
@@ -23,6 +28,8 @@ pub struct DatasetBuilder {
     /// Metadata cache size for the fragment metadata. If it is zero, metadata
     /// cache is disabled.
     metadata_cache_size: usize,
+    /// Optional pre-loaded manifest to avoid loading it again.
+    manifest: Option<Manifest>,
     session: Option<Arc<Session>>,
     commit_handler: Option<Arc<dyn CommitHandler>>,
     options: ObjectStoreParams,
@@ -40,6 +47,7 @@ impl DatasetBuilder {
             commit_handler: None,
             session: None,
             version: None,
+            manifest: None,
         }
     }
 }
@@ -105,6 +113,15 @@ impl DatasetBuilder {
         self
     }
 
+    /// Use a serialized manifest instead of loading it from the object store.
+    ///
+    /// This is common when transferring a dataset across IPC boundaries.
+    pub fn with_serialized_manifest(mut self, manifest: &[u8]) -> Result<Self> {
+        let manifest = Manifest::try_from(lance_table::format::pb::Manifest::decode(manifest)?)?;
+        self.manifest = Some(manifest);
+        Ok(self)
+    }
+
     /// Set options used to initialize storage backend
     ///
     /// Options may be passed in the HashMap or set as environment variables. See documentation of
@@ -214,33 +231,50 @@ impl DatasetBuilder {
         let version = self.version;
         let table_uri = self.table_uri.clone();
+        let manifest = self.manifest.take();
+
         let (object_store, base_path, commit_handler) = self.build_object_store().await?;
-        let manifest = match version {
-            Some(version) => {
+
+        let manifest = if manifest.is_some() {
+            let mut manifest = manifest.unwrap();
+            if manifest.schema.has_dictionary_types() {
                 let path = commit_handler
-                    .resolve_version(&base_path, version, &object_store.inner)
+                    .resolve_version(&base_path, manifest.version, &object_store.inner)
                     .await?;
-                ManifestLocation {
-                    version,
-                    path,
-                    size: None,
-                }
+                let reader = object_store.open(&path).await?;
+                populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
             }
-            None => commit_handler
-                .resolve_latest_location(&base_path, &object_store)
-                .await
-                .map_err(|e| Error::DatasetNotFound {
-                    source: Box::new(e),
-                    path: base_path.to_string(),
-                    location: location!(),
-                })?,
+            manifest
+        } else {
+            let manifest_location = match version {
+                Some(version) => {
+                    let path = commit_handler
+                        .resolve_version(&base_path, version, &object_store.inner)
+                        .await?;
+                    ManifestLocation {
+                        version,
+                        path,
+                        size: None,
+                    }
+                }
+                None => commit_handler
+                    .resolve_latest_location(&base_path, &object_store)
+                    .await
+                    .map_err(|e| Error::DatasetNotFound {
+                        source: Box::new(e),
+                        path: base_path.to_string(),
+                        location: location!(),
+                    })?,
+            };
+
+            Dataset::load_manifest(&object_store, &manifest_location).await?
         };
 
         Dataset::checkout_manifest(
             Arc::new(object_store),
             base_path,
             table_uri,
-            &manifest,
+            manifest,
             session,
             commit_handler,
         )