Refactor to cleanup and prepare for directory caching #23

Merged
merged 1 commit into from
Nov 10, 2023
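
As the diff below shows, this refactor removes the Tile wrapper struct so get_tile returns raw Bytes, hides the reader's header field behind a new get_header() accessor, splits leaf-directory lookup into a separate find_entry_rec helper so directories can later be cached without cloning, and adds the Entry::is_leaf(), Header::get_bounds() and Header::get_center() helpers. A minimal caller-side sketch of the updated API follows; the module paths, feature setup and archive file name are assumptions, not taken from this PR:

// Rough sketch of the post-refactor API, assuming the crate exposes the
// async_reader and mmap modules publicly and is built with the
// mmap-async-tokio feature; the archive path is made up.
use pmtiles::async_reader::AsyncPmTilesReader;
use pmtiles::mmap::MmapBackend;

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let backend = MmapBackend::try_from("example.pmtiles").await.unwrap();
    let reader = AsyncPmTilesReader::try_from_source(backend).await.unwrap();

    // The header is no longer a public field; use the accessor added in this PR.
    // Tile type and compression now come from the header rather than a Tile struct.
    let _header = reader.get_header();

    // get_tile now returns the raw tile bytes directly.
    if let Some(bytes) = reader.get_tile(0, 0, 0).await {
        println!("tile 0/0/0: {} bytes", bytes.len());
    }
}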
1 change: 1 addition & 0 deletions .github/workflows/test.yml
@@ -29,3 +29,4 @@ jobs:
- run: cargo test --features http-async
- run: cargo test --features mmap-async-tokio
- run: cargo test --features tilejson
- run: cargo test
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pmtiles"
version = "0.3.1"
version = "0.4.0"
edition = "2021"
authors = ["Luke Seelenbinder <[email protected]>"]
license = "MIT OR Apache-2.0"
@@ -36,10 +36,10 @@ tokio = { version = "1", default-features = false, features = ["io-util"], optio
varint-rs = "2"

[dev-dependencies]
flate2 = "1"
fmmap = { version = "0.3", features = ["tokio-async"] }
reqwest = { version = "0.11", features = ["rustls-tls-webpki-roots"] }
tokio = { version = "1", features = ["test-util", "macros", "rt"] }
flate2 = "1"

[package.metadata.docs.rs]
all-features = true
3 changes: 2 additions & 1 deletion justfile
@@ -14,6 +14,7 @@ test:
cargo test --features http-async
cargo test --features mmap-async-tokio
cargo test --features tilejson
cargo test
RUSTDOCFLAGS="-D warnings" cargo doc --no-deps

# Run cargo fmt and cargo clippy
@@ -25,7 +26,7 @@ fmt:

# Run cargo clippy
clippy:
cargo clippy --workspace --all-targets --bins --tests --lib --benches -- -D warnings
cargo clippy --workspace --all-targets --all-features --bins --tests --lib --benches -- -D warnings

# Build and open code documentation
docs:
151 changes: 58 additions & 93 deletions src/async_reader.rs
@@ -16,12 +16,12 @@ use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
use crate::http::HttpBackend;
#[cfg(feature = "mmap-async-tokio")]
use crate::mmap::MmapBackend;
use crate::tile::{tile_id, Tile};
use crate::tile::tile_id;
use crate::{Compression, Header};

pub struct AsyncPmTilesReader<B: AsyncBackend> {
pub header: Header,
pub struct AsyncPmTilesReader<B> {
backend: B,
header: Header,
root_directory: Directory,
}

@@ -30,11 +30,13 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
///
/// Note: Prefer using new_with_* methods.
pub async fn try_from_source(backend: B) -> Result<Self, Error> {
let mut initial_bytes = backend.read_initial_bytes().await?;

let header_bytes = initial_bytes.split_to(HEADER_SIZE);
// Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory.
let mut initial_bytes = backend.read(0, MAX_INITIAL_BYTES).await?;
if initial_bytes.len() < HEADER_SIZE {
return Err(Error::InvalidHeader);
}

let header = Header::try_from_bytes(header_bytes)?;
let header = Header::try_from_bytes(initial_bytes.split_to(HEADER_SIZE))?;

let directory_bytes = initial_bytes
.split_off((header.root_offset as usize) - HEADER_SIZE)
@@ -44,45 +46,37 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
Self::read_compressed_directory(header.internal_compression, directory_bytes).await?;

Ok(Self {
header,
backend,
header,
root_directory,
})
}

/// Fetches a [Tile] from the archive.
pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> Option<Tile> {
/// Fetches tile bytes from the archive.
pub async fn get_tile(&self, z: u8, x: u64, y: u64) -> Option<Bytes> {
let tile_id = tile_id(z, x, y);
let entry = self.find_tile_entry(tile_id, None, 0).await?;

let data = self
.backend
.read_exact(
(self.header.data_offset + entry.offset) as _,
entry.length as _,
)
.await
.ok()?;
let entry = self.find_tile_entry(tile_id).await?;

Some(Tile {
data,
tile_type: self.header.tile_type,
tile_compression: self.header.tile_compression,
})
let offset = (self.header.data_offset + entry.offset) as _;
let length = entry.length as _;
let data = self.backend.read_exact(offset, length).await.ok()?;

Some(data)
}

/// Access header information.
pub fn get_header(&self) -> &Header {
&self.header
}

/// Gets metadata from the archive.
///
/// Note: by spec, this should be valid JSON. This method currently returns a [String].
/// This may change in the future.
pub async fn get_metadata(&self) -> Result<String, Error> {
let metadata = self
.backend
.read_exact(
self.header.metadata_offset as _,
self.header.metadata_length as _,
)
.await?;
let offset = self.header.metadata_offset as _;
let length = self.header.metadata_length as _;
let metadata = self.backend.read_exact(offset, length).await?;

let decompressed_metadata =
Self::decompress(self.header.internal_compression, metadata).await?;
@@ -132,71 +126,52 @@ impl<B: AsyncBackend + Sync + Send> AsyncPmTilesReader<B> {
Ok(tj)
}

#[async_recursion]
async fn find_tile_entry(
&self,
tile_id: u64,
next_dir: Option<Directory>,
depth: u8,
) -> Option<Entry> {
// Max recursion...
if depth >= 4 {
return None;
/// Recursively locates a tile in the archive.
async fn find_tile_entry(&self, tile_id: u64) -> Option<Entry> {
let entry = self.root_directory.find_tile_id(tile_id);
if let Some(entry) = entry {
if entry.is_leaf() {
return self.find_entry_rec(tile_id, entry, 0).await;
}
}
entry.cloned()
}

let next_dir = next_dir.as_ref().unwrap_or(&self.root_directory);

match next_dir.find_tile_id(tile_id) {
None => None,
Some(needle) => {
if needle.run_length == 0 {
// Leaf directory
let next_dir = self
.read_directory(
(self.header.leaf_offset + needle.offset) as _,
needle.length as _,
)
.await
.ok()?;
self.find_tile_entry(tile_id, Some(next_dir), depth + 1)
.await
#[async_recursion]
async fn find_entry_rec(&self, tile_id: u64, entry: &Entry, depth: u8) -> Option<Entry> {
// the recursion is done as two functions because it is a bit cleaner,
// and it allows directory to be cached later without cloning it first.
let offset = (self.header.leaf_offset + entry.offset) as _;
let length = entry.length as _;
let dir = self.read_directory(offset, length).await.ok()?;
let entry = dir.find_tile_id(tile_id);

if let Some(entry) = entry {
if entry.is_leaf() {
return if depth <= 4 {
self.find_entry_rec(tile_id, entry, depth + 1).await
} else {
Some(needle.clone())
}
None
};
}
}

entry.cloned()
}

async fn read_directory(&self, offset: usize, length: usize) -> Result<Directory, Error> {
Self::read_directory_with_backend(
&self.backend,
self.header.internal_compression,
offset,
length,
)
.await
let data = self.backend.read_exact(offset, length).await?;
Self::read_compressed_directory(self.header.internal_compression, data).await
}

async fn read_compressed_directory(
compression: Compression,
bytes: Bytes,
) -> Result<Directory, Error> {
let decompressed_bytes = Self::decompress(compression, bytes).await?;

Directory::try_from(decompressed_bytes)
}

async fn read_directory_with_backend(
backend: &B,
compression: Compression,
offset: usize,
length: usize,
) -> Result<Directory, Error> {
let directory_bytes = backend.read_exact(offset, length).await?;

Self::read_compressed_directory(compression, directory_bytes).await
}

async fn decompress(compression: Compression, bytes: Bytes) -> Result<Bytes, Error> {
let mut decompressed_bytes = Vec::with_capacity(bytes.len() * 2);
match compression {
@@ -229,8 +204,8 @@ impl AsyncPmTilesReader<MmapBackend> {
/// Creates a new PMTiles reader from a file path using the async mmap backend.
///
/// Fails if [p] does not exist or is an invalid archive.
pub async fn new_with_path<P: AsRef<Path>>(p: P) -> Result<Self, Error> {
let backend = MmapBackend::try_from(p).await?;
pub async fn new_with_path<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let backend = MmapBackend::try_from(path).await?;

Self::try_from_source(backend).await
}
@@ -243,16 +218,6 @@ pub trait AsyncBackend {

/// Reads up to `length` bytes starting at `offset`.
async fn read(&self, offset: usize, length: usize) -> Result<Bytes, Error>;

/// Read the first 127 and up to 16,384 bytes to ensure we can initialize the header and root directory.
async fn read_initial_bytes(&self) -> Result<Bytes, Error> {
let bytes = self.read(0, MAX_INITIAL_BYTES).await?;
if bytes.len() < HEADER_SIZE {
return Err(Error::InvalidHeader);
}

Ok(bytes)
}
}

#[cfg(test)]
@@ -274,11 +239,11 @@ mod tests {
let tile = tiles.get_tile(z, x, y).await.unwrap();

assert_eq!(
tile.data.len(),
tile.len(),
fixture_bytes.len(),
"Expected tile length to match."
);
assert_eq!(tile.data, fixture_bytes, "Expected tile to match fixture.");
assert_eq!(tile, fixture_bytes, "Expected tile to match fixture.");
}

#[tokio::test]
9 changes: 8 additions & 1 deletion src/directory.rs
@@ -25,7 +25,7 @@ impl Directory {
// https://github.com/protomaps/PMTiles/blob/9c7f298fb42290354b8ed0a9b2f50e5c0d270c40/js/index.ts#L210
if next_id > 0 {
let previous_tile = self.entries.get(next_id - 1)?;
if previous_tile.run_length == 0
if previous_tile.is_leaf()
|| tile_id - previous_tile.tile_id < previous_tile.run_length as u64
{
return Some(previous_tile);
@@ -88,6 +88,13 @@ pub(crate) struct Entry {
pub(crate) run_length: u32,
}

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
impl Entry {
pub fn is_leaf(&self) -> bool {
self.run_length == 0
}
}

#[cfg(test)]
mod tests {
use std::io::{BufReader, Read, Write};
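
The is_leaf() helper added to Entry above captures the PMTiles directory convention: a run_length of 0 means the entry points at a leaf directory that must be searched next, while a non-zero run length means the entry covers that many consecutive tile IDs. A rough illustration, written as it might appear in a crate-internal test since Entry is pub(crate); the entry values are invented:

// Covers tile IDs 100, 101 and 102 directly.
let tile_run = Entry { tile_id: 100, offset: 0, length: 512, run_length: 3 };
// Points at a leaf directory; find_entry_rec would read and search it next.
let leaf_ptr = Entry { tile_id: 200, offset: 4096, length: 256, run_length: 0 };

assert!(!tile_run.is_leaf());
assert!(leaf_ptr.is_leaf());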
30 changes: 19 additions & 11 deletions src/header.rs
@@ -80,19 +80,27 @@ impl Header {
tiles: sources,
minzoom: self.min_zoom,
maxzoom: self.max_zoom,
bounds: tilejson::Bounds::new(
self.min_longitude as f64,
self.min_latitude as f64,
self.max_longitude as f64,
self.max_latitude as f64,
),
center: tilejson::Center::new(
self.center_longitude as f64,
self.center_latitude as f64,
self.center_zoom,
),
bounds: self.get_bounds(),
center: self.get_center(),
}
}

pub fn get_bounds(&self) -> tilejson::Bounds {
tilejson::Bounds::new(
self.min_longitude as f64,
self.min_latitude as f64,
self.max_longitude as f64,
self.max_latitude as f64,
)
}

pub fn get_center(&self) -> tilejson::Center {
tilejson::Center::new(
self.center_longitude as f64,
self.center_latitude as f64,
self.center_zoom,
)
}
}

#[derive(Debug, Eq, PartialEq, Copy, Clone)]
2 changes: 1 addition & 1 deletion src/http.rs
@@ -67,6 +67,6 @@ mod tests {
let client = reqwest::Client::builder().use_rustls_tls().build().unwrap();
let backend = HttpBackend::try_from(client, TEST_URL).unwrap();

let _tiles = AsyncPmTilesReader::try_from_source(backend).await.unwrap();
AsyncPmTilesReader::try_from_source(backend).await.unwrap();
}
}
10 changes: 0 additions & 10 deletions src/tile.rs
@@ -1,7 +1,3 @@
use bytes::Bytes;

use crate::{Compression, TileType};

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio", test))]
pub(crate) fn tile_id(z: u8, x: u64, y: u64) -> u64 {
if z == 0 {
@@ -15,12 +11,6 @@ pub(crate) fn tile_id(z: u8, x: u64, y: u64) -> u64 {
base_id + tile_id
}

pub struct Tile {
pub data: Bytes,
pub tile_type: TileType,
pub tile_compression: Compression,
}

#[cfg(test)]
mod test {
use super::tile_id;