Skip to content

Commit

Permalink
WIP
Browse files: browse the repository at this point in the history
  • Loading branch information
angelip2303 committed Feb 8, 2024
1 parent 03bb447 commit 337e2f9
Show file tree
Hide file tree
Showing 24 changed files with 342 additions and 293 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ version = "0.0.1"
edition = "2021"

[dependencies]
zarrs = { version = "0.7.3", default-features = false, features = [ "http", "gzip", "sharding" ] }
zarrs = { version = "0.11.6", default-features = false, features = [ "http", "gzip", "sharding", "opendal", "async", "ndarray" ] }
clap = { version = "4.1.8", features = ["derive"] }
serde_json = "1.0.108"
thiserror = "1.0.50"
Expand All @@ -14,7 +14,6 @@ sprs = "0.11.1"
rio_turtle = "0.8.4"
rio_xml = "0.8.4"
rio_api = "0.8.4"
safe-transmute = "0.11.2"
rayon = "1.8.0"
parking_lot = "0.12"

Expand Down
11 changes: 6 additions & 5 deletions examples/http_bench.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::matrix::MatrixLayout;
use remote_hdt::storage::ops::Ops;
use remote_hdt::storage::params::Serialization;
use remote_hdt::storage::HTTPStorage;
use remote_hdt::storage::params::{Backend, Serialization};
use remote_hdt::storage::Storage;
use std::time::Instant;

fn main() -> Result<(), RemoteHDTError> {
let mut remote_hdt = HTTPStorage::new(MatrixLayout, Serialization::Zarr);
let arr = remote_hdt
.connect("https://raw.githubusercontent.com/weso/RemoteHDT/master/resources/root.zarr")?;
let mut binding = Storage::new(MatrixLayout, Serialization::Zarr);
let arr = binding.load(Backend::HTTP(
"https://raw.githubusercontent.com/weso/RemoteHDT/master/resources/root.zarr",
))?;

let before = Instant::now();
arr.get_subject("<http://example.org/alan>")?;
Expand Down
8 changes: 4 additions & 4 deletions examples/load_bench.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::params::Serialization;
use remote_hdt::storage::params::{Backend, Serialization};
use remote_hdt::storage::tabular::TabularLayout;
use remote_hdt::storage::LocalStorage;
use remote_hdt::storage::Storage;
use std::env;
use std::time::Instant;

Expand All @@ -16,8 +16,8 @@ fn main() -> Result<(), RemoteHDTError> {

let before = Instant::now();

LocalStorage::new(TabularLayout, Serialization::Zarr)
.load(format!("{}.zarr", zarr_path).as_str())?;
Storage::new(TabularLayout, Serialization::Zarr)
.load(Backend::FileSystem(format!("{}.zarr", zarr_path).as_str()))?;

println!("Elapsed time: {:.2?}", before.elapsed());

Expand Down
8 changes: 4 additions & 4 deletions examples/ntriples/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::params::{ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::tabular::TabularLayout;
use remote_hdt::storage::LocalStorage;
use remote_hdt::storage::Storage;

pub fn main() -> Result<(), RemoteHDTError> {
LocalStorage::new(TabularLayout, Serialization::Zarr).serialize(
"root.zarr",
Storage::new(TabularLayout, Serialization::Zarr).serialize(
Backend::FileSystem("root.zarr"),
"examples/ntriples/rdf.nt",
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
Expand Down
8 changes: 4 additions & 4 deletions examples/query_bench.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::matrix::MatrixLayout;
use remote_hdt::storage::ops::Ops;
use remote_hdt::storage::params::Serialization;
use remote_hdt::storage::LocalStorage;
use remote_hdt::storage::params::{Backend, Serialization};
use remote_hdt::storage::Storage;
use std::env;
use std::time::Instant;

Expand All @@ -17,8 +17,8 @@ fn main() -> Result<(), RemoteHDTError> {
let number_of_universities: &String = &args[1];
let zarr_path = format!("{}-lubm", number_of_universities);

let mut remote_hdt = LocalStorage::new(MatrixLayout, Serialization::Zarr);
let arr = remote_hdt.load(format!("{}.zarr", zarr_path).as_str())?;
let mut binding = Storage::new(MatrixLayout, Serialization::Zarr);
let arr = binding.load(Backend::FileSystem(format!("{}.zarr", zarr_path).as_str()))?;

let before = Instant::now();
arr.get_subject(SUBJECT)?;
Expand Down
8 changes: 4 additions & 4 deletions examples/rdf_xml/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::params::{ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::tabular::TabularLayout;
use remote_hdt::storage::LocalStorage;
use remote_hdt::storage::Storage;

pub fn main() -> Result<(), RemoteHDTError> {
LocalStorage::new(TabularLayout, Serialization::Zarr).serialize(
"root.zarr",
Storage::new(TabularLayout, Serialization::Zarr).serialize(
Backend::FileSystem("root.zarr"),
"examples/rdf_xml/rdf.rdf",
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
Expand Down
8 changes: 4 additions & 4 deletions examples/serialize_bench.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::matrix::MatrixLayout;
use remote_hdt::storage::params::{ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::LocalStorage;
use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::Storage;
use std::env;
use std::time::Instant;

Expand All @@ -21,8 +21,8 @@ fn main() -> Result<(), RemoteHDTError> {

let before = Instant::now();

LocalStorage::new(MatrixLayout, Serialization::Zarr).serialize(
zarr_path,
Storage::new(MatrixLayout, Serialization::Zarr).serialize(
Backend::FileSystem(zarr_path),
rdf_path,
ChunkingStrategy::Sharding(*shard_size),
ReferenceSystem::SPO,
Expand Down
10 changes: 5 additions & 5 deletions examples/turtle/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use remote_hdt::error::RemoteHDTError;
use remote_hdt::storage::params::{ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization};
use remote_hdt::storage::tabular::TabularLayout;
use remote_hdt::storage::LocalStorage;
use remote_hdt::storage::Storage;

pub fn main() -> Result<(), RemoteHDTError> {
LocalStorage::new(TabularLayout, Serialization::Zarr).serialize(
"root.zarr",
"examples/turtle/rdf.ttk",
Storage::new(TabularLayout, Serialization::Zarr).serialize(
Backend::FileSystem("root.zarr"),
"examples/turtle/rdf.ttl",
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
)?;
Expand Down
3 changes: 1 addition & 2 deletions src/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::collections::HashSet;

use fcsd::Set;
use std::collections::HashSet;

use crate::storage::params::ReferenceSystem;

Expand Down
26 changes: 12 additions & 14 deletions src/engine/chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,31 @@ use zarrs::array::Array;
use zarrs::array_subset::ArraySubset;
use zarrs::storage::ReadableStorageTraits;

use crate::storage::ZarrType;
use crate::utils::columns_per_shard;
use crate::utils::rows_per_shard;

use super::EngineResult;
use super::EngineStrategy;

impl<T: ReadableStorageTraits> EngineStrategy<Vec<ZarrType>> for Array<T> {
fn get_first_term(&self, index: usize) -> EngineResult<Vec<ZarrType>> {
impl<T: ReadableStorageTraits> EngineStrategy<Vec<usize>> for Array<T> {
fn get_first_term(&self, index: usize) -> EngineResult<Vec<usize>> {
let index_to_chunk = index as u64 / rows_per_shard(self);
let chunk_to_index = index as u64 % rows_per_shard(self);
Ok(self
.retrieve_chunk_subset_elements(
&[index_to_chunk, 0],
&ArraySubset::new_with_start_end_inc(
vec![chunk_to_index, 0],
vec![chunk_to_index, columns_per_shard(self) - 1],
)?,
)?
.to_vec())
let ans = self.retrieve_chunk_subset_elements(
&[index_to_chunk, 0],
&ArraySubset::new_with_start_end_inc(
vec![chunk_to_index, 0],
vec![chunk_to_index, columns_per_shard(self) - 1],
)?,
)?;
Ok(ans.to_vec())
}

fn get_second_term(&self, _index: usize) -> EngineResult<Vec<ZarrType>> {
fn get_second_term(&self, _index: usize) -> EngineResult<Vec<usize>> {
unimplemented!()
}

fn get_third_term(&self, index: usize) -> EngineResult<Vec<ZarrType>> {
fn get_third_term(&self, index: usize) -> EngineResult<Vec<usize>> {
let start = vec![0, index as u64];
let end = vec![self.shape()[0], index as u64];
let shape = &ArraySubset::new_with_start_end_inc(start, end)?;
Expand Down
13 changes: 13 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use zarrs::array::codec::bytes_to_bytes::gzip::GzipCompressionLevelError;
use zarrs::array::ArrayCreateError;
use zarrs::array::ArrayError;
use zarrs::array_subset::IncompatibleDimensionalityError;
use zarrs::array_subset::IncompatibleStartEndIndicesError;
use zarrs::group::GroupCreateError;
use zarrs::storage::store::FilesystemStoreCreateError;
use zarrs::storage::store::HTTPStoreCreateError;
Expand All @@ -29,6 +30,8 @@ pub enum RemoteHDTError {
HTTPCreate(#[from] HTTPStoreCreateError),
#[error("The Path already exists, please provide an empty path")]
PathExists,
#[error("The Path does not exist, please provide another path")]
PathDoesNotExist,
#[error(transparent)]
GZipCompression(#[from] GzipCompressionLevelError),
#[error("The Graph you are trying to serialize is empty")]
Expand All @@ -45,6 +48,14 @@ pub enum RemoteHDTError {
ReferenceSystemNotInJSON,
#[error("Error serializing the triples of the Graph")]
TripleSerialization,
#[error("The provided path is not valid")]
OsPathToString,
#[error(transparent)]
Opendal(#[from] zarrs::opendal::Error),
#[error("The provided backend is read-only")]
ReadOnlyBackend,
#[error("Error while parsing the RDF graph")]
RdfParse,
}

#[derive(Error, Debug)]
Expand All @@ -55,6 +66,8 @@ pub enum EngineError {
Array(#[from] ArrayError),
#[error("Operation error")]
Operation,
#[error(transparent)]
IncompatibleStartEndIndicesError(#[from] IncompatibleStartEndIndicesError),
}

#[derive(Error, Debug)]
Expand Down
7 changes: 4 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use clap::Parser;
use remote_hdt::storage::params::Backend;
use remote_hdt::storage::params::ChunkingStrategy;
use remote_hdt::storage::params::ReferenceSystem;
use remote_hdt::storage::params::Serialization;
use remote_hdt::storage::tabular::TabularLayout;
use remote_hdt::storage::LocalStorage;
use remote_hdt::storage::Storage;
use remote_hdt::storage::StorageResult;

#[derive(Parser, Debug)]
Expand All @@ -20,8 +21,8 @@ struct Args {

fn main() -> StorageResult<()> {
let args: Args = Args::parse();
LocalStorage::new(TabularLayout, Serialization::Sparse).serialize(
&args.zarr,
Storage::new(TabularLayout, Serialization::Sparse).serialize(
Backend::FileSystem(&args.zarr),
&args.rdf,
ChunkingStrategy::Chunk,
ReferenceSystem::SPO,
Expand Down
Loading

0 comments on commit 337e2f9

Please sign in to comment.