Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for rust-s3 backend. #30

Merged
merged 14 commits into from
Feb 2, 2024
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ jobs:
- run: cargo test --features http-async
- run: cargo test --features mmap-async-tokio
- run: cargo test --features tilejson
- run: cargo test --features s3-async-native
- run: cargo test --features s3-async-rustls
- run: cargo test
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pmtiles"
version = "0.5.2"
version = "0.6.0"
edition = "2021"
authors = ["Luke Seelenbinder <[email protected]>"]
license = "MIT OR Apache-2.0"
Expand All @@ -13,6 +13,8 @@ categories = ["science::geo"]
[features]
default = []
http-async = ["dep:tokio", "dep:reqwest"]
s3-async-native = ["dep:tokio", "dep:rust-s3", "rust-s3/tokio-native-tls"]
s3-async-rustls = ["dep:tokio", "dep:rust-s3", "rust-s3/tokio-rustls-tls"]
mmap-async-tokio = ["dep:tokio", "dep:fmmap", "fmmap?/tokio-async"]
tilejson = ["dep:tilejson", "dep:serde", "dep:serde_json"]

Expand All @@ -34,6 +36,7 @@ thiserror = "1"
tilejson = { version = "0.4", optional = true }
tokio = { version = "1", default-features = false, features = ["io-util"], optional = true }
varint-rs = "2"
rust-s3 = { version = "0.33.0", optional = true, default-features = false, features = ["fail-on-err"] }

[dev-dependencies]
flate2 = "1"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ originally created by Brandon Liu for Protomaps.
- Backends supported:
- Async `mmap` (Tokio) for local files
- Async `http` and `https` (Reqwuest + Tokio) for URLs
- Async `s3` (Rust-S3 + Tokio) for S3-compatible buckets

## Plans & TODOs

Expand Down
2 changes: 2 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ test:
cargo test --features http-async
cargo test --features mmap-async-tokio
cargo test --features tilejson
cargo test --features s3-async-native
cargo test --features s3-async-rustls
cargo test
RUSTDOCFLAGS="-D warnings" cargo doc --no-deps

Expand Down
52 changes: 45 additions & 7 deletions src/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,31 @@ use async_trait::async_trait;
use bytes::Bytes;
#[cfg(feature = "http-async")]
use reqwest::{Client, IntoUrl};
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-rustls",
feature = "s3-async-native"
))]
use tokio::io::AsyncReadExt;

#[cfg(feature = "http-async")]
use crate::backend::HttpBackend;
#[cfg(feature = "mmap-async-tokio")]
use crate::backend::MmapBackend;
#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
use crate::backend::S3Backend;
use crate::cache::DirCacheResult;
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
use crate::cache::{DirectoryCache, NoCache};
use crate::directory::{DirEntry, Directory};
use crate::error::{PmtError, PmtResult};
use crate::header::{HEADER_SIZE, MAX_INITIAL_BYTES};
#[cfg(feature = "http-async")]
use crate::http::HttpBackend;
#[cfg(feature = "mmap-async-tokio")]
use crate::mmap::MmapBackend;
use crate::tile::tile_id;
use crate::{Compression, Header};

Expand Down Expand Up @@ -263,6 +275,32 @@ impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<MmapBackend, C> {
}
}

#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
impl AsyncPmTilesReader<S3Backend, NoCache> {
/// Creates a new `PMTiles` reader from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_bucket_path(bucket: s3::Bucket, path: String) -> PmtResult<Self> {
Self::new_with_cached_bucket_path(NoCache, bucket, path).await
}
}

#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
impl<C: DirectoryCache + Sync + Send> AsyncPmTilesReader<S3Backend, C> {
/// Creates a new `PMTiles` reader with cache from a URL using the Reqwest backend.
///
/// Fails if [url] does not exist or is an invalid archive. (Note: HTTP requests are made to validate it.)
pub async fn new_with_cached_bucket_path(
cache: C,
bucket: s3::Bucket,
path: String,
) -> PmtResult<Self> {
let backend = S3Backend::from(bucket, path);

Self::try_from_cached_source(backend, cache).await
}
}

#[async_trait]
pub trait AsyncBackend {
/// Reads exactly `length` bytes starting at `offset`
Expand All @@ -276,7 +314,7 @@ pub trait AsyncBackend {
#[cfg(feature = "mmap-async-tokio")]
mod tests {
use super::AsyncPmTilesReader;
use crate::mmap::MmapBackend;
use crate::backend::MmapBackend;
use crate::tests::{RASTER_FILE, VECTOR_FILE};

#[tokio::test]
Expand Down
22 changes: 13 additions & 9 deletions src/http.rs → src/backend/http.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use async_trait::async_trait;
use bytes::Bytes;
use reqwest::header::{HeaderValue, RANGE};
use reqwest::{Client, IntoUrl, Method, Request, StatusCode, Url};
use reqwest::{
header::{HeaderValue, RANGE},
Client, IntoUrl, Method, Request, StatusCode, Url,
};

use crate::async_reader::AsyncBackend;
use crate::error::{PmtHttpError, PmtResult};
use crate::{async_reader::AsyncBackend, error::PmtResult, PmtError};

pub struct HttpBackend {
client: Client,
Expand All @@ -28,26 +29,29 @@ impl AsyncBackend for HttpBackend {
if data.len() == length {
Ok(data)
} else {
Err(PmtHttpError::UnexpectedNumberOfBytesReturned(length, data.len()).into())
Err(PmtError::UnexpectedNumberOfBytesReturned(
length,
data.len(),
))
}
}

async fn read(&self, offset: usize, length: usize) -> PmtResult<Bytes> {
let end = offset + length - 1;
let range = format!("bytes={offset}-{end}");
let range = HeaderValue::try_from(range).map_err(PmtHttpError::from)?;
let range = HeaderValue::try_from(range)?;

let mut req = Request::new(Method::GET, self.pmtiles_url.clone());
req.headers_mut().insert(RANGE, range);

let response = self.client.execute(req).await?.error_for_status()?;
if response.status() != StatusCode::PARTIAL_CONTENT {
return Err(PmtHttpError::RangeRequestsUnsupported.into());
return Err(PmtError::RangeRequestsUnsupported);
}

let response_bytes = response.bytes().await?;
if response_bytes.len() > length {
Err(PmtHttpError::ResponseBodyTooLong(response_bytes.len(), length).into())
Err(PmtError::ResponseBodyTooLong(response_bytes.len(), length))
} else {
Ok(response_bytes)
}
Expand All @@ -56,8 +60,8 @@ impl AsyncBackend for HttpBackend {

#[cfg(test)]
mod tests {
use super::*;
use crate::async_reader::AsyncPmTilesReader;
use crate::http::HttpBackend;

static TEST_URL: &str =
"https://protomaps.github.io/PMTiles/protomaps(vector)ODbL_firenze.pmtiles";
Expand Down
File renamed without changes.
17 changes: 17 additions & 0 deletions src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
mod s3;

#[cfg(any(feature = "s3-async-native", feature = "s3-async-rustls"))]
pub use s3::S3Backend;

#[cfg(feature = "http-async")]
mod http;

#[cfg(feature = "http-async")]
pub use http::HttpBackend;

#[cfg(feature = "mmap-async-tokio")]
mod mmap;

#[cfg(feature = "mmap-async-tokio")]
pub use mmap::MmapBackend;
55 changes: 55 additions & 0 deletions src/backend/s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use async_trait::async_trait;
use bytes::Bytes;
use s3::Bucket;

use crate::{
async_reader::AsyncBackend,
error::PmtError::{ResponseBodyTooLong, UnexpectedNumberOfBytesReturned},
};

pub struct S3Backend {
bucket: Bucket,
pmtiles_path: String,
}

impl S3Backend {
#[must_use]
pub fn from(bucket: Bucket, pmtiles_path: String) -> S3Backend {
Self {
bucket,
pmtiles_path,
}
}
}

#[async_trait]
impl AsyncBackend for S3Backend {
async fn read_exact(&self, offset: usize, length: usize) -> crate::error::PmtResult<Bytes> {
let data = self.read(offset, length).await?;

if data.len() == length {
Ok(data)
} else {
Err(UnexpectedNumberOfBytesReturned(length, data.len()))
}
}

async fn read(&self, offset: usize, length: usize) -> crate::error::PmtResult<Bytes> {
let response = self
.bucket
.get_object_range(
self.pmtiles_path.as_str(),
offset as _,
Some((offset + length - 1) as _),
)
.await?;

let response_bytes = response.bytes();

if response_bytes.len() > length {
Err(ResponseBodyTooLong(response_bytes.len(), length))
} else {
Ok(response_bytes.clone())
}
}
}
2 changes: 0 additions & 2 deletions src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ impl Debug for Directory {

impl Directory {
/// Find the directory entry for a given tile ID.
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[must_use]
pub fn find_tile_id(&self, tile_id: u64) -> Option<&DirEntry> {
match self.entries.binary_search_by(|e| e.tile_id.cmp(&tile_id)) {
Expand Down Expand Up @@ -97,7 +96,6 @@ pub struct DirEntry {
pub(crate) run_length: u32,
}

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
impl DirEntry {
pub(crate) fn is_leaf(&self) -> bool {
self.run_length == 0
Expand Down
36 changes: 18 additions & 18 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,30 @@ pub enum PmtError {
#[cfg(feature = "mmap-async-tokio")]
#[error("Unable to open mmap file")]
UnableToOpenMmapFile,
#[cfg(feature = "http-async")]
#[error("{0}")]
Http(#[from] PmtHttpError),
}

#[cfg(feature = "http-async")]
#[derive(Debug, Error)]
pub enum PmtHttpError {
#[cfg(any(
feature = "http-async",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
#[error("Unexpected number of bytes returned [expected: {0}, received: {1}].")]
UnexpectedNumberOfBytesReturned(usize, usize),
#[cfg(feature = "http-async")]
#[error("Range requests unsupported")]
RangeRequestsUnsupported,
#[cfg(any(
feature = "http-async",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
#[error("HTTP response body is too long, Response {0}B > requested {1}B")]
ResponseBodyTooLong(usize, usize),
#[error("HTTP error {0}")]
#[cfg(feature = "http-async")]
#[error(transparent)]
Http(#[from] reqwest::Error),
#[error("{0}")]
#[cfg(feature = "http-async")]
#[error(transparent)]
InvalidHeaderValue(#[from] reqwest::header::InvalidHeaderValue),
}

// This is required because thiserror #[from] does not support two-level conversion.
#[cfg(feature = "http-async")]
impl From<reqwest::Error> for PmtError {
fn from(e: reqwest::Error) -> Self {
Self::Http(PmtHttpError::Http(e))
}
#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
#[error(transparent)]
S3(#[from] s3::error::S3Error),
}
2 changes: 0 additions & 2 deletions src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use bytes::{Buf, Bytes};

use crate::error::{PmtError, PmtResult};

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
pub(crate) const MAX_INITIAL_BYTES: usize = 16_384;
#[cfg(any(feature = "http-async", feature = "mmap-async-tokio", test))]
pub(crate) const HEADER_SIZE: usize = 127;

#[allow(dead_code)]
Expand Down
45 changes: 33 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,49 @@
#![forbid(unsafe_code)]

mod tile;
pub use directory::{DirEntry, Directory};
pub use error::{PmtError, PmtResult};

mod header;
pub use crate::header::{Compression, Header, TileType};
pub use header::{Compression, Header, TileType};

mod directory;
pub use directory::{DirEntry, Directory};
#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
pub use backend::S3Backend;

#[cfg(any(feature = "s3-async-rustls", feature = "s3-async-native"))]
pub use s3;

mod error;
#[cfg(feature = "http-async")]
pub use error::PmtHttpError;
pub use error::{PmtError, PmtResult};
pub use backend::HttpBackend;

#[cfg(feature = "http-async")]
pub mod http;
pub use reqwest;

#[cfg(feature = "mmap-async-tokio")]
pub mod mmap;
pub use backend::MmapBackend;

mod tile;

mod header;

mod directory;

mod error;

mod backend;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-rustls",
feature = "s3-async-native"
))]
pub mod async_reader;

#[cfg(any(feature = "http-async", feature = "mmap-async-tokio"))]
#[cfg(any(
feature = "http-async",
feature = "mmap-async-tokio",
feature = "s3-async-native",
feature = "s3-async-rustls"
))]
pub mod cache;

#[cfg(test)]
Expand Down
Loading
Loading