Skip to content

Commit

Permalink
feat: pickle the manifest when pickling a dataset (#2459)
Browse files Browse the repository at this point in the history
This allows us to send the pickled dataset across IPC and not require a
reload of the manifest on the remote. It should be safe since manifests
are read only. It uses the existing protobuf serialization for
manifests.

If the manifest contains a dictionary field, then we still need to load
the dictionary information on the remote. Perhaps we can pickle the
dictionary at some point (or, with lv2, maybe the dictionary won't need
to be in the manifest).
  • Loading branch information
westonpace authored Jun 10, 2024
1 parent 8746737 commit 0a1944f
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 34 deletions.
18 changes: 14 additions & 4 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ def __init__(
metadata_cache_size: Optional[int] = None,
commit_lock: Optional[CommitLock] = None,
storage_options: Optional[Dict[str, str]] = None,
serialized_manifest: Optional[bytes] = None,
):
uri = os.fspath(uri) if isinstance(uri, Path) else uri
self._uri = uri
Expand All @@ -172,17 +173,26 @@ def __init__(
metadata_cache_size,
commit_lock,
storage_options,
serialized_manifest,
)

@classmethod
def __deserialize__(cls, uri: str, version: int, manifest: bytes):
    """Recreate a dataset from pickled state.

    Parameters
    ----------
    uri : str
        Base URI of the dataset.
    version : int
        Dataset version to open.
    manifest : bytes
        Protobuf-serialized manifest, passed through so the receiving
        process does not have to re-read the manifest from storage.
    """
    return cls(uri, version, serialized_manifest=manifest)

def __reduce__(self):
return LanceDataset, (self.uri, self._ds.version())
return type(self).__deserialize__, (
self.uri,
self._ds.version(),
self._ds.serialized_manifest(),
)

def __getstate__(self):
return self.uri, self._ds.version()
return self.uri, self._ds.version(), self._ds.serialized_manifest()

def __setstate__(self, state):
self._uri, version = state
self._ds = _Dataset(self._uri, version)
self._uri, version, manifest = state
self._ds = _Dataset(self._uri, version, manifest=manifest)

def __copy__(self):
ds = LanceDataset.__new__(LanceDataset)
Expand Down
13 changes: 12 additions & 1 deletion python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use lance_table::io::commit::CommitHandler;
use object_store::path::Path;
use pyo3::exceptions::{PyStopIteration, PyTypeError};
use pyo3::prelude::*;
use pyo3::types::{PyList, PySet, PyString};
use pyo3::types::{PyBytes, PyList, PySet, PyString};
use pyo3::{
exceptions::{PyIOError, PyKeyError, PyValueError},
pyclass,
Expand All @@ -63,6 +63,7 @@ use pyo3::{
};
use snafu::{location, Location};

use crate::error::PythonErrorExt;
use crate::fragment::{FileFragment, FragmentMetadata};
use crate::schema::LanceSchema;
use crate::session::Session;
Expand Down Expand Up @@ -276,6 +277,7 @@ pub struct Dataset {

#[pymethods]
impl Dataset {
#[allow(clippy::too_many_arguments)]
#[new]
fn new(
uri: String,
Expand All @@ -285,6 +287,7 @@ impl Dataset {
metadata_cache_size: Option<usize>,
commit_handler: Option<PyObject>,
storage_options: Option<HashMap<String, String>>,
manifest: Option<&[u8]>,
) -> PyResult<Self> {
let mut params = ReadParams {
index_cache_size: index_cache_size.unwrap_or(DEFAULT_INDEX_CACHE_SIZE),
Expand All @@ -308,6 +311,9 @@ impl Dataset {
if let Some(storage_options) = storage_options {
builder = builder.with_storage_options(storage_options);
}
if let Some(manifest) = manifest {
builder = builder.with_serialized_manifest(manifest).infer_error()?;
}

let dataset = RT.runtime.block_on(builder.load());

Expand Down Expand Up @@ -351,6 +357,11 @@ impl Dataset {
})
}

/// Returns the dataset's manifest serialized to protobuf bytes,
/// exposed to Python for pickling support.
fn serialized_manifest(&self, py: Python) -> PyObject {
    PyBytes::new(py, self.ds.manifest().serialized().as_slice()).into()
}

/// Load index metadata
fn load_indices(self_: PyRef<'_, Self>) -> PyResult<Vec<PyObject>> {
let index_metadata = RT
Expand Down
5 changes: 5 additions & 0 deletions rust/lance-core/src/datatypes/field.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ impl Field {
}
}

/// Returns true if this field, or any field nested beneath it,
/// uses a dictionary data type.
pub fn has_dictionary_types(&self) -> bool {
    if let DataType::Dictionary(_, _) = self.data_type() {
        return true;
    }
    self.children.iter().any(|child| child.has_dictionary_types())
}

fn explain_differences(
&self,
expected: &Self,
Expand Down
4 changes: 4 additions & 0 deletions rust/lance-core/src/datatypes/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ impl Schema {
}
}

pub fn has_dictionary_types(&self) -> bool {
self.fields.iter().any(|f| f.has_dictionary_types())
}

pub fn check_compatible(&self, expected: &Self, options: &SchemaCompareOptions) -> Result<()> {
if !self.compare_with_options(expected, options) {
let difference = self.explain_difference(expected, options);
Expand Down
8 changes: 8 additions & 0 deletions rust/lance-table/src/format/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
use lance_file::reader::FileReader;
use lance_io::traits::{ProtoStruct, Reader};
use object_store::path::Path;
use prost::Message;
use prost_types::Timestamp;

use super::Fragment;
Expand Down Expand Up @@ -257,6 +258,13 @@ impl Manifest {
pub fn uses_move_stable_row_ids(&self) -> bool {
self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
}

/// Serializes the manifest into its protobuf wire format.
///
/// The resulting bytes are suitable for IPC or temporary storage and
/// can later be used to open a dataset without re-reading the manifest.
pub fn serialized(&self) -> Vec<u8> {
    pb::Manifest::from(self).encode_to_vec()
}
}

#[derive(Debug, Clone, PartialEq)]
Expand Down
31 changes: 20 additions & 11 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,25 +287,22 @@ impl Dataset {
path: manifest_file,
size: None,
};
let manifest = Self::load_manifest(self.object_store.as_ref(), &manifest_location).await?;
Self::checkout_manifest(
self.object_store.clone(),
base_path,
self.uri.clone(),
&manifest_location,
manifest,
self.session.clone(),
self.commit_handler.clone(),
)
.await
}

async fn checkout_manifest(
object_store: Arc<ObjectStore>,
base_path: Path,
uri: String,
async fn load_manifest(
object_store: &ObjectStore,
manifest_location: &ManifestLocation,
session: Arc<Session>,
commit_handler: Arc<dyn CommitHandler>,
) -> Result<Self> {
) -> Result<Manifest> {
let object_reader = if let Some(size) = manifest_location.size {
object_store
.open_with_size(&manifest_location.path, size as usize)
Expand Down Expand Up @@ -343,10 +340,10 @@ impl Dataset {
let mut manifest = if manifest_size - offset <= last_block.len() {
let message_len = LittleEndian::read_u32(&last_block[offset..offset + 4]) as usize;
let message_data = &last_block[offset + 4..offset + 4 + message_len];
Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)?
Manifest::try_from(lance_table::format::pb::Manifest::decode(message_data)?)
} else {
read_struct(object_reader.as_ref(), offset).await?
};
read_struct(object_reader.as_ref(), offset).await
}?;

if !can_read_dataset(manifest.reader_feature_flags) {
let message = format!(
Expand All @@ -361,6 +358,18 @@ impl Dataset {
}

populate_schema_dictionary(&mut manifest.schema, object_reader.as_ref()).await?;

Ok(manifest)
}

async fn checkout_manifest(
object_store: Arc<ObjectStore>,
base_path: Path,
uri: String,
manifest: Manifest,
session: Arc<Session>,
commit_handler: Arc<dyn CommitHandler>,
) -> Result<Self> {
Ok(Self {
object_store,
base: base_path,
Expand Down
70 changes: 52 additions & 18 deletions rust/lance/src/dataset/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@
// SPDX-FileCopyrightText: Copyright The Lance Authors
use std::{collections::HashMap, sync::Arc, time::Duration};

use lance_file::datatypes::populate_schema_dictionary;
use lance_io::object_store::{ObjectStore, ObjectStoreParams};
use lance_table::io::commit::{commit_handler_from_url, CommitHandler, ManifestLocation};
use lance_table::{
format::Manifest,
io::commit::{commit_handler_from_url, CommitHandler, ManifestLocation},
};
use object_store::{aws::AwsCredentialProvider, path::Path, DynObjectStore};
use prost::Message;
use snafu::{location, Location};
use tracing::instrument;
use url::Url;
Expand All @@ -23,6 +28,8 @@ pub struct DatasetBuilder {
/// Metadata cache size for the fragment metadata. If it is zero, metadata
/// cache is disabled.
metadata_cache_size: usize,
/// Optional pre-loaded manifest to avoid loading it again.
manifest: Option<Manifest>,
session: Option<Arc<Session>>,
commit_handler: Option<Arc<dyn CommitHandler>>,
options: ObjectStoreParams,
Expand All @@ -40,6 +47,7 @@ impl DatasetBuilder {
commit_handler: None,
session: None,
version: None,
manifest: None,
}
}
}
Expand Down Expand Up @@ -105,6 +113,15 @@ impl DatasetBuilder {
self
}

/// Supply a pre-serialized manifest rather than reading one from the
/// object store.
///
/// Commonly used when a dataset has been transferred across an IPC
/// boundary (e.g. via pickling).
pub fn with_serialized_manifest(mut self, manifest: &[u8]) -> Result<Self> {
    let decoded = lance_table::format::pb::Manifest::decode(manifest)?;
    self.manifest = Some(Manifest::try_from(decoded)?);
    Ok(self)
}

/// Set options used to initialize storage backend
///
/// Options may be passed in the HashMap or set as environment variables. See documentation of
Expand Down Expand Up @@ -214,33 +231,50 @@ impl DatasetBuilder {
let version = self.version;
let table_uri = self.table_uri.clone();

let manifest = self.manifest.take();

let (object_store, base_path, commit_handler) = self.build_object_store().await?;
let manifest = match version {
Some(version) => {

let manifest = if manifest.is_some() {
let mut manifest = manifest.unwrap();
if manifest.schema.has_dictionary_types() {
let path = commit_handler
.resolve_version(&base_path, version, &object_store.inner)
.resolve_version(&base_path, manifest.version, &object_store.inner)
.await?;
ManifestLocation {
version,
path,
size: None,
}
let reader = object_store.open(&path).await?;
populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
}
None => commit_handler
.resolve_latest_location(&base_path, &object_store)
.await
.map_err(|e| Error::DatasetNotFound {
source: Box::new(e),
path: base_path.to_string(),
location: location!(),
})?,
manifest
} else {
let manifest_location = match version {
Some(version) => {
let path = commit_handler
.resolve_version(&base_path, version, &object_store.inner)
.await?;
ManifestLocation {
version,
path,
size: None,
}
}
None => commit_handler
.resolve_latest_location(&base_path, &object_store)
.await
.map_err(|e| Error::DatasetNotFound {
source: Box::new(e),
path: base_path.to_string(),
location: location!(),
})?,
};

Dataset::load_manifest(&object_store, &manifest_location).await?
};

Dataset::checkout_manifest(
Arc::new(object_store),
base_path,
table_uri,
&manifest,
manifest,
session,
commit_handler,
)
Expand Down

0 comments on commit 0a1944f

Please sign in to comment.