diff --git a/python/Cargo.toml b/python/Cargo.toml index 60ab7bd23d..f3e58b9af5 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -41,11 +41,14 @@ pyo3 = { version = "0.20", features = ["extension-module", "abi3-py38"] } tokio = { version = "1.23", features = ["rt-multi-thread"] } uuid = "1.3.0" serde_json = "1" +serde = "1.0.197" +serde_yaml = "0.9.34" num_cpus = "1" snafu = "0.7.4" tracing-chrome = "0.7.1" tracing-subscriber = "0.3.17" tracing = "0.1.37" +url = "2.5.0" # Prevent dynamic linking of lzma, which comes from datafusion lzma-sys = { version = "*", features = ["static"] } diff --git a/python/pyproject.toml b/python/pyproject.toml index 7e2c681a2d..c2366ca972 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -12,13 +12,13 @@ keywords = [ "data-science", "machine-learning", "arrow", - "data-analytics" + "data-analytics", ] categories = [ "database-implementations", "data-structures", "development-tools", - "science" + "science", ] classifiers = [ "Development Status :: 3 - Alpha", @@ -48,7 +48,7 @@ build-backend = "maturin" [project.optional-dependencies] tests = [ "datasets", - "duckdb; python_version<'3.12'", # TODO: remove when duckdb supports 3.12 + "duckdb; python_version<'3.12'", # TODO: remove when duckdb supports 3.12 "ml_dtypes", "pillow", "pandas", @@ -65,7 +65,7 @@ torch = ["torch"] lint.select = ["F", "E", "W", "I", "G", "TCH", "PERF", "CPY001", "B019"] [tool.ruff.lint.per-file-ignores] -"*.pyi" = ["E302"] +"*.pyi" = ["E301", "E302"] [tool.mypy] python_version = "3.11" diff --git a/python/python/lance/file.py b/python/python/lance/file.py new file mode 100644 index 0000000000..122d006b5d --- /dev/null +++ b/python/python/lance/file.py @@ -0,0 +1,188 @@ +# Copyright (c) 2023. Lance Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Union + +import pyarrow as pa + +from .lance import ( + LanceBufferDescriptor, + LanceColumnMetadata, + LanceFileMetadata, + LancePageMetadata, +) +from .lance import ( + LanceFileReader as _LanceFileReader, +) +from .lance import ( + LanceFileWriter as _LanceFileWriter, +) + + +class ReaderResults: + """ + Utility class for converting results from Lance's internal + format (RecordBatchReader) to a desired format such + as a pyarrow Table, etc. + """ + + def __init__(self, reader: pa.RecordBatchReader): + """ + Creates a new instance, not meant for external use + """ + self.reader = reader + + def to_batches(self) -> pa.RecordBatchReader: + """ + Return the results as a pyarrow RecordBatchReader + """ + return self.reader + + def to_table(self) -> pa.Table: + """ + Return the results as a pyarrow Table + """ + return self.reader.read_all() + + +class LanceFileReader: + """ + A file reader for reading Lance files + + This class is used to read Lance data files, a low level structure + optimized for storing multi-modal tabular data. If you are working with + Lance datasets then you should use the LanceDataset class instead. 
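+
+    A minimal usage sketch (the path is illustrative; the schema must match
+    the data the file was written with)::
+
+        schema = pa.schema([pa.field("a", pa.int64())])
+        reader = LanceFileReader("/tmp/foo.lance", schema)
+        table = reader.read_all().to_table()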
+ """ + + # TODO: make schema optional + def __init__(self, path: str, schema: pa.Schema): + """ + Creates a new file reader to read the given file + + Parameters + ---------- + + path: str + The path to read, can be a pathname for local storage + or a URI to read from cloud storage. + schema: pa.Schema + The desired projection schema + """ + self._reader = _LanceFileReader(path, schema) + + def read_all(self, *, batch_size: int = 1024) -> ReaderResults: + """ + Reads the entire file + + Parameters + ---------- + batch_size: int, default 1024 + The file will be read in batches. This parameter controls + how many rows will be in each batch (except the final batch) + + Smaller batches will use less memory but might be slightly + slower because there is more per-batch overhead + """ + return ReaderResults(self._reader.read_all(batch_size)) + + def read_range( + self, start: int, num_rows: int, *, batch_size: int = 1024 + ) -> ReaderResults: + """ + Read a range of rows from the file + + Parameters + ---------- + start: int + The offset of the first row to start reading + num_rows: int + The number of rows to read from the file + batch_size: int, default 1024 + The file will be read in batches. This parameter controls + how many rows will be in each batch (except the final batch) + + Smaller batches will use less memory but might be slightly + slower because there is more per-batch overhead + """ + return ReaderResults(self._reader.read_range(start, num_rows, batch_size)) + + def metadata(self) -> LanceFileMetadata: + """ + Return metadata describing the file contents + """ + return self._reader.metadata() + + +class LanceFileWriter: + """ + A file writer for writing Lance data files + + This class is used to write Lance data files, a low level structure + optimized for storing multi-modal tabular data. If you are working with + Lance datasets then you should use the LanceDataset class instead. + """ + + def __init__(self, path: str, schema: pa.Schema, **kwargs): + """ + Create a new LanceFileWriter to write to the given path + + Parameters + ---------- + path: str + The path to write to. Can be a pathname for local storage + or a URI for remote storage. 
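+            For example, ``/tmp/file.lance`` or ``s3://bucket/file.lance``
+            (the URI scheme selects the object store).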
+ schema: pa.Schema + The schema of data that will be written + """ + self._writer = _LanceFileWriter(path, schema, **kwargs) + self.closed = False + + def write_batch(self, batch: Union[pa.RecordBatch, pa.Table]) -> None: + """ + Write a batch of data to the file + + parameters + ---------- + batch: Union[pa.RecordBatch, pa.Table] + The data to write to the file + """ + if isinstance(batch, pa.Table): + for batch in batch.to_batches(): + self._writer.write_batch(batch) + else: + self._writer.write_batch(batch) + + def close(self) -> None: + """ + Write the file metadata and close the file + """ + if self.closed: + return + self.closed = True + self._writer.finish() + + def __enter__(self) -> "LanceFileWriter": + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + +__all__ = [ + "LanceFileReader", + "LanceFileWriter", + "LanceFileMetadata", + "LanceColumnMetadata", + "LancePageMetadata", + "LanceBufferDescriptor", +] diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index a2a69b0db0..27c10e49ae 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -32,3 +32,36 @@ class CompactionMetrics: fragments_added: int files_removed: int files_added: int + +class LanceFileWriter: + def __init__(self, path: str, schema: pa.Schema): ... + def write_batch(self, batch: pa.RecordBatch) -> None: ... + def finish(self) -> None: ... + +class LanceFileReader: + def __init__(self, path: str, schema: pa.Schema): ... + def read_all(self, batch_size: int) -> pa.RecordBatchReader: ... + def read_range( + self, start: int, num_rows: int, batch_size: int + ) -> pa.RecordBatchReader: ... + +class LanceBufferDescriptor: + position: int + size: int + +class LancePageMetadata: + buffers: List[LanceBufferDescriptor] + encoding: str + +class LanceColumnMetadata: + column_buffers: List[LanceBufferDescriptor] + pages: List[LancePageMetadata] + +class LanceFileMetadata: + schema: pa.Schema + num_rows: int + num_data_bytes: int + num_column_metadata_bytes: int + num_global_buffer_bytes: int + global_buffers: List[LanceBufferDescriptor] + columns: List[LanceColumnMetadata] diff --git a/python/python/tests/test_file.py b/python/python/tests/test_file.py new file mode 100644 index 0000000000..c13a561909 --- /dev/null +++ b/python/python/tests/test_file.py @@ -0,0 +1,70 @@ +# Copyright (c) 2023. Lance Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
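+
+# These tests exercise the low-level file reader/writer bindings directly.
+# (tmp_path is the standard pytest fixture providing a per-test directory.)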
+ +import pyarrow as pa +from lance.file import LanceFileReader, LanceFileWriter + + +def test_file_writer(tmp_path): + path = tmp_path / "foo.lance" + schema = pa.schema([pa.field("a", pa.int64())]) + with LanceFileWriter(str(path), schema) as writer: + writer.write_batch(pa.table({"a": [1, 2, 3]})) + assert len(path.read_bytes()) > 0 + + +def test_round_trip(tmp_path): + path = tmp_path / "foo.lance" + schema = pa.schema([pa.field("a", pa.int64())]) + data = pa.table({"a": [1, 2, 3]}) + with LanceFileWriter(str(path), schema) as writer: + writer.write_batch(data) + reader = LanceFileReader(str(path), schema) + result = reader.read_all().to_table() + assert result == data + + # TODO: Currently fails, need to fix reader + # result = reader.read_range(1, 1).to_table() + # assert result == pa.table({"a": [2]}) + + # TODO: Test reading invalid ranges + # TODO: Test invalid batch sizes + + +def test_metadata(tmp_path): + path = tmp_path / "foo.lance" + schema = pa.schema([pa.field("a", pa.int64())]) + data = pa.table({"a": [1, 2, 3]}) + with LanceFileWriter(str(path), schema) as writer: + writer.write_batch(data) + reader = LanceFileReader(str(path), schema) + metadata = reader.metadata() + + assert metadata.schema == schema + assert metadata.num_rows == 3 + assert metadata.num_global_buffer_bytes > 0 + assert metadata.num_column_metadata_bytes > 0 + assert metadata.num_data_bytes == 24 + assert len(metadata.columns) == 1 + + column = metadata.columns[0] + assert len(column.column_buffers) == 0 + assert len(column.pages) == 1 + + page = column.pages[0] + assert len(page.buffers) == 1 + assert page.buffers[0].position == 0 + assert page.buffers[0].size == 24 + + assert len(page.encoding) > 0 diff --git a/python/src/error.rs b/python/src/error.rs new file mode 100644 index 0000000000..dc1f5a1fd7 --- /dev/null +++ b/python/src/error.rs @@ -0,0 +1,69 @@ +// Copyright 2024 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use pyo3::{ + exceptions::{PyIOError, PyNotImplementedError, PyOSError, PyRuntimeError, PyValueError}, + PyResult, +}; + +use lance::error::Error as LanceError; + +pub trait PythonErrorExt { + /// Convert to a python error based on the Lance error type + fn infer_error(self) -> PyResult; + /// Convert to OSError + fn os_error(self) -> PyResult; + /// Convert to RuntimeError + fn runtime_error(self) -> PyResult; + /// Convert to ValueError + fn value_error(self) -> PyResult; + /// Convert to PyNotImplementedError + fn not_implemented(self) -> PyResult; + /// Convert to PyIoError + fn io_error(self) -> PyResult; +} + +impl PythonErrorExt for std::result::Result { + fn infer_error(self) -> PyResult { + match &self { + Ok(_) => Ok(self.unwrap()), + Err(err) => match err { + LanceError::InvalidInput { .. } => self.value_error(), + LanceError::NotSupported { .. } => self.not_implemented(), + LanceError::IO { .. 
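+                // Lance I/O errors surface in Python as IOError; any other
+                // variant falls through to the RuntimeError case below.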
} => self.io_error(), + _ => self.runtime_error(), + }, + } + } + + fn os_error(self) -> PyResult { + self.map_err(|err| PyOSError::new_err(err.to_string())) + } + + fn runtime_error(self) -> PyResult { + self.map_err(|err| PyRuntimeError::new_err(err.to_string())) + } + + fn value_error(self) -> PyResult { + self.map_err(|err| PyValueError::new_err(err.to_string())) + } + + fn not_implemented(self) -> PyResult { + self.map_err(|err| PyNotImplementedError::new_err(err.to_string())) + } + + fn io_error(self) -> PyResult { + self.map_err(|err| PyIOError::new_err(err.to_string())) + } +} diff --git a/python/src/file.rs b/python/src/file.rs new file mode 100644 index 0000000000..9e17d2020d --- /dev/null +++ b/python/src/file.rs @@ -0,0 +1,286 @@ +// Copyright 2024 Lance Developers. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{pin::Pin, sync::Arc}; + +use arrow::pyarrow::PyArrowType; +use arrow_array::{RecordBatch, RecordBatchReader}; +use arrow_schema::Schema as ArrowSchema; +use futures::stream::StreamExt; +use lance::io::{ObjectStore, RecordBatchStream}; +use lance_file::v2::{ + reader::{BufferDescriptor, CachedFileMetadata, FileReader}, + writer::{FileWriter, FileWriterOptions}, +}; +use lance_io::scheduler::StoreScheduler; +use object_store::path::Path; +use pyo3::{exceptions::PyRuntimeError, pyclass, pymethods, IntoPy, PyObject, PyResult, Python}; +use serde::Serialize; +use url::Url; + +use crate::{error::PythonErrorExt, RT}; + +#[pyclass(get_all)] +#[derive(Clone, Debug, Serialize)] +pub struct LanceBufferDescriptor { + /// The byte offset of the buffer in the file + pub position: u64, + /// The size (in bytes) of the buffer + pub size: u64, +} + +impl LanceBufferDescriptor { + fn new(inner: &BufferDescriptor) -> Self { + Self { + position: inner.position, + size: inner.size, + } + } + + fn new_from_parts(position: u64, size: u64) -> Self { + Self { position, size } + } +} + +#[pyclass(get_all)] +#[derive(Clone, Debug, Serialize)] +pub struct LancePageMetadata { + /// The buffers in the page + pub buffers: Vec, + /// A description of the encoding used to encode the page + pub encoding: String, +} + +impl LancePageMetadata { + fn new(inner: &lance_file::format::pbfile::column_metadata::Page) -> Self { + let buffers = inner + .buffer_offsets + .iter() + .zip(inner.buffer_sizes.iter()) + .map(|(pos, size)| LanceBufferDescriptor::new_from_parts(*pos, *size)) + .collect(); + Self { + buffers, + encoding: lance_file::v2::reader::describe_encoding(inner), + } + } +} + +#[pyclass(get_all)] +#[derive(Clone, Debug, Serialize)] +pub struct LanceColumnMetadata { + /// The column-wide buffers + pub column_buffers: Vec, + /// The data pages in the column + pub pages: Vec, +} + +impl LanceColumnMetadata { + fn new(inner: &lance_file::format::pbfile::ColumnMetadata) -> Self { + let column_buffers = inner + .buffer_offsets + .iter() + .zip(inner.buffer_sizes.iter()) + .map(|(pos, size)| LanceBufferDescriptor::new_from_parts(*pos, *size)) + .collect(); + Self { + column_buffers, + 
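+            // One entry per data page: the page's buffers plus a string
+            // describing the encoding used to write it.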
pages: inner.pages.iter().map(LancePageMetadata::new).collect(), + } + } +} + +#[pyclass(get_all)] +#[derive(Clone, Debug, Serialize)] +pub struct LanceFileMetadata { + /// The schema of the file + #[serde(skip)] + pub schema: Option, + /// The major version of the file + pub major_version: u16, + /// The minor version of the file + pub minor_version: u16, + /// The number of rows in the file + pub num_rows: u64, + /// The number of bytes in the data section of the file + pub num_data_bytes: u64, + /// The number of bytes in the column metadata section of the file + pub num_column_metadata_bytes: u64, + /// The number of bytes in the global buffer section of the file + pub num_global_buffer_bytes: u64, + /// The global buffers + pub global_buffers: Vec, + /// The column metadata, an entry might be None if the metadata for a column + /// was not loaded into memory when the file was opened. + pub columns: Vec>, +} + +impl LanceFileMetadata { + fn new(inner: &CachedFileMetadata, py: Python) -> Self { + let schema = Some(PyArrowType(inner.file_schema.clone()).into_py(py)); + Self { + major_version: inner.major_version, + minor_version: inner.minor_version, + schema, + num_rows: inner.num_rows, + num_data_bytes: inner.num_data_bytes, + num_column_metadata_bytes: inner.num_column_metadata_bytes, + num_global_buffer_bytes: inner.num_global_buffer_bytes, + global_buffers: inner + .file_buffers + .iter() + .map(LanceBufferDescriptor::new) + .collect(), + columns: inner + .column_metadatas + .iter() + .map(LanceColumnMetadata::new) + .map(Some) + .collect(), + } + } +} + +#[pymethods] +impl LanceFileMetadata { + pub fn __repr__(&self) -> PyResult { + serde_yaml::to_string(self).map_err(|err| PyRuntimeError::new_err(err.to_string())) + } +} + +#[pyclass] +pub struct LanceFileWriter { + inner: Box, +} + +impl LanceFileWriter { + async fn open(uri_or_path: String, schema: PyArrowType) -> PyResult { + let (object_store, path) = if let Ok(_) = Url::parse(&uri_or_path) { + ObjectStore::from_uri(&uri_or_path).await.infer_error()? + } else { + ( + ObjectStore::local(), + Path::parse(uri_or_path).map_err(|e| PyRuntimeError::new_err(e.to_string()))?, + ) + }; + let object_writer = object_store.create(&path).await.infer_error()?; + let inner = FileWriter::try_new( + object_writer, + schema.0.clone(), + FileWriterOptions::default(), + ) + .infer_error()?; + Ok(Self { + inner: Box::new(inner), + }) + } +} + +#[pymethods] +impl LanceFileWriter { + #[new] + pub fn new(path: String, schema: PyArrowType) -> PyResult { + RT.runtime.block_on(Self::open(path, schema)) + } + + pub fn write_batch(&mut self, batch: PyArrowType) -> PyResult<()> { + RT.runtime + .block_on(self.inner.write_batch(&batch.0)) + .infer_error() + } + + pub fn finish(&mut self) -> PyResult<()> { + RT.runtime.block_on(self.inner.finish()).infer_error() + } +} + +#[pyclass] +pub struct LanceFileReader { + inner: Box, +} + +impl LanceFileReader { + async fn open(uri_or_path: String, schema: PyArrowType) -> PyResult { + let (object_store, path) = if let Ok(_) = Url::parse(&uri_or_path) { + ObjectStore::from_uri(&uri_or_path).await.infer_error()? 
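+            // Anything that parses as a URL (e.g. s3://...) resolves through
+            // the scheme's object store; bare paths fall back to the local
+            // filesystem below.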
+ } else { + ( + ObjectStore::local(), + Path::parse(uri_or_path).map_err(|e| PyRuntimeError::new_err(e.to_string()))?, + ) + }; + let scheduler = StoreScheduler::new(Arc::new(object_store), 8); + let file = scheduler.open_file(&path).await.infer_error()?; + let inner = FileReader::try_open(file, schema.0.clone()) + .await + .infer_error()?; + Ok(Self { + inner: Box::new(inner), + }) + } +} + +struct LanceReaderAdapter(Pin>); + +impl Iterator for LanceReaderAdapter { + type Item = std::result::Result; + + fn next(&mut self) -> Option { + let batch = RT.runtime.block_on(self.0.next()); + batch.map(|b| b.map_err(|e| e.into())) + } +} + +impl RecordBatchReader for LanceReaderAdapter { + fn schema(&self) -> std::sync::Arc { + self.0.schema().clone() + } +} + +#[pymethods] +impl LanceFileReader { + #[new] + pub fn new(path: String, schema: PyArrowType) -> PyResult { + RT.runtime.block_on(Self::open(path, schema)) + } + + pub fn read_all( + &mut self, + batch_size: u32, + ) -> PyResult>> { + let stream = RT.runtime.block_on( + self.inner + .read_stream(lance_io::ReadBatchParams::RangeFull, batch_size), + ); + Ok(PyArrowType(Box::new(LanceReaderAdapter(stream)))) + } + + pub fn read_range( + &mut self, + offset: u64, + num_rows: u64, + batch_size: u32, + ) -> PyResult>> { + let stream = RT.runtime.block_on(self.inner.read_stream( + lance_io::ReadBatchParams::Range((offset as usize)..(offset + num_rows) as usize), + batch_size, + )); + Ok(PyArrowType(Box::new(LanceReaderAdapter(stream)))) + } + + pub fn metadata(&mut self, py: Python) -> LanceFileMetadata { + let inner_meta = self.inner.metadata(); + LanceFileMetadata::new(inner_meta, py) + } +} diff --git a/python/src/lib.rs b/python/src/lib.rs index 4ff934431e..b17f555830 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -35,6 +35,10 @@ use dataset::optimize::{ }; use dataset::MergeInsertBuilder; use env_logger::Env; +use file::{ + LanceBufferDescriptor, LanceColumnMetadata, LanceFileMetadata, LanceFileReader, + LanceFileWriter, LancePageMetadata, +}; use futures::StreamExt; use lance_index::DatasetIndexExt; use pyo3::exceptions::{PyIOError, PyValueError}; @@ -47,7 +51,9 @@ pub(crate) mod arrow; #[cfg(feature = "datagen")] pub(crate) mod datagen; pub(crate) mod dataset; +pub(crate) mod error; pub(crate) mod executor; +pub(crate) mod file; pub(crate) mod fragment; pub(crate) mod reader; pub(crate) mod scanner; @@ -101,6 +107,12 @@ fn lance(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/rust/lance-encoding/src/encoder.rs b/rust/lance-encoding/src/encoder.rs index 3e24b10bef..3ef62d1de2 100644 --- a/rust/lance-encoding/src/encoder.rs +++ b/rust/lance-encoding/src/encoder.rs @@ -106,7 +106,7 @@ pub trait ArrayEncoder: std::fmt::Debug + Send + Sync { /// column with three fields (a boolean field, an int32 field, and a 4096-dimension /// tensor field) the tensor field is likely to emit encoded pages much more frequently /// than the boolean field. -pub trait FieldEncoder { +pub trait FieldEncoder: Send { /// Buffer the data and, if there is enough data in the buffer to form a page, return /// an encoding task to encode the data. 
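+    /// (Implementations must be `Send` so that an encoding task can be
+    /// handed off to another thread, e.g. a multi-threaded tokio runtime.)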
/// diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs index ca93e79b55..fa5225b1c5 100644 --- a/rust/lance-file/src/v2/reader.rs +++ b/rust/lance-file/src/v2/reader.rs @@ -1,4 +1,4 @@ -use std::{io::Cursor, ops::Range, sync::Arc}; +use std::{io::Cursor, ops::Range, pin::Pin, sync::Arc}; use arrow_array::RecordBatch; use arrow_schema::Schema; @@ -14,7 +14,11 @@ use snafu::{location, Location}; use lance_core::{Error, Result}; use lance_encoding::format::pb as pbenc; -use lance_io::{scheduler::FileScheduler, ReadBatchParams}; +use lance_io::{ + scheduler::FileScheduler, + stream::{RecordBatchStream, RecordBatchStreamAdapter}, + ReadBatchParams, +}; use tokio::{sync::mpsc, task::JoinHandle}; use crate::{ @@ -24,6 +28,15 @@ use crate::{ use super::io::LanceEncodingsIo; +// For now, we don't use global buffers for anything other than schema. If we +// use these later we should make them lazily loaded and then cached once loaded. +// +// We store their position / length for debugging purposes +pub struct BufferDescriptor { + pub position: u64, + pub size: u64, +} + // TODO: Caching pub struct CachedFileMetadata { /// The schema of the file @@ -32,6 +45,15 @@ pub struct CachedFileMetadata { pub column_metadatas: Vec, /// The number of rows in the file pub num_rows: u64, + pub file_buffers: Vec, + /// The number of bytes contained in the data page section of the file + pub num_data_bytes: u64, + /// The number of bytes contained in the column metadata section of the file + pub num_column_metadata_bytes: u64, + /// The number of bytes contained in the global buffer section of the file + pub num_global_buffer_bytes: u64, + pub major_version: u16, + pub minor_version: u16, } pub struct FileReader { @@ -39,6 +61,7 @@ pub struct FileReader { file_schema: Schema, column_infos: Vec, num_rows: u64, + metadata: Arc, } struct Footer { @@ -50,11 +73,17 @@ struct Footer { global_buff_offsets_start: u64, num_global_buffers: u32, num_columns: u32, + major_version: u16, + minor_version: u16, } const FOOTER_LEN: usize = 48; impl FileReader { + pub fn metadata(&self) -> &Arc { + &self.metadata + } + async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> { let file_size = scheduler.reader().size().await? as u64; let begin = if file_size < scheduler.reader().block_size() as u64 { @@ -117,6 +146,8 @@ impl FileReader { global_buff_offsets_start, num_global_buffers, num_columns, + major_version, + minor_version, }) } @@ -215,55 +246,65 @@ impl FileReader { // Next, read the metadata for the columns let column_metadatas = Self::read_all_column_metadata(scheduler, &footer).await?; + let footer_start = file_len - FOOTER_LEN as u64; + let num_data_bytes = footer.column_meta_start; + let num_column_metadata_bytes = footer.global_buff_start - footer.column_meta_start; + let num_global_buffer_bytes = footer_start - footer.global_buff_start; + + let global_bufs_table_nbytes = footer.num_global_buffers as usize * 16; + let global_bufs_table_start = (footer.global_buff_offsets_start - meta_offset) as usize; + let global_bufs_table_end = global_bufs_table_start + global_bufs_table_nbytes; + let global_bufs_table = + all_metadata_bytes.slice(global_bufs_table_start..global_bufs_table_end); + let mut global_bufs_cursor = Cursor::new(&global_bufs_table); + + let mut global_buffers = Vec::with_capacity(footer.num_global_buffers as usize); + for _ in 0..footer.num_global_buffers { + let buf_pos = global_bufs_cursor.read_u64::()? 
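+                // (adjusted by meta_offset, the file position at which the
+                // column metadata section begins)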
- meta_offset; + let buf_size = global_bufs_cursor.read_u64::()?; + global_buffers.push(BufferDescriptor { + position: buf_pos, + size: buf_size, + }); + } + Ok(CachedFileMetadata { file_schema: Schema::from(&schema), column_metadatas, num_rows, + num_data_bytes, + num_column_metadata_bytes, + num_global_buffer_bytes, + file_buffers: global_buffers, + major_version: footer.major_version, + minor_version: footer.minor_version, }) } - pub async fn print_all_metadata(scheduler: &FileScheduler) -> Result<()> { + pub async fn print_all_metadata(metadata: &CachedFileMetadata) -> Result<()> { // 1. read and print the footer - let (tail_bytes, file_len) = Self::read_tail(scheduler).await?; - let footer = Self::decode_footer(&tail_bytes)?; - println!("# Footer"); println!(); println!( "File version : {}.{}", MAJOR_VERSION, MINOR_VERSION_NEXT ); - println!("Data bytes : {}", footer.column_meta_start); - println!( - "Col. meta size (padded): {}", - footer.column_meta_offsets_start - footer.column_meta_start - ); - println!( - "Glo. buff size (padded): {}", - footer.global_buff_offsets_start - footer.global_buff_start - ); - - let all_metadata_bytes = - Self::get_all_meta_bytes(tail_bytes, file_len, scheduler, &footer).await?; - let meta_offset = footer.column_meta_start; + println!("Data bytes : {}", metadata.num_data_bytes); + println!("Col. meta bytes: {}", metadata.num_column_metadata_bytes); + println!("Glo. data bytes: {}", metadata.num_global_buffer_bytes); // 2. print the global buffers - let global_bufs_table_nbytes = footer.num_global_buffers as usize * 16; - let global_bufs_table_start = (footer.global_buff_offsets_start - meta_offset) as usize; - let global_bufs_table_end = global_bufs_table_start + global_bufs_table_nbytes; - let global_bufs_table = - all_metadata_bytes.slice(global_bufs_table_start..global_bufs_table_end); - let mut global_bufs_cursor = Cursor::new(&global_bufs_table); println!("Global buffers:"); - for _ in 0..footer.num_global_buffers { - let buf_pos = global_bufs_cursor.read_u64::()? - meta_offset; - let buf_size = global_bufs_cursor.read_u64::()?; - println!(" * {}..{}", buf_pos, buf_pos + buf_size); + for file_buffer in &metadata.file_buffers { + println!( + " * {}..{}", + file_buffer.position, + file_buffer.position + file_buffer.size + ); } - let col_meta = Self::read_all_column_metadata(scheduler, &footer).await?; println!("Columns:"); - for (idx, col) in col_meta.iter().enumerate() { + for (idx, col) in metadata.column_metadatas.iter().enumerate() { println!(" * Column {}", idx); println!(); println!(" Buffers:"); @@ -353,6 +394,7 @@ impl FileReader { column_infos, file_schema: file_metadata.file_schema.clone(), num_rows, + metadata: file_metadata, }) } @@ -382,7 +424,7 @@ impl FileReader { &self, params: ReadBatchParams, batch_size: u32, - ) -> BoxStream<'static, Result> { + ) -> Pin> { let futures_stream = match params { ReadBatchParams::Indices(_) => todo!(), ReadBatchParams::Range(range) => { @@ -402,12 +444,55 @@ impl FileReader { } ReadBatchParams::RangeFull => self.read_range(0..self.num_rows, batch_size).await, }; - futures_stream + let batch_stream = futures_stream .buffered(16) // JoinHandle returns Result> where the outer Result means the thread // task panic'd so we propagate that panic here. 
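+            // The inner Result carries any real read error and is passed
+            // through to the stream consumer unchanged.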
.map(|res_res| res_res.unwrap()) - .boxed() + .boxed(); + Box::pin(RecordBatchStreamAdapter::new( + Arc::new(self.file_schema.clone()), + batch_stream, + )) + } +} + +/// Inspects a page and returns a String describing the page's encoding +pub fn describe_encoding(page: &pbfile::column_metadata::Page) -> String { + if let Some(encoding) = &page.encoding { + if let Some(style) = &encoding.style { + match style { + pbfile::encoding::Style::Deferred(deferred) => { + format!( + "DeferredEncoding(pos={},size={})", + deferred.buffer_location, deferred.buffer_length + ) + } + pbfile::encoding::Style::Direct(direct) => { + if let Some(encoding) = &direct.encoding { + if encoding.type_url == "/lance.encodings.ArrayEncoding" { + let encoding = encoding.to_msg::(); + match encoding { + Ok(encoding) => { + format!("{:#?}", encoding) + } + Err(err) => { + format!("Unsupported(decode_err={})", err) + } + } + } else { + format!("Unrecognized(type_url={})", encoding.type_url) + } + } else { + "MISSING DIRECT VALUE".to_string() + } + } + } + } else { + "MISSING STYLE".to_string() + } + } else { + "MISSING".to_string() } } @@ -459,11 +544,6 @@ mod tests { } file_writer.finish().await.unwrap(); - let file_scheduler = scheduler.open_file(&tmp_path).await.unwrap(); - FileReader::print_all_metadata(&file_scheduler) - .await - .unwrap(); - for read_size in [32, 1024, 1024 * 1024] { let file_scheduler = scheduler.open_file(&tmp_path).await.unwrap(); let file_reader = FileReader::try_open(file_scheduler, (*schema).clone()) diff --git a/rust/lance-file/src/v2/writer.rs b/rust/lance-file/src/v2/writer.rs index 6ff61dcf8b..bb85f47823 100644 --- a/rust/lance-file/src/v2/writer.rs +++ b/rust/lance-file/src/v2/writer.rs @@ -291,3 +291,43 @@ impl FileWriter { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::types::Float64Type; + use arrow_array::RecordBatchReader; + use lance_datagen::{array, gen, BatchCount, RowCount}; + use lance_io::object_store::ObjectStore; + use object_store::path::Path; + + use crate::v2::writer::{FileWriter, FileWriterOptions}; + + #[tokio::test] + async fn test_basic_write() { + let tmp_dir = tempfile::tempdir().unwrap(); + let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned(); + let tmp_path = Path::parse(tmp_path).unwrap(); + let tmp_path = tmp_path.child("some_file.lance"); + let obj_store = Arc::new(ObjectStore::local()); + + let reader = gen() + .col(Some("score".to_string()), array::rand::()) + .into_reader_rows(RowCount::from(1000), BatchCount::from(10)); + + let writer = obj_store.create(&tmp_path).await.unwrap(); + + let mut file_writer = FileWriter::try_new( + writer, + (*reader.schema()).clone(), + FileWriterOptions::default(), + ) + .unwrap(); + + for batch in reader { + file_writer.write_batch(&batch.unwrap()).await.unwrap(); + } + file_writer.finish().await.unwrap(); + } +}
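
Taken together, the new bindings give Python a simple end-to-end round trip
over the v2 file format. A minimal, illustrative sketch (the path is
arbitrary; this mirrors the flow of test_file.py):

    import pyarrow as pa
    from lance.file import LanceFileReader, LanceFileWriter

    schema = pa.schema([pa.field("a", pa.int64())])
    data = pa.table({"a": [1, 2, 3]})

    # The writer is a context manager; close() writes the file metadata.
    with LanceFileWriter("/tmp/demo.lance", schema) as writer:
        writer.write_batch(data)

    # Results can be materialized as a Table or streamed as RecordBatches.
    reader = LanceFileReader("/tmp/demo.lance", schema)
    assert reader.read_all(batch_size=1024).to_table() == data

    # File layout statistics (sizes, buffers, page encodings); the repr of
    # the returned metadata is rendered as YAML via serde_yaml.
    print(reader.metadata())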