diff --git a/protos/file.proto b/protos/file.proto
index 358d53bf3a..9b28f8e837 100644
--- a/protos/file.proto
+++ b/protos/file.proto
@@ -16,6 +16,22 @@ syntax = "proto3";
 
 package lance.file;
 
+// A file descriptor that describes the contents of a Lance file
+message FileDescriptor {
+  // The schema of the file
+  Schema schema = 1;
+  // The number of rows in the file
+  uint64 length = 2;
+}
+
+// A schema which describes the data type of each of the columns
+message Schema {
+  // All fields in this file, including the nested fields.
+  repeated lance.file.Field fields = 1;
+  // Schema metadata.
+  map metadata = 5; // NOTE(review): the map's key/value type arguments are absent in this copy of the patch; confirm against the original
+}
+
 // Metadata of one Lance file.
 message Metadata {
   // 4 was used for StatisticsMetadata in the past, but has been moved to prevent
diff --git a/rust/lance-file/Cargo.toml b/rust/lance-file/Cargo.toml
index 85cf917a03..3f1ae03c37 100644
--- a/rust/lance-file/Cargo.toml
+++ b/rust/lance-file/Cargo.toml
@@ -23,9 +23,11 @@ arrow-schema.workspace = true
 arrow-select.workspace = true
 async-recursion.workspace = true
 async-trait.workspace = true
+byteorder.workspace = true
 bytes.workspace = true
 datafusion-common.workspace = true
 futures.workspace = true
+lance-datagen.workspace = true
 num_cpus.workspace = true
 num-traits.workspace = true
 object_store.workspace = true
diff --git a/rust/lance-file/src/format.rs b/rust/lance-file/src/format.rs
index 366f44d2aa..ed09a5f9cf 100644
--- a/rust/lance-file/src/format.rs
+++ b/rust/lance-file/src/format.rs
@@ -29,4 +29,5 @@ pub mod metadata;
 /// These version/magic values are written at the end of Lance files (e.g. versions/1.version)
 pub const MAJOR_VERSION: i16 = 0;
 pub const MINOR_VERSION: i16 = 2;
+pub const MINOR_VERSION_TWO: u16 = 3; // NOTE(review): u16, while MAJOR_VERSION and MINOR_VERSION above are i16; confirm the difference is intended
 pub const MAGIC: &[u8; 4] = b"LANC";
diff --git a/rust/lance-file/src/v2.rs b/rust/lance-file/src/v2.rs
index 364d2ccf3b..f6d28e32e5 100644
--- a/rust/lance-file/src/v2.rs
+++ b/rust/lance-file/src/v2.rs
@@ -1 +1,3 @@
 pub(crate) mod io;
+pub mod reader; // exported publicly, unlike the crate-private `io` module above
+pub mod writer; // exported publicly, unlike the crate-private `io` module above
diff --git a/rust/lance-file/src/v2/reader.rs b/rust/lance-file/src/v2/reader.rs
new file mode 100644
index 0000000000..90bf44ee6b
--- /dev/null
+++ b/rust/lance-file/src/v2/reader.rs
@@ -0,0 +1,515 @@
+use std::{io::Cursor, ops::Range, sync::Arc};
+
+use arrow_array::RecordBatch;
+use arrow_schema::Schema;
+use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
+use bytes::{Bytes, BytesMut};
+use futures::{stream::BoxStream, StreamExt};
+use lance_encoding::{
+    decoder::{BatchDecodeStream, ColumnInfo, DecodeBatchScheduler, PageInfo},
+    EncodingsIo,
+};
+use prost::Message;
+use snafu::{location, Location};
+
+use lance_core::{Error, Result};
+use lance_encoding::format::pb as pbenc;
+use lance_io::{scheduler::FileScheduler, ReadBatchParams};
+use tokio::{sync::mpsc, task::JoinHandle};
+
+use crate::{
+    datatypes::{Fields, FieldsWithMeta},
+    format::{pb, pbfile, MAGIC, MAJOR_VERSION, MINOR_VERSION_TWO},
+};
+
+use super::io::LanceEncodingsIo;
+
+// TODO: Caching
+pub struct CachedFileMetadata { // File-level metadata bundle: the schema, the per-column metadata and the row count
+    /// The schema of the file
+    pub file_schema: Schema,
+    /// The column metadatas
+    pub column_metadatas: Vec, // NOTE(review): `Vec` has no element type in this copy of the patch; confirm against the original
+    /// The number of rows in the file
+    pub num_rows: u64,
+}
+
+pub struct FileReader { // Reader state: an `Arc`-held scheduler, the file schema, the column infos and the row count
+    scheduler: Arc, // NOTE(review): `Arc` has no type argument in this copy of the patch; confirm the pointee type against the original
+    file_schema: Schema,
+    column_infos: Vec, // NOTE(review): element type missing, presumably the imported `ColumnInfo`; confirm
+    num_rows: u64,
+}
+
+struct Footer { // Start positions and counts for the column-metadata and global-buffer sections; presumably decoded from the end of the file
+    column_meta_start: u64,
+    // We don't use this today because we get here by subtracting backward from global_buff_start
+    #[allow(dead_code)]
+    column_meta_offsets_start: u64,
+    global_buff_start: u64,
+    global_buff_offsets_start: u64,
+    num_global_buffers: u32,
+    num_columns: u32,
+}
+
+const FOOTER_LEN: usize = 48; + +impl FileReader { + async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> { + let file_size = scheduler.reader().size().await? as u64; + let begin = if file_size < scheduler.reader().block_size() as u64 { + 0 + } else { + file_size - scheduler.reader().block_size() as u64 + }; + let tail_bytes = scheduler.submit_single(begin..file_size).await?; + Ok((tail_bytes, file_size)) + } + + // Checks to make sure the footer is written correctly and returns the + // position of the file descriptor (which comes from the footer) + fn decode_footer(footer_bytes: &Bytes) -> Result