diff --git a/.gitignore b/.gitignore index 562046a..7f51346 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ benches/*/*.nt !resources/root.zarr .vscode heaptrack.* -tests/out \ No newline at end of file +tests/out +uniprotkb_* \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index bf0c655..ef7b0c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,7 +5,7 @@ version = "0.0.1" edition = "2021" [dependencies] -zarrs = { version = "0.6.0", default-features = false, features = [ "http", "gzip", "sharding" ] } +zarrs = { version = "0.12.4", default-features = false, features = [ "http", "gzip", "sharding", "async", "ndarray", "crc32c" ] } clap = { version = "4.1.8", features = ["derive"] } serde_json = "1.0.108" thiserror = "1.0.50" @@ -14,11 +14,8 @@ sprs = "0.11.1" rio_turtle = "0.8.4" rio_xml = "0.8.4" rio_api = "0.8.4" -safe-transmute = "0.11.2" rayon = "1.8.0" - -[target.'cfg(not(target_env = "msvc"))'.dependencies] -jemallocator = "0.5.0" +parking_lot = "0.12" [profile.release] codegen-units = 1 diff --git a/examples/http_bench.rs b/examples/http_bench.rs index 9c7ee89..0b9f4ea 100644 --- a/examples/http_bench.rs +++ b/examples/http_bench.rs @@ -1,20 +1,20 @@ -use remote_hdt::engine::EngineStrategy; -use remote_hdt::storage::matrix::MatrixLayout; -use remote_hdt::storage::HTTPStorage; +use remote_hdt::error::RemoteHDTError; +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::ops::Ops; +use remote_hdt::storage::params::{Backend, Serialization}; +use remote_hdt::storage::Storage; use std::time::Instant; -fn main() { - let mut remote_hdt = HTTPStorage::new(MatrixLayout); - let arr = remote_hdt - .connect("https://raw.githubusercontent.com/weso/RemoteHDT/master/resources/root.zarr") - .unwrap(); - let index = remote_hdt - .get_dictionary() - .get_subject_idx_unchecked(""); +fn main() -> Result<(), RemoteHDTError> { + let mut binding = Storage::new(MatrixLayout, Serialization::Zarr); + let arr = binding.load(Backend::HTTP( + "https://raw.githubusercontent.com/weso/RemoteHDT/master/resources/root.zarr", + ))?; let before = Instant::now(); - arr.get_subject(index).unwrap(); - let after = before.elapsed(); + arr.get_subject("")?; - println!("Elapsed time: {:.2?}", after) + println!("Elapsed time: {:.2?}", before.elapsed()); + + Ok(()) } diff --git a/examples/load_bench.rs b/examples/load_bench.rs index 667c1f4..d679950 100644 --- a/examples/load_bench.rs +++ b/examples/load_bench.rs @@ -1,21 +1,25 @@ -use remote_hdt::storage::tabular::TabularLayout; -use remote_hdt::storage::LocalStorage; +use remote_hdt::error::RemoteHDTError; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::params::{Backend, Serialization}; +use remote_hdt::storage::Storage; use std::env; use std::time::Instant; -fn main() { +fn main() -> Result<(), RemoteHDTError> { let args: Vec = env::args().collect(); if args.len() <= 1 { panic!("Usage: cargo run --example query_bench "); } + let number_of_universities: &String = &args[1]; let zarr_path = format!("{}-lubm", number_of_universities); let before = Instant::now(); - LocalStorage::new(TabularLayout) - .load(format!("{}.zarr", zarr_path).as_str()) - .unwrap(); - let after = before.elapsed(); - println!("Elapsed time: {:.2?}", after) + Storage::new(TabularLayout, Serialization::Zarr) + .load(Backend::FileSystem(format!("{}.zarr", zarr_path).as_str()))?; + + println!("Elapsed time: {:.2?}", before.elapsed()); + + Ok(()) } diff --git a/examples/ntriples/main.rs b/examples/ntriples/main.rs index fc6af9c..8ab2feb 100644 --- a/examples/ntriples/main.rs +++ b/examples/ntriples/main.rs @@ -1,13 +1,15 @@ -use remote_hdt::storage::tabular::TabularLayout; -use remote_hdt::storage::ChunkingStrategy; -use remote_hdt::storage::LocalStorage; +use remote_hdt::error::RemoteHDTError; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization}; +use remote_hdt::storage::Storage; -pub fn main() { - LocalStorage::new(TabularLayout) - .serialize( - "root.zarr", - "examples/ntriples/rdf.nt", - ChunkingStrategy::Chunk, - ) - .unwrap(); +pub fn main() -> Result<(), RemoteHDTError> { + Storage::new(TabularLayout, Serialization::Zarr).serialize( + Backend::FileSystem("root.zarr"), + "examples/ntriples/rdf.nt", + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + )?; + + Ok(()) } diff --git a/examples/query_bench.rs b/examples/query_bench.rs index f53499f..711effc 100644 --- a/examples/query_bench.rs +++ b/examples/query_bench.rs @@ -1,30 +1,29 @@ -use remote_hdt::engine::EngineStrategy; -use remote_hdt::storage::matrix::MatrixLayout; -use remote_hdt::storage::LocalStorage; +use remote_hdt::error::RemoteHDTError; +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::ops::Ops; +use remote_hdt::storage::params::{Backend, Serialization}; +use remote_hdt::storage::Storage; use std::env; use std::time::Instant; -const SUBJECT: &str = ""; +const SUBJECT: &str = ""; -fn main() { +fn main() -> Result<(), RemoteHDTError> { let args: Vec = env::args().collect(); if args.len() <= 1 { panic!("Usage: cargo run --example query_bench "); } + let number_of_universities: &String = &args[1]; let zarr_path = format!("{}-lubm", number_of_universities); - let mut remote_hdt = LocalStorage::new(MatrixLayout); - let arr = remote_hdt - .load(format!("{}.zarr", zarr_path).as_str()) - .unwrap(); - let index = remote_hdt - .get_dictionary() - .get_subject_idx_unchecked(SUBJECT); + let mut binding = Storage::new(MatrixLayout, Serialization::Zarr); + let arr = binding.load(Backend::FileSystem(format!("{}.zarr", zarr_path).as_str()))?; let before = Instant::now(); - arr.get_subject(index).unwrap(); - let after = before.elapsed(); + arr.get_object(SUBJECT)?; + + println!("Elapsed time: {:.2?}", before.elapsed()); - println!("Elapsed time: {:.2?}", after) + Ok(()) } diff --git a/examples/rdf_xml/main.rs b/examples/rdf_xml/main.rs index ef19d9a..c415701 100644 --- a/examples/rdf_xml/main.rs +++ b/examples/rdf_xml/main.rs @@ -1,13 +1,15 @@ -use remote_hdt::storage::tabular::TabularLayout; -use remote_hdt::storage::ChunkingStrategy; -use remote_hdt::storage::LocalStorage; +use remote_hdt::error::RemoteHDTError; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization}; +use remote_hdt::storage::Storage; -pub fn main() { - LocalStorage::new(TabularLayout) - .serialize( - "root.zarr", - "examples/rdf_xml/rdf.rdf", - ChunkingStrategy::Chunk, - ) - .unwrap(); +pub fn main() -> Result<(), RemoteHDTError> { + Storage::new(TabularLayout, Serialization::Zarr).serialize( + Backend::FileSystem("root.zarr"), + "examples/rdf_xml/rdf.rdf", + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + )?; + + Ok(()) } diff --git a/examples/serialize.rs b/examples/serialize.rs new file mode 100644 index 0000000..9ca4192 --- /dev/null +++ b/examples/serialize.rs @@ -0,0 +1,30 @@ +use remote_hdt::error::RemoteHDTError; +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization}; +use remote_hdt::storage::Storage; +use std::env; +use std::time::Instant; + +fn main() -> Result<(), RemoteHDTError> { + let args: Vec = env::args().collect(); + if args.len() <= 3 { + panic!("Usage: cargo run --example serialize "); + } + + let rdf_path = &args[1].as_str(); + let zarr_path = &args[2].as_str(); + let shard_size = &args[3].parse::().unwrap(); + + let before = Instant::now(); + + Storage::new(MatrixLayout, Serialization::Zarr).serialize( + Backend::FileSystem(zarr_path), + rdf_path, + ChunkingStrategy::Sharding(*shard_size), + ReferenceSystem::SPO, + )?; + + println!("Elapsed time: {:.2?}", before.elapsed()); + + Ok(()) +} diff --git a/examples/serialize_bench.rs b/examples/serialize_bench.rs deleted file mode 100644 index d9ee85c..0000000 --- a/examples/serialize_bench.rs +++ /dev/null @@ -1,31 +0,0 @@ -use remote_hdt::storage::matrix::MatrixLayout; -use remote_hdt::storage::ChunkingStrategy; -use remote_hdt::storage::LocalStorage; -use std::env; -use std::time::Instant; - -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static ALLOCATOR: jemallocator::Jemalloc = jemallocator::Jemalloc; - -fn main() { - let args: Vec = env::args().collect(); - if args.len() <= 3 { - panic!("Usage: cargo run --example serialize_bench "); - } - let rdf_path: &String = &args[1]; - let zarr_path: &String = &args[2]; - let shard_size: &String = &args[3]; - - let before = Instant::now(); - - LocalStorage::new(MatrixLayout) - .serialize( - &zarr_path.as_str(), - &rdf_path.as_str(), - ChunkingStrategy::Sharding(shard_size.parse::().unwrap()), - ) - .unwrap(); - - println!("Elapsed time: {:.2?}", before.elapsed()) -} diff --git a/examples/turtle/main.rs b/examples/turtle/main.rs index 2f689ee..76fe24a 100644 --- a/examples/turtle/main.rs +++ b/examples/turtle/main.rs @@ -1,13 +1,15 @@ -use remote_hdt::storage::tabular::TabularLayout; -use remote_hdt::storage::ChunkingStrategy; -use remote_hdt::storage::LocalStorage; +use remote_hdt::error::RemoteHDTError; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::params::{Backend, ChunkingStrategy, ReferenceSystem, Serialization}; +use remote_hdt::storage::Storage; -pub fn main() { - LocalStorage::new(TabularLayout) - .serialize( - "root.zarr", - "examples/turtle/rdf.ttk", - ChunkingStrategy::Chunk, - ) - .unwrap(); +pub fn main() -> Result<(), RemoteHDTError> { + Storage::new(TabularLayout, Serialization::Zarr).serialize( + Backend::FileSystem("root.zarr"), + "examples/turtle/rdf.ttl", + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + )?; + + Ok(()) } diff --git a/src/dictionary.rs b/src/dictionary.rs index 6a74608..d58d06c 100644 --- a/src/dictionary.rs +++ b/src/dictionary.rs @@ -1,11 +1,13 @@ +use fcsd::Set; use std::collections::HashSet; -use fcsd::Set; +use crate::storage::params::ReferenceSystem; use super::utils::hash_to_set; #[derive(Clone)] pub struct Dictionary { + reference_system: ReferenceSystem, subjects: Set, predicates: Set, objects: Set, @@ -14,6 +16,7 @@ pub struct Dictionary { impl Default for Dictionary { fn default() -> Self { Dictionary { + reference_system: ReferenceSystem::SPO, subjects: Set::new(vec!["PlaceHolder"]).unwrap(), predicates: Set::new(vec!["PlaceHolder"]).unwrap(), objects: Set::new(vec!["PlaceHolder"]).unwrap(), @@ -23,11 +26,13 @@ impl Default for Dictionary { impl Dictionary { pub(crate) fn from_vec_str( + reference_system: ReferenceSystem, subjects: &Vec, predicates: &Vec, objects: &Vec, ) -> Self { Dictionary { + reference_system, subjects: Set::new(subjects).unwrap(), predicates: Set::new(predicates).unwrap(), objects: Set::new(objects).unwrap(), @@ -35,11 +40,13 @@ impl Dictionary { } pub(crate) fn from_set_terms( + reference_system: ReferenceSystem, subjects: HashSet, predicates: HashSet, objects: HashSet, ) -> Self { Dictionary { + reference_system, subjects: Set::new(hash_to_set(subjects)).unwrap(), predicates: Set::new(hash_to_set(predicates)).unwrap(), objects: Set::new(hash_to_set(objects)).unwrap(), @@ -50,6 +57,10 @@ impl Dictionary { self.subjects.len() } + pub fn predicates_size(&self) -> usize { + self.predicates.len() + } + pub fn objects_size(&self) -> usize { self.objects.len() } @@ -66,9 +77,18 @@ impl Dictionary { self.objects.to_owned() } + pub fn get_reference_system(&self) -> ReferenceSystem { + self.reference_system.to_owned() + } + pub fn get_subject_idx(&self, subject: &str) -> Option { let mut locator = self.subjects.locator(); - locator.run(subject) + match self.reference_system { + ReferenceSystem::PSO | ReferenceSystem::OSP => { + locator.run(subject).map(|value| value + 1) + } + _ => locator.run(subject), + } } pub fn get_subject_idx_unchecked(&self, subject: &str) -> usize { @@ -77,7 +97,12 @@ impl Dictionary { pub fn get_predicate_idx(&self, predicate: &str) -> Option { let mut locator = self.predicates.locator(); - locator.run(predicate).map(|value| value + 1) + match self.reference_system { + ReferenceSystem::SPO | ReferenceSystem::OPS => { + locator.run(predicate).map(|value| value + 1) + } + _ => locator.run(predicate), + } } pub fn get_predicate_idx_unchecked(&self, predicate: &str) -> usize { @@ -86,7 +111,12 @@ impl Dictionary { pub fn get_object_idx(&self, object: &str) -> Option { let mut locator = self.objects.locator(); - locator.run(object) + match self.reference_system { + ReferenceSystem::SOP | ReferenceSystem::POS => { + locator.run(object).map(|value| value + 1) + } + _ => locator.run(object), + } } pub fn get_object_idx_unchecked(&self, object: &str) -> usize { diff --git a/src/engine/array.rs b/src/engine/array.rs index 5ca2f5d..0470603 100644 --- a/src/engine/array.rs +++ b/src/engine/array.rs @@ -1,22 +1,29 @@ -use sprs::{CsMat, TriMat}; +use sprs::TriMat; use crate::storage::ZarrArray; -use super::{EngineResult, EngineStrategy}; +use super::EngineResult; +use super::EngineStrategy; -impl EngineStrategy> for ZarrArray { - fn get_subject(&self, index: usize) -> EngineResult> { +impl EngineStrategy for ZarrArray { + fn get_first_term(&self, index: usize) -> EngineResult { let mut matrix = TriMat::new((self.rows(), self.rows())); matrix.add_triplet(index, index, 1); let matrix = matrix.to_csc(); Ok(&matrix * self) } - fn get_predicate(&self, _value: u8) -> EngineResult> { - unimplemented!() + fn get_second_term(&self, value: usize) -> EngineResult { + let mut matrix = TriMat::new((self.rows(), self.cols())); + self.iter().for_each(|(&e, (row, col))| { + if e == value { + matrix.add_triplet(row, col, value); + } + }); + Ok(matrix.to_csc()) } - fn get_object(&self, index: usize) -> EngineResult> { + fn get_third_term(&self, index: usize) -> EngineResult { let mut matrix = TriMat::new((self.cols(), self.cols())); matrix.add_triplet(index, index, 1); let matrix = matrix.to_csc(); diff --git a/src/engine/chunk.rs b/src/engine/chunk.rs index 1761804..edf515c 100644 --- a/src/engine/chunk.rs +++ b/src/engine/chunk.rs @@ -3,35 +3,46 @@ use zarrs::array_subset::ArraySubset; use zarrs::storage::ReadableStorageTraits; use crate::error::EngineError; -use crate::utils::objects_per_chunk; -use crate::utils::subjects_per_chunk; +use crate::utils::columns_per_shard; +use crate::utils::rows_per_shard; use super::EngineResult; use super::EngineStrategy; -impl EngineStrategy> for Array { - fn get_subject(&self, index: usize) -> EngineResult> { - let index_to_chunk = index as u64 / subjects_per_chunk(self); - let chunk_to_index = index % subjects_per_chunk(self) as usize; - match self - .retrieve_chunk(&[index_to_chunk, 0])? - .chunks(objects_per_chunk(self) as usize) - .nth(chunk_to_index) - { - Some(ans) => Ok(ans.to_owned()), - None => Err(EngineError::Operation), - } +impl EngineStrategy> for Array { + fn get_first_term(&self, index: usize) -> EngineResult> { + let shard_index = index as u64 / rows_per_shard(self); + let shard = self.retrieve_chunk_elements(&[shard_index, 0])?; + let chunk_index = index as u64 % rows_per_shard(self); + let start = (chunk_index * columns_per_shard(self)) as usize; + let end = start + columns_per_shard(self) as usize; + let chunk: &[u32] = &shard[start..end]; + Ok(chunk.to_vec()) } - fn get_predicate(&self, _index: u8) -> EngineResult> { - unimplemented!() + fn get_second_term(&self, index: usize) -> EngineResult> { + let mut ans = Vec::new(); + let number_of_shards = match self.chunk_grid_shape() { + Some(chunk_grid) => chunk_grid[0], + None => return Err(EngineError::Operation), + }; + for i in 0..number_of_shards { + let mut shard = self.retrieve_chunk_elements::(&[i, 0])?; + shard.iter_mut().for_each(|e| { + if *e != index as u32 { + *e = 0 + } + }); + ans.append(&mut shard); + } + Ok(ans) } - fn get_object(&self, index: usize) -> EngineResult> { - let start = vec![0, index as u64]; - let end = vec![self.shape()[0], index as u64]; - let shape = &ArraySubset::new_with_start_end_inc(start, end)?; - let ans = self.retrieve_array_subset_elements(shape)?; - Ok(ans.to_vec()) + fn get_third_term(&self, index: usize) -> EngineResult> { + let objects = self.shape()[0]; + let col = index as u64; + let shape = ArraySubset::new_with_ranges(&[0..objects, col..col + 1]); + let array_subset = self.retrieve_array_subset_elements::(&shape)?; + Ok(array_subset) } } diff --git a/src/engine/mod.rs b/src/engine/mod.rs index f4b2400..a147cec 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -3,10 +3,10 @@ use crate::error::EngineError; pub mod array; pub mod chunk; -pub type EngineResult = Result; +pub(crate) type EngineResult = Result; -pub trait EngineStrategy { - fn get_subject(&self, index: usize) -> EngineResult; - fn get_predicate(&self, index: u8) -> EngineResult; - fn get_object(&self, index: usize) -> EngineResult; +pub(crate) trait EngineStrategy { + fn get_first_term(&self, index: usize) -> EngineResult; + fn get_second_term(&self, index: usize) -> EngineResult; + fn get_third_term(&self, index: usize) -> EngineResult; } diff --git a/src/error.rs b/src/error.rs index 531e8ae..a810e88 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,9 +1,12 @@ use std::convert::Infallible; use thiserror::Error; use zarrs::array::codec::bytes_to_bytes::gzip::GzipCompressionLevelError; +use zarrs::array::codec::CodecError; use zarrs::array::ArrayCreateError; use zarrs::array::ArrayError; +use zarrs::array::NonZeroError; use zarrs::array_subset::IncompatibleDimensionalityError; +use zarrs::array_subset::IncompatibleStartEndIndicesError; use zarrs::group::GroupCreateError; use zarrs::storage::store::FilesystemStoreCreateError; use zarrs::storage::store::HTTPStoreCreateError; @@ -29,10 +32,32 @@ pub enum RemoteHDTError { HTTPCreate(#[from] HTTPStoreCreateError), #[error("The Path already exists, please provide an empty path")] PathExists, + #[error("The Path does not exist, please provide another path")] + PathDoesNotExist, #[error(transparent)] GZipCompression(#[from] GzipCompressionLevelError), #[error("The Graph you are trying to serialize is empty")] EmptyGraph, + #[error(transparent)] + Ops(#[from] OpsError), + #[error("The subjects has not been serialized properly")] + SubjectsNotInJSON, + #[error("The predicates has not been serialized properly")] + PredicatesNotInJSON, + #[error("The objects has not been serialized properly")] + ObjectsNotInJSON, + #[error("The Reference System has not been serialized properly")] + ReferenceSystemNotInJSON, + #[error("Error serializing the triples of the Graph")] + TripleSerialization, + #[error("The provided path is not valid")] + OsPathToString, + #[error("The provided backend is read-only")] + ReadOnlyBackend, + #[error("Error while parsing the RDF graph")] + RdfParse, + #[error(transparent)] + NonZero(#[from] NonZeroError), } #[derive(Error, Debug)] @@ -43,6 +68,10 @@ pub enum EngineError { Array(#[from] ArrayError), #[error("Operation error")] Operation, + #[error(transparent)] + IncompatibleStartEndIndicesError(#[from] IncompatibleStartEndIndicesError), + #[error(transparent)] + Codec(#[from] CodecError), } #[derive(Error, Debug)] @@ -56,3 +85,19 @@ pub enum ParserError { #[error("No format provided")] NoFormatProvided, } + +#[derive(Error, Debug)] +pub enum OpsError { + #[error(transparent)] + Engine(#[from] EngineError), + #[error("The provided subject could not be found")] + SubjectNotFound, + #[error("The provided predicate could not be found")] + PredicateNotFound, + #[error("The provided object could not be found")] + ObjectNotFound, + #[error("The array has not been loaded correctly")] + EmptyArray, + #[error("The sparse array has not been loaded correctly")] + EmptySparseArray, +} diff --git a/src/io/mod.rs b/src/io/mod.rs index db1ad57..7bf6a33 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -6,6 +6,7 @@ use std::io::BufReader; use crate::dictionary::Dictionary; use crate::error::ParserError; +use crate::storage::params::ReferenceSystem; use self::ntriples::NTriples; use self::rdf_xml::RdfXml; @@ -19,7 +20,7 @@ pub type RdfParserResult = Result<(Graph, Dictionary), ParserError>; pub type Graph = Vec>; trait Backend::Error>> { - fn parse(path: &str) -> RdfParserResult { + fn parse(path: &str, reference_system: &ReferenceSystem) -> RdfParserResult { // We create as many HashSets as fields we will be storing; that is, one // for the subjects, another for the predicates, and one for the objects. // The idea is that we will create a Dictionary matching every Term to @@ -41,18 +42,55 @@ trait Backend::Error>> { return Err(ParserError::Dictionary(err)); } - let mut graph = vec![Vec::new(); subjects.len()]; - let dictionary = Dictionary::from_set_terms(subjects, predicates, objects); + let mut graph = vec![ + Vec::new(); + match reference_system { + ReferenceSystem::SPO | ReferenceSystem::SOP => subjects.len(), + ReferenceSystem::PSO | ReferenceSystem::POS => predicates.len(), + ReferenceSystem::OSP | ReferenceSystem::OPS => objects.len(), + } + ]; + let dictionary = + Dictionary::from_set_terms(reference_system.to_owned(), subjects, predicates, objects); if let Err(err) = Self::parser_fn(path, &mut |triple: Triple| { { let sidx = dictionary.get_subject_idx_unchecked(&triple.subject.to_string()); let pidx = dictionary.get_predicate_idx_unchecked(&triple.predicate.to_string()); let oidx = dictionary.get_object_idx_unchecked(&triple.object.to_string()); - graph - .get_mut(sidx) - .unwrap() - .push((pidx as u32, oidx as u32)) + + match reference_system { + ReferenceSystem::SPO => { + if let Some(subject) = graph.get_mut(sidx) { + subject.push((pidx as u32, oidx as u32)) + } + } + ReferenceSystem::SOP => { + if let Some(subject) = graph.get_mut(sidx) { + subject.push((oidx as u32, pidx as u32)) + } + } + ReferenceSystem::PSO => { + if let Some(predicate) = graph.get_mut(pidx) { + predicate.push((sidx as u32, oidx as u32)) + } + } + ReferenceSystem::POS => { + if let Some(predicate) = graph.get_mut(pidx) { + predicate.push((oidx as u32, sidx as u32)) + } + } + ReferenceSystem::OPS => { + if let Some(object) = graph.get_mut(oidx) { + object.push((pidx as u32, sidx as u32)) + } + } + ReferenceSystem::OSP => { + if let Some(object) = graph.get_mut(oidx) { + object.push((sidx as u32, pidx as u32)) + } + } + } }; Ok(()) } as Result<(), E>) @@ -94,11 +132,11 @@ trait Backend::Error>> { pub struct RdfParser; impl RdfParser { - pub fn parse(path: &str) -> RdfParserResult { + pub fn parse(path: &str, reference_system: &ReferenceSystem) -> RdfParserResult { match path.split('.').last() { - Some("nt") => NTriples::parse(path), - Some("ttl") => Turtle::parse(path), - Some("rdf") => RdfXml::parse(path), + Some("nt") => NTriples::parse(path, reference_system), + Some("ttl") => Turtle::parse(path, reference_system), + Some("rdf") => RdfXml::parse(path, reference_system), Some(format) => Err(ParserError::NotSupportedFormat(format.to_string())), None => Err(ParserError::NoFormatProvided), } diff --git a/src/lib.rs b/src/lib.rs index 8ebbc75..1968cf7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ pub mod dictionary; -pub mod engine; +mod engine; pub mod error; mod io; pub mod storage; diff --git a/src/main.rs b/src/main.rs index 3873624..b476eb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,11 @@ use clap::Parser; -use remote_hdt::storage::{tabular::TabularLayout, ChunkingStrategy, LocalStorage, StorageResult}; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::params::Backend; +use remote_hdt::storage::params::ChunkingStrategy; +use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Serialization; +use remote_hdt::storage::Storage; +use remote_hdt::storage::StorageResult; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -15,6 +21,11 @@ struct Args { fn main() -> StorageResult<()> { let args: Args = Args::parse(); - LocalStorage::new(TabularLayout).serialize(&args.zarr, &args.rdf, ChunkingStrategy::Chunk)?; + Storage::new(TabularLayout, Serialization::Sparse).serialize( + Backend::FileSystem(&args.zarr), + &args.rdf, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + )?; Ok(()) } diff --git a/src/storage/layout.rs b/src/storage/layout.rs deleted file mode 100644 index 66a326f..0000000 --- a/src/storage/layout.rs +++ /dev/null @@ -1,86 +0,0 @@ -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; - -use safe_transmute::TriviallyTransmutable; -use zarrs::array::codec::ArrayToBytesCodecTraits; -use zarrs::array::Array; -use zarrs::array::ChunkGrid; -use zarrs::array::DataType; -use zarrs::array::DimensionName; -use zarrs::array::FillValue; -use zarrs::array_subset::ArraySubset; -use zarrs::storage::store::FilesystemStore; - -use crate::dictionary::Dictionary; -use crate::io::Graph; -use crate::utils::objects_per_chunk; -use crate::utils::subjects_per_chunk; -use crate::utils::value_to_term; - -use super::ChunkingStrategy; -use super::StorageResult; -use super::ZarrArray; - -type ArrayToBytesCodec = Box; - -pub trait LayoutOps { - fn retrieve_attributes(&mut self, arr: &Array) -> Dictionary { - // 4. We get the attributes so we can obtain some values that we will need - let attributes = arr.attributes(); - - let subjects = &value_to_term(attributes.get("subjects").unwrap()); - let predicates = &value_to_term(attributes.get("predicates").unwrap()); - let objects = &value_to_term(attributes.get("objects").unwrap()); - - Dictionary::from_vec_str(subjects, predicates, objects) - } - - fn serialize(&mut self, arr: Array, graph: Graph) -> StorageResult<()> { - let objects_size = arr.shape()[1] as usize; - let count = AtomicU64::new(0); - let binding = self.graph_iter(graph); - let iter = binding.chunks_exact(subjects_per_chunk(&arr) as usize); - let remainder = iter.remainder(); - - iter.for_each(|chunk| { - arr.store_chunk_elements( - &[count.load(Ordering::Relaxed), 0], - self.chunk_elements(chunk, objects_size), - ) - .unwrap(); - count.fetch_add(1, Ordering::Relaxed); - }); - - if !remainder.is_empty() { - arr.store_array_subset_elements( - &ArraySubset::new_with_start_shape( - vec![count.load(Ordering::Relaxed) * subjects_per_chunk(&arr), 0], - vec![remainder.len() as u64, objects_per_chunk(&arr)], - ) - .unwrap(), // TODO: remove unwrap - self.chunk_elements(remainder, objects_size), - ) - .unwrap(); - } - - Ok(()) - } - - fn graph_iter(&self, graph: Graph) -> Vec; - fn chunk_elements(&self, chunk: &[C], objects: usize) -> Vec; - fn parse(&mut self, arr: Array, dictionary: &Dictionary) -> StorageResult; - fn sharding_factor(&self, subjects: usize, objects: usize) -> usize; -} - -pub trait Layout: LayoutOps { - fn shape(&self, dictionary: &Dictionary, graph: &Graph) -> Vec; - fn data_type(&self) -> DataType; - fn chunk_shape( - &self, - chunking_strategy: ChunkingStrategy, - dictionary: &Dictionary, - ) -> ChunkGrid; - fn fill_value(&self) -> FillValue; - fn dimension_names(&self) -> Option>; - fn array_to_bytes_codec(&self, dictionary: &Dictionary) -> StorageResult; -} diff --git a/src/storage/layout/matrix.rs b/src/storage/layout/matrix.rs new file mode 100644 index 0000000..f2bc735 --- /dev/null +++ b/src/storage/layout/matrix.rs @@ -0,0 +1,149 @@ +use parking_lot::Mutex; +use sprs::TriMat; +use std::num::NonZeroU64; +use std::sync::atomic::Ordering; +use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder; +use zarrs::array::codec::ArrayToBytesCodecTraits; +use zarrs::array::codec::GzipCodec; +use zarrs::array::ChunkGrid; +use zarrs::array::DataType; +use zarrs::array::DimensionName; +use zarrs::array::FillValue; + +use super::ChunkingStrategy; +use super::Dimensionality; +use super::ReferenceSystem; +use super::StorageResult; + +use crate::io::Graph; +use crate::storage::layout::LayoutOps; +use crate::storage::AtomicZarrType; +use crate::storage::Layout; + +type Chunk = Vec<(u32, u32)>; + +pub struct MatrixLayout; + +impl Layout for MatrixLayout { + fn shape(&self, dimensionality: &Dimensionality) -> Vec { + vec![ + dimensionality.get_first_term_size(), + dimensionality.get_third_term_size(), + ] + } + + fn data_type(&self) -> DataType { + DataType::UInt32 + } + + fn chunk_shape( + &self, + chunking_strategy: ChunkingStrategy, + dimensionality: &Dimensionality, + ) -> ChunkGrid { + vec![ + chunking_strategy.into(), + NonZeroU64::new(dimensionality.get_third_term_size()).unwrap(), + ] + .into() + } + + fn fill_value(&self) -> FillValue { + FillValue::from(0u32) + } + + fn dimension_names(&self, reference_system: &ReferenceSystem) -> Option> { + match reference_system { + ReferenceSystem::SPO => Some(vec![ + DimensionName::new("Subjects"), + DimensionName::new("Objects"), + ]), + ReferenceSystem::SOP => Some(vec![ + DimensionName::new("Subjects"), + DimensionName::new("Predicates"), + ]), + ReferenceSystem::PSO => Some(vec![ + DimensionName::new("Predicates"), + DimensionName::new("Objects"), + ]), + ReferenceSystem::POS => Some(vec![ + DimensionName::new("Predicates"), + DimensionName::new("Subjects"), + ]), + ReferenceSystem::OPS => Some(vec![ + DimensionName::new("Objects"), + DimensionName::new("Subjects"), + ]), + ReferenceSystem::OSP => Some(vec![ + DimensionName::new("Objects"), + DimensionName::new("Predicates"), + ]), + } + } + + fn array_to_bytes_codec( + &self, + dimensionality: &Dimensionality, + ) -> StorageResult> { + let mut sharding_codec_builder = ShardingCodecBuilder::new( + vec![1, dimensionality.get_third_term_size()] + .as_slice() + .try_into()?, + ); + sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]); + Ok(Box::new(sharding_codec_builder.build())) + } +} + +impl LayoutOps for MatrixLayout { + fn graph_iter(&self, graph: Graph) -> Vec { + graph + } + + fn store_chunk_elements(&self, chunk: &[Chunk], columns: usize) -> Vec { + // We create a slice that has the size of the chunk filled with 0 values + // having the size of the shard; that is, number of rows, and a given + // number of columns. This value is converted into an AtomicU8 for us to + // be able to share it among threads + let slice: Vec = vec![0u32; chunk.len() * columns] + .iter() + .map(|&n| AtomicZarrType::new(n)) + .collect(); + + for (first_term, triples) in chunk.iter().enumerate() { + triples.iter().for_each(|&(second_term, third_term)| { + let third_term_idx = third_term as usize + first_term * columns; + slice[third_term_idx].store(second_term, Ordering::Relaxed); + }); + } + + slice + .iter() + .map(|elem| elem.load(Ordering::Relaxed)) + .collect::>() + } + + fn retrieve_chunk_elements( + &mut self, + matrix: &Mutex>, + first_term_index: usize, + chunk: &[u32], + ) { + chunk + .iter() + .enumerate() + .for_each(|(third_term_idx, &second_term_idx)| { + if second_term_idx != 0 { + matrix.lock().add_triplet( + first_term_index, + third_term_idx, + second_term_idx as usize, + ); + } + }) + } + + fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize { + dimensionality.first_term_size + } +} diff --git a/src/storage/layout/mod.rs b/src/storage/layout/mod.rs new file mode 100644 index 0000000..bca148f --- /dev/null +++ b/src/storage/layout/mod.rs @@ -0,0 +1,184 @@ +use parking_lot::Mutex; +use sprs::TriMat; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use zarrs::array::codec::ArrayToBytesCodecTraits; +use zarrs::array::Array; +use zarrs::array::ChunkGrid; +use zarrs::array::DataType; +use zarrs::array::DimensionName; +use zarrs::array::FillValue; +use zarrs::array_subset::ArraySubset; +use zarrs::storage::store::FilesystemStore; +use zarrs::storage::ReadableStorageTraits; + +use crate::dictionary::Dictionary; +use crate::error::RemoteHDTError; +use crate::io::Graph; +use crate::utils::columns_per_shard; +use crate::utils::rows_per_shard; +use crate::utils::value_to_term; + +use super::ChunkingStrategy; +use super::Dimensionality; +use super::ReferenceSystem; +use super::StorageResult; +use super::ZarrArray; + +type ArrayToBytesCodec = Box; + +pub mod matrix; +pub mod tabular; + +pub trait LayoutOps { + fn retrieve_attributes( + &mut self, + arr: &Array, + ) -> StorageResult { + // 4. We get the attributes so we can obtain some values that we will need + let attributes = arr.attributes(); + + let subjects = &value_to_term(match attributes.get("subjects") { + Some(subjects) => subjects, + None => return Err(RemoteHDTError::SubjectsNotInJSON), + }); + let predicates = &value_to_term(match attributes.get("predicates") { + Some(predicates) => predicates, + None => return Err(RemoteHDTError::PredicatesNotInJSON), + }); + let objects = &value_to_term(match attributes.get("objects") { + Some(objects) => objects, + None => return Err(RemoteHDTError::ObjectsNotInJSON), + }); + + let reference_system: ReferenceSystem = match attributes.get("reference_system") { + Some(reference_system) => reference_system, + None => return Err(RemoteHDTError::ReferenceSystemNotInJSON), + } + .as_str() + .unwrap() + .into(); + + Ok(Dictionary::from_vec_str( + reference_system, + subjects, + predicates, + objects, + )) + } + + fn serialize(&mut self, arr: &Array, graph: Graph) -> StorageResult<()> { + let columns = arr.shape()[1] as usize; + let count = AtomicU64::new(0); + let binding = self.graph_iter(graph.to_owned()); + let iter = binding.chunks_exact(rows_per_shard(arr) as usize); + let remainder = iter.remainder(); + + for chunk in iter { + let slice = self.store_chunk_elements(chunk, columns); + arr.store_chunk_elements::(&[count.load(Ordering::Relaxed), 0], slice)?; + count.fetch_add(1, Ordering::Relaxed); + } + + if !remainder.is_empty() { + // first we count the number of shards that have been processed, and + // multiply it by the number of chunks in every shard. Hence, we will + // obtain the number of rows that have been processed + let rows_processed = count.load(Ordering::Relaxed) * rows_per_shard(arr); + // then we obtain the size of the last shard that is going to be + // processed; it is equals to the size of the remainder + let last_shard_size = remainder.len() as u64; + // lastly, we store the elements in the provided subset + arr.store_array_subset_elements::( + &ArraySubset::new_with_start_shape( + vec![rows_processed, 0], + vec![last_shard_size, columns_per_shard(arr)], + )?, + self.store_chunk_elements(remainder, columns), + )?; + } + + Ok(()) + } + + fn parse( + &mut self, + arr: &Array, + dimensionality: &Dimensionality, + ) -> StorageResult { + // First, we create the 2D matrix in such a manner that the number of + // rows is the same as the size of the first terms; i.e, in the SPO + // orientation, that will be equals to the number of subjects, while + // the number of columns is equals to the size of the third terms; i.e, + // following the same example as before, it will be equals to the number + // of objects. In our case the dimensionality abstracts the process + // of getting the size of the concrete dimension + let matrix = Mutex::new(TriMat::new(( + dimensionality.first_term_size, // we obtain the size of the first terms + dimensionality.third_term_size, // we obtain the size of the third terms + ))); + + // We compute the number of shards; for us to achieve so, we have to obtain + // first dimension of the chunk grid + let number_of_shards = match arr.chunk_grid_shape() { + Some(chunk_grid) => chunk_grid[0], + + None => 0, + }; + + let number_of_columns = arr.shape()[1] as usize; + + // For each chunk in the Zarr array we retrieve it and parse it into a + // matrix, inserting the triplet in its corresponding position. The idea + // of parsing the array chunk-by-chunk allows us to keep the RAM usage + // low, as instead of parsing the whole array, we process smaller pieces + // of it. Once we have all the pieces processed, we will have parsed the + // whole array + for shard in 0..number_of_shards { + arr.retrieve_chunk_elements::(&[shard, 0])? + // We divide each shard by the number of columns, as a shard is + // composed of chunks having the size of [1, number of cols] + .chunks(number_of_columns) + .enumerate() + .for_each(|(first_term_idx, chunk)| { + self.retrieve_chunk_elements( + &matrix, + first_term_idx + (shard * rows_per_shard(arr)) as usize, + chunk, + ); + }) + } + + // We use a CSC Matrix because typically, RDF knowledge graphs tend to + // have more rows than columns; as such, CSC matrices are optimized + // for that precise scenario + let x = matrix.lock(); + Ok(x.to_csc()) + } + + fn graph_iter(&self, graph: Graph) -> Vec; + fn store_chunk_elements(&self, chunk: &[C], columns: usize) -> Vec; + fn retrieve_chunk_elements( + &mut self, + matrix: &Mutex>, + first_term_idx: usize, + chunk: &[u32], + ); + fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize; +} + +pub trait Layout: LayoutOps { + fn shape(&self, dimensionality: &Dimensionality) -> Vec; + fn data_type(&self) -> DataType; + fn chunk_shape( + &self, + chunking_strategy: ChunkingStrategy, + dimensionality: &Dimensionality, + ) -> ChunkGrid; + fn fill_value(&self) -> FillValue; + fn dimension_names(&self, reference_system: &ReferenceSystem) -> Option>; + fn array_to_bytes_codec( + &self, + dimensionality: &Dimensionality, + ) -> StorageResult; +} diff --git a/src/storage/layout/tabular.rs b/src/storage/layout/tabular.rs new file mode 100644 index 0000000..be6c520 --- /dev/null +++ b/src/storage/layout/tabular.rs @@ -0,0 +1,98 @@ +use std::num::NonZeroU64; + +use parking_lot::Mutex; +use sprs::TriMat; +use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder; +use zarrs::array::codec::ArrayToBytesCodecTraits; +use zarrs::array::codec::GzipCodec; +use zarrs::array::ChunkGrid; +use zarrs::array::DataType; +use zarrs::array::DimensionName; +use zarrs::array::FillValue; + +use super::ChunkingStrategy; +use super::Dimensionality; +use super::ReferenceSystem; +use super::StorageResult; + +use crate::io::Graph; +use crate::storage::layout::LayoutOps; +use crate::storage::Layout; + +type Chunk = (u32, u32, u32); + +pub struct TabularLayout; + +impl Layout for TabularLayout { + fn shape(&self, dimensionality: &Dimensionality) -> Vec { + vec![dimensionality.get_graph_size(), 3] + } + + fn data_type(&self) -> DataType { + DataType::UInt32 + } + + fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid { + vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value + } + + fn fill_value(&self) -> FillValue { + FillValue::from(0u32) + } + + fn dimension_names(&self, _: &ReferenceSystem) -> Option> { + Some(vec![ + DimensionName::new("Triples"), + DimensionName::new("Fields"), + ]) + } + + fn array_to_bytes_codec( + &self, + _: &Dimensionality, + ) -> StorageResult> { + let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![1, 3].try_into()?); + sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]); + Ok(Box::new(sharding_codec_builder.build())) + } +} + +impl LayoutOps for TabularLayout { + fn graph_iter(&self, graph: Graph) -> Vec { + graph + .iter() + .enumerate() + .flat_map(|(first_term, triples)| { + triples + .iter() + .map(|&(second_term, third_term)| (first_term as u32, second_term, third_term)) + .collect::>() + }) + .collect::>() + } + + fn store_chunk_elements(&self, chunk: &[Chunk], _: usize) -> Vec { + let mut ans = Vec::new(); + for &(first_term, second_term, third_term) in chunk { + ans.push(first_term); + ans.push(second_term); + ans.push(third_term); + } + ans + } + + fn retrieve_chunk_elements( + &mut self, + matrix: &Mutex>, + _first_term_index: usize, // TODO: will first_term_index instead of chunk[0] do the trick? + chunk: &[u32], + ) { + matrix + .lock() + .add_triplet(chunk[0] as usize, chunk[2] as usize, chunk[1] as usize); + } + + fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize { + dimensionality.first_term_size * dimensionality.third_term_size + } +} diff --git a/src/storage/matrix.rs b/src/storage/matrix.rs deleted file mode 100644 index 4d4cffa..0000000 --- a/src/storage/matrix.rs +++ /dev/null @@ -1,139 +0,0 @@ -use sprs::TriMat; -use std::sync::atomic::AtomicU8; -use std::sync::atomic::Ordering; -use std::sync::Mutex; -use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder; -use zarrs::array::codec::ArrayToBytesCodecTraits; -use zarrs::array::codec::GzipCodec; -use zarrs::array::Array; -use zarrs::array::ChunkGrid; -use zarrs::array::DataType; -use zarrs::array::DimensionName; -use zarrs::array::FillValue; -use zarrs::storage::ReadableStorageTraits; - -use super::layout::Layout; -use super::layout::LayoutOps; -use super::ChunkingStrategy; -use super::StorageResult; -use super::ZarrArray; - -use crate::dictionary::Dictionary; -use crate::io::Graph; -use crate::utils::subjects_per_chunk; - -type ZarrType = u8; -type Chunk = Vec<(u32, u32)>; - -pub struct MatrixLayout; - -impl Layout for MatrixLayout -where - R: ReadableStorageTraits + Sized, -{ - fn shape(&self, dictionary: &Dictionary, _graph: &Graph) -> Vec { - vec![ - dictionary.subjects_size() as u64, - dictionary.objects_size() as u64, - ] - } - - fn data_type(&self) -> DataType { - DataType::UInt8 - } - - fn chunk_shape( - &self, - chunking_strategy: ChunkingStrategy, - dictionary: &Dictionary, - ) -> ChunkGrid { - vec![chunking_strategy.into(), dictionary.objects_size() as u64].into() - } - - fn fill_value(&self) -> FillValue { - FillValue::from(0u8) - } - - fn dimension_names(&self) -> Option> { - Some(vec![ - DimensionName::new("Subjects"), - DimensionName::new("Objects"), - ]) - } - - fn array_to_bytes_codec( - &self, - dictionary: &Dictionary, - ) -> StorageResult> { - let mut sharding_codec_builder = - ShardingCodecBuilder::new(vec![1, dictionary.objects_size() as u64]); - sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]); - Ok(Box::new(sharding_codec_builder.build())) - } -} - -impl LayoutOps for MatrixLayout -where - R: ReadableStorageTraits + Sized, -{ - fn graph_iter(&self, graph: Graph) -> Vec { - graph - } - - fn chunk_elements(&self, chunk: &[Chunk], objects: usize) -> Vec { - let slice: Vec = vec![0u8; chunk.len() * objects] - .iter() - .map(|&n| AtomicU8::new(n)) - .collect(); - - for (i, triples) in chunk.iter().enumerate() { - triples.iter().for_each(|&(predicate, object)| { - let object_idx = object as usize + i * objects; - slice[object_idx].store(predicate as ZarrType, Ordering::Relaxed); - }); - } - - slice - .iter() - .map(|elem| elem.load(Ordering::Relaxed)) - .collect::>() - } - - fn parse(&mut self, arr: Array, dictionary: &Dictionary) -> StorageResult { - let matrix = Mutex::new(TriMat::new(( - dictionary.subjects_size(), - dictionary.objects_size(), - ))); - (0..arr.chunk_grid_shape().unwrap()[0]).for_each(|i| { - // Using this chunking strategy allows us to keep RAM usage low, - // as we load elements by row - arr.retrieve_chunk_elements::(&[i, 0]) - .unwrap() - .chunks(dictionary.objects_size()) - .enumerate() - .for_each(|(subject_idx, chunk)| { - chunk - .iter() - .enumerate() - .for_each(|(object_idx, &predicate_idx)| { - if predicate_idx != 0 { - matrix.lock().unwrap().add_triplet( - subject_idx + (i * subjects_per_chunk(&arr)) as usize, - object_idx, - predicate_idx, - ); - } - }) - }) - }); - - // We use a CSC Matrix because typically, RDF knowledge graphs tend to - // have more rows than columns - let x = matrix.lock().unwrap(); - Ok(x.to_csc()) - } - - fn sharding_factor(&self, subjects: usize, _: usize) -> usize { - subjects - } -} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index d8f6a57..2047967 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,93 +1,94 @@ -use safe_transmute::TriviallyTransmutable; use serde_json::Map; use sprs::CsMat; use std::path::PathBuf; use std::str::FromStr; +use std::sync::atomic::AtomicU32; use std::sync::Arc; use zarrs::array::Array; use zarrs::array::ArrayBuilder; +use zarrs::array_subset::ArraySubset; use zarrs::group::GroupBuilder; use zarrs::storage::store::FilesystemStore; use zarrs::storage::store::HTTPStore; +use zarrs::storage::ReadableStorageTraits; use crate::dictionary::Dictionary; use crate::error::RemoteHDTError; +use crate::io::Graph; use crate::io::RdfParser; use crate::utils::rdf_to_value; use self::layout::Layout; - -mod layout; -pub mod matrix; -pub mod tabular; - -pub type ZarrArray = CsMat; +use self::params::Backend; +use self::params::ChunkingStrategy; +use self::params::Dimensionality; +use self::params::ReferenceSystem; +use self::params::Serialization; + +pub mod layout; +pub mod ops; +pub mod params; + +pub type ZarrArray = CsMat; +type AtomicZarrType = AtomicU32; pub type StorageResult = Result; -pub type LocalStorage = Storage; -pub type HTTPStorage = Storage; - -const ARRAY_NAME: &str = "/group/RemoteHDT"; - -pub enum ChunkingStrategy { - Chunk, - Sharding(u64), - Best, -} - -pub enum ThreadingStrategy { - Single, - Multi, -} -impl From for u64 { - fn from(value: ChunkingStrategy) -> Self { - match value { - ChunkingStrategy::Chunk => 1, - ChunkingStrategy::Sharding(size) => size, - ChunkingStrategy::Best => 16, // TODO: set to the number of threads - } - } -} +const ARRAY_NAME: &str = "/group/RemoteHDT"; // TODO: parameterize this -pub struct Storage { +pub struct Storage { dictionary: Dictionary, - layout: Box>, + dimensionality: Dimensionality, + layout: Box>, + serialization: Serialization, + reference_system: ReferenceSystem, + array: Option>, + sparse_array: Option, } -impl Storage { - pub fn new(layout: impl Layout + 'static) -> Self { +impl Storage { + pub fn new(layout: impl Layout + 'static, serialization: Serialization) -> Self { Storage { dictionary: Default::default(), + dimensionality: Default::default(), layout: Box::new(layout), + serialization, + reference_system: ReferenceSystem::SPO, + array: None, + sparse_array: None, } } pub fn get_dictionary(&self) -> Dictionary { self.dictionary.to_owned() } -} -impl LocalStorage { + pub fn get_sparse_array(&self) -> Option { + self.sparse_array.to_owned() + } + /// # Errors /// Returns [`PathExistsError`] if the provided path already exists; that is, /// the user is trying to store the RDF dataset in an occupied storage. This /// is due to the fact that the user may incur in an undefined state. pub fn serialize<'a>( &mut self, - zarr_path: &'a str, + store: Backend<'a>, rdf_path: &'a str, chunking_strategy: ChunkingStrategy, - // threading_strategy: ThreadingStrategy, - ) -> StorageResult<&Self> { - // 1. The first thing that should be done is to check whether the path - // in which we are trying to store the dump already exists or not. If it - // does, we should stop the execution, preventing the user from losing - // data. Otherwise we can resume it and begin the actual proccess... - let path = PathBuf::from_str(zarr_path)?; - if path.exists() { - // the actual check occurs here !!! - return Err(RemoteHDTError::PathExists); - } + reference_system: ReferenceSystem, + // threading_strategy: ThreadingStrategy, TODO: implement this + ) -> StorageResult<&mut Self> { + let path = match store { + Backend::FileSystem(path) => { + let path = PathBuf::from_str(path)?; + + match path.exists() { + true => return Err(RemoteHDTError::PathExists), + false => path, + } + } + Backend::HTTP(_) => return Err(RemoteHDTError::ReadOnlyBackend), + }; // 2. We can create the FileSystemStore appropiately let store = Arc::new(FilesystemStore::new(path)?); @@ -96,18 +97,19 @@ impl LocalStorage { let group = GroupBuilder::new().build(store.clone(), "/group")?; group.store_metadata()?; - // rayon::ThreadPoolBuilder::new() + // TODO: rayon::ThreadPoolBuilder::new() // .num_threads(1) // .build_global() // .unwrap(); // 3. Import the RDF dump using `rdf-rs` - let graph = match RdfParser::parse(rdf_path) { + let graph = match RdfParser::parse(rdf_path, &reference_system) { Ok((graph, dictionary)) => { self.dictionary = dictionary; + self.dimensionality = Dimensionality::new(&self.dictionary, &graph); graph } - Err(_) => todo!(), + Err(_) => return Err(RemoteHDTError::RdfParse), }; // 4. Build the structure of the Array; as such, several parameters of it are @@ -117,54 +119,63 @@ impl LocalStorage { let predicates = self.dictionary.predicates(); let objects = self.dictionary.objects(); let arr = ArrayBuilder::new( - self.layout.shape(&self.dictionary, &graph), + self.layout.shape(&self.dimensionality), self.layout.data_type(), - self.layout.chunk_shape(chunking_strategy, &self.dictionary), + self.layout + .chunk_shape(chunking_strategy, &self.dimensionality), self.layout.fill_value(), ) - .dimension_names(self.layout.dimension_names()) - .array_to_bytes_codec(self.layout.array_to_bytes_codec(&self.dictionary)?) + .dimension_names(self.layout.dimension_names(&reference_system)) + .array_to_bytes_codec(self.layout.array_to_bytes_codec(&self.dimensionality)?) .attributes({ let mut attributes = Map::new(); attributes.insert("subjects".into(), rdf_to_value(subjects)); attributes.insert("predicates".into(), rdf_to_value(predicates)); attributes.insert("objects".into(), rdf_to_value(objects)); + attributes.insert("reference_system".into(), reference_system.as_ref().into()); attributes - }) // TODO: one attribute should be the Layout - .build(store, ARRAY_NAME)?; + }) + .build(store.clone(), ARRAY_NAME)?; arr.store_metadata()?; + self.layout.serialize(&arr, graph)?; - self.layout.serialize(arr, graph)?; + let shape = ArraySubset::new_with_ranges(&[0..10, 1..2]); + arr.retrieve_array_subset_elements::(&shape).unwrap(); Ok(self) } - pub fn load(&mut self, zarr_path: &str) -> StorageResult> { - let store = Arc::new(FilesystemStore::new(zarr_path)?); - let arr = Array::new(store, ARRAY_NAME)?; - self.dictionary = self.layout.retrieve_attributes(&arr); - Ok(arr) - } - - // TODO: improve this naming convention - pub fn load_sparse(&mut self, zarr_path: &str) -> StorageResult { - let arr = self.load(zarr_path)?; - self.layout.parse(arr, &self.dictionary) - } -} + pub fn load( + &mut self, + store: Backend<'_>, + // threading_strategy: ThreadingStrategy, TODO: implement this + ) -> StorageResult<&mut Self> { + let store: Arc = match store { + Backend::FileSystem(path) => { + let path = PathBuf::from_str(path)?; + + match path.exists() { + false => return Err(RemoteHDTError::PathDoesNotExist), + true => Arc::new(FilesystemStore::new(path)?), + } + } + Backend::HTTP(url) => Arc::new(HTTPStore::new(url)?), + }; -impl HTTPStorage { - pub fn connect(&mut self, url: &str) -> StorageResult> { - let store = Arc::new(HTTPStore::new(url)?); let arr = Array::new(store, ARRAY_NAME)?; - self.dictionary = self.layout.retrieve_attributes(&arr); - Ok(arr) - } + let dictionary = self.layout.retrieve_attributes(&arr)?; + self.dictionary = dictionary; + self.reference_system = self.dictionary.get_reference_system(); + self.dimensionality = Dimensionality::new(&self.dictionary, &Graph::default()); + + match self.serialization { + Serialization::Zarr => self.array = Some(arr), + Serialization::Sparse => { + self.sparse_array = Some(self.layout.parse(&arr, &self.dimensionality)?) + } + } - // TODO: improve this naming convention - pub fn connect_sparse(&mut self, url: &str) -> StorageResult { - let arr = self.connect(url)?; - self.layout.parse(arr, &self.dictionary) + Ok(self) } } diff --git a/src/storage/ops.rs b/src/storage/ops.rs new file mode 100644 index 0000000..afb7de2 --- /dev/null +++ b/src/storage/ops.rs @@ -0,0 +1,108 @@ +use crate::engine::EngineStrategy; +use crate::error::OpsError; + +use super::params::ReferenceSystem; +use super::params::Serialization; +use super::Storage; +use super::ZarrArray; + +pub type OpsResult = Result; + +pub enum OpsFormat { + SparseArray(ZarrArray), + Zarr(Vec), +} + +pub trait Ops { + fn get_subject(&self, subject: &str) -> OpsResult; + fn get_predicate(&self, predicate: &str) -> OpsResult; + fn get_object(&self, object: &str) -> OpsResult; +} + +impl Ops for Storage { + fn get_subject(&self, subject: &str) -> OpsResult { + let index = match self.dictionary.get_subject_idx(subject) { + Some(index) => index, + None => return Err(OpsError::SubjectNotFound), + }; + + let ans = match self.serialization { + Serialization::Zarr => match &self.array { + Some(array) => OpsFormat::Zarr(match self.reference_system { + ReferenceSystem::SPO | ReferenceSystem::SOP => array.get_first_term(index)?, + ReferenceSystem::PSO | ReferenceSystem::OSP => array.get_second_term(index)?, + ReferenceSystem::POS | ReferenceSystem::OPS => array.get_third_term(index)?, + }), + None => return Err(OpsError::EmptyArray), + }, + Serialization::Sparse => match &self.sparse_array { + Some(array) => OpsFormat::SparseArray(match self.reference_system { + ReferenceSystem::SPO | ReferenceSystem::SOP => array.get_first_term(index)?, + ReferenceSystem::PSO | ReferenceSystem::OSP => array.get_second_term(index)?, + ReferenceSystem::POS | ReferenceSystem::OPS => { + array.get_third_term(index).unwrap() + } + }), + None => return Err(OpsError::EmptySparseArray), + }, + }; + + Ok(ans) + } + + fn get_predicate(&self, predicate: &str) -> OpsResult { + let index = match self.dictionary.get_predicate_idx(predicate) { + Some(index) => index, + None => return Err(OpsError::PredicateNotFound), + }; + + let ans = match self.serialization { + Serialization::Zarr => match &self.array { + Some(array) => OpsFormat::Zarr(match self.reference_system { + ReferenceSystem::PSO | ReferenceSystem::POS => array.get_first_term(index)?, + ReferenceSystem::SPO | ReferenceSystem::OPS => array.get_second_term(index)?, + ReferenceSystem::SOP | ReferenceSystem::OSP => array.get_third_term(index)?, + }), + None => return Err(OpsError::EmptyArray), + }, + Serialization::Sparse => match &self.sparse_array { + Some(array) => OpsFormat::SparseArray(match self.reference_system { + ReferenceSystem::PSO | ReferenceSystem::POS => array.get_first_term(index)?, + ReferenceSystem::SPO | ReferenceSystem::OPS => array.get_second_term(index)?, + ReferenceSystem::SOP | ReferenceSystem::OSP => array.get_third_term(index)?, + }), + None => return Err(OpsError::EmptySparseArray), + }, + }; + + Ok(ans) + } + + fn get_object(&self, object: &str) -> OpsResult { + let index = match self.dictionary.get_object_idx(object) { + Some(index) => index, + None => return Err(OpsError::ObjectNotFound), + }; + + let ans = match self.serialization { + Serialization::Zarr => match &self.array { + Some(array) => OpsFormat::Zarr(match self.reference_system { + ReferenceSystem::OPS | ReferenceSystem::OSP => array.get_first_term(index)?, + ReferenceSystem::SOP | ReferenceSystem::POS => array.get_second_term(index)?, + ReferenceSystem::SPO | ReferenceSystem::PSO => array.get_third_term(index)?, + }), + None => return Err(OpsError::EmptyArray), + }, + Serialization::Sparse => match &self.sparse_array { + Some(array) => OpsFormat::SparseArray(match self.reference_system { + ReferenceSystem::OPS | ReferenceSystem::OSP => array.get_first_term(index)?, + ReferenceSystem::SOP | ReferenceSystem::POS => array.get_second_term(index)?, + ReferenceSystem::SPO | ReferenceSystem::PSO => array.get_third_term(index)?, + }), + None => return Err(OpsError::EmptySparseArray), + }, + }; + + Ok(ans) + } +} diff --git a/src/storage/params.rs b/src/storage/params.rs new file mode 100644 index 0000000..a21559b --- /dev/null +++ b/src/storage/params.rs @@ -0,0 +1,122 @@ +use std::num::NonZeroU64; + +use crate::dictionary::Dictionary; +use crate::io::Graph; + +pub enum Backend<'a> { + FileSystem(&'a str), + HTTP(&'a str), +} + +pub enum Serialization { + Zarr, + Sparse, +} + +pub enum ChunkingStrategy { + Chunk, + Sharding(u64), + Best, +} + +pub enum ThreadingStrategy { + Single, + Multi, +} + +#[derive(Clone)] +pub enum ReferenceSystem { + SPO, + SOP, + PSO, + POS, + OSP, + OPS, +} + +#[derive(Default)] +pub struct Dimensionality { + graph_size: Option, + pub(crate) first_term_size: usize, + _second_term_size: usize, + pub(crate) third_term_size: usize, +} + +impl From for NonZeroU64 { + fn from(value: ChunkingStrategy) -> Self { + match value { + ChunkingStrategy::Chunk => NonZeroU64::new(1).unwrap(), + ChunkingStrategy::Sharding(size) => NonZeroU64::new(size).unwrap(), + ChunkingStrategy::Best => NonZeroU64::new(16).unwrap(), // TODO: set to the number of threads + } + } +} + +impl AsRef for ReferenceSystem { + fn as_ref(&self) -> &str { + match self { + ReferenceSystem::SPO => "spo", + ReferenceSystem::SOP => "sop", + ReferenceSystem::PSO => "pso", + ReferenceSystem::POS => "pos", + ReferenceSystem::OSP => "osp", + ReferenceSystem::OPS => "ops", + } + } +} + +impl From<&str> for ReferenceSystem { + fn from(value: &str) -> Self { + match value { + "spo" => ReferenceSystem::SPO, + "sop" => ReferenceSystem::SOP, + "pso" => ReferenceSystem::PSO, + "pos" => ReferenceSystem::POS, + "osp" => ReferenceSystem::OSP, + "ops" => ReferenceSystem::OPS, + _ => ReferenceSystem::SPO, + } + } +} + +impl Dimensionality { + pub(crate) fn new(dictionary: &Dictionary, graph: &Graph) -> Self { + Dimensionality { + graph_size: graph + .iter() + .map(|triples| triples.len()) + .reduce(|acc, a| acc + a), + first_term_size: match dictionary.get_reference_system() { + ReferenceSystem::SPO | ReferenceSystem::SOP => dictionary.subjects_size(), + ReferenceSystem::POS | ReferenceSystem::PSO => dictionary.predicates_size(), + ReferenceSystem::OPS | ReferenceSystem::OSP => dictionary.objects_size(), + }, + _second_term_size: match dictionary.get_reference_system() { + ReferenceSystem::PSO | ReferenceSystem::OSP => dictionary.subjects_size(), + ReferenceSystem::SPO | ReferenceSystem::OPS => dictionary.predicates_size(), + ReferenceSystem::SOP | ReferenceSystem::POS => dictionary.objects_size(), + }, + third_term_size: match dictionary.get_reference_system() { + ReferenceSystem::POS | ReferenceSystem::OPS => dictionary.subjects_size(), + ReferenceSystem::SOP | ReferenceSystem::OSP => dictionary.predicates_size(), + ReferenceSystem::SPO | ReferenceSystem::PSO => dictionary.objects_size(), + }, + } + } + + pub(crate) fn get_graph_size(&self) -> u64 { + self.graph_size.unwrap() as u64 + } + + pub(crate) fn get_first_term_size(&self) -> u64 { + self.first_term_size as u64 + } + + // pub(crate) fn get_second_term_size(&self) -> u64 { + // self._second_term_size as u64 + // } + + pub(crate) fn get_third_term_size(&self) -> u64 { + self.third_term_size as u64 + } +} diff --git a/src/storage/tabular.rs b/src/storage/tabular.rs deleted file mode 100644 index 8cca3f1..0000000 --- a/src/storage/tabular.rs +++ /dev/null @@ -1,130 +0,0 @@ -use sprs::TriMat; -use std::sync::Mutex; -use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder; -use zarrs::array::codec::ArrayToBytesCodecTraits; -use zarrs::array::codec::GzipCodec; -use zarrs::array::Array; -use zarrs::array::ChunkGrid; -use zarrs::array::DataType; -use zarrs::array::DimensionName; -use zarrs::array::FillValue; -use zarrs::storage::ReadableStorageTraits; - -use crate::dictionary::Dictionary; -use crate::io::Graph; - -use super::layout::Layout; -use super::layout::LayoutOps; -use super::ChunkingStrategy; -use super::StorageResult; -use super::ZarrArray; - -type ZarrType = u64; -type Chunk = (u32, u32, u32); - -pub struct TabularLayout; - -impl Layout for TabularLayout -where - R: ReadableStorageTraits + Sized, -{ - fn shape(&self, _dictionary: &Dictionary, graph: &Graph) -> Vec { - vec![ - graph - .iter() - .map(|triples| triples.len() as u64) - .reduce(|acc, a| acc + a) - .unwrap(), - 3, - ] - } - - fn data_type(&self) -> DataType { - DataType::UInt64 - } - - fn chunk_shape( - &self, - chunking_strategy: ChunkingStrategy, - _dictionary: &Dictionary, - ) -> ChunkGrid { - vec![chunking_strategy.into(), 3].into() // TODO: make this a constant value - } - - fn fill_value(&self) -> FillValue { - FillValue::from(0u64) - } - - fn dimension_names(&self) -> Option> { - Some(vec![ - DimensionName::new("Triples"), - DimensionName::new("Fields"), - ]) - } - - fn array_to_bytes_codec( - &self, - _dictionary: &Dictionary, - ) -> StorageResult> { - let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![1, 3]); - sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]); - Ok(Box::new(sharding_codec_builder.build())) - } -} - -impl LayoutOps for TabularLayout -where - R: ReadableStorageTraits + Sized, -{ - fn graph_iter(&self, graph: Graph) -> Vec { - graph - .iter() - .enumerate() - .flat_map(|(subject, triples)| { - triples - .iter() - .map(|&(predicate, object)| (subject as u32, predicate, object)) - .collect::>() - }) - .collect::>() - } - - fn chunk_elements(&self, chunk: &[Chunk], _: usize) -> Vec { - let mut ans = Vec::new(); - for &(subject, predicate, object) in chunk { - ans.push(subject as ZarrType); - ans.push(predicate as ZarrType); - ans.push(object as ZarrType); - } - ans - } - - fn parse(&mut self, arr: Array, dictionary: &Dictionary) -> StorageResult { - let matrix = Mutex::new(TriMat::new(( - dictionary.subjects_size(), - dictionary.objects_size(), - ))); - (0..arr.chunk_grid_shape().unwrap()[0] as usize).for_each(|i| { - // Using this chunking strategy allows us to keep RAM usage low, - // as we load elements by row - arr.retrieve_chunk_elements::(&[i as ZarrType, 0]) - .unwrap() - .chunks(3) - .for_each(|triple| { - matrix - .lock() - .unwrap() - .add_triplet(triple[0], triple[2], triple[1] as u8); - }) - }); - - // We use a CSC Matrix because typically, RDF knowledge graphs tend to - // have more rows than columns - let x = matrix.lock().unwrap(); - Ok(x.to_csc()) - } - - fn sharding_factor(&self, subjects: usize, objects: usize) -> usize { - subjects * objects - } -} diff --git a/src/utils.rs b/src/utils.rs index 631ec71..1c498a1 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,7 +1,6 @@ -use std::collections::HashSet; - use fcsd::Set; use serde_json::Value; +use std::collections::HashSet; use zarrs::array::Array; pub fn rdf_to_value(terms: Set) -> Value { @@ -32,20 +31,20 @@ pub fn hash_to_set(terms: HashSet) -> Vec { vec } -pub fn subjects_per_chunk(arr: &Array) -> u64 { +pub fn rows_per_shard(arr: &Array) -> u64 { match arr.chunk_grid().chunk_shape(&[0, 0], arr.shape()) { Ok(shape) => match shape { - Some(chunk_shape) => chunk_shape[0], + Some(chunk_shape) => chunk_shape[0].into(), None => todo!(), }, Err(_) => todo!(), } } -pub fn objects_per_chunk(arr: &Array) -> u64 { +pub fn columns_per_shard(arr: &Array) -> u64 { match arr.chunk_grid().chunk_shape(&[0, 0], arr.shape()) { Ok(shape) => match shape { - Some(chunk_shape) => chunk_shape[1], + Some(chunk_shape) => chunk_shape[1].into(), None => todo!(), }, Err(_) => todo!(), diff --git a/test.sh b/test.sh new file mode 100755 index 0000000..515468a --- /dev/null +++ b/test.sh @@ -0,0 +1,3 @@ +rm -r tests/out/* +cargo test +rm -r tests/out/* \ No newline at end of file diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 3f08f76..e97dae3 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,31 +1,40 @@ #![allow(dead_code)] -use safe_transmute::TriviallyTransmutable; -use sprs::{CsMat, TriMat}; +use remote_hdt::dictionary::Dictionary; +use remote_hdt::storage::params::Backend; +use remote_hdt::storage::params::ChunkingStrategy; +use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::Storage; +use sprs::CsMat; +use sprs::TriMat; use std::fs::File; -use zarrs::storage::store::FilesystemStore; - -use remote_hdt::{ - dictionary::Dictionary, - storage::{ChunkingStrategy, Storage}, -}; pub const TABULAR_ZARR: &str = "tests/out/tabular.zarr"; pub const MATRIX_ZARR: &str = "tests/out/matrix.zarr"; pub const SHARDING_ZARR: &str = "tests/out/sharding.zarr"; pub const LARGER_ZARR: &str = "tests/out/larger.zarr"; +pub const PSO_ZARR: &str = "tests/out/pso.zarr"; +pub const OPS_ZARR: &str = "tests/out/ops.zarr"; +pub const TABULAR_PSO_ZARR: &str = "tests/out/tabular_pso.zarr"; +pub const TABULAR_OPS_ZARR: &str = "tests/out/tabular_ops.zarr"; -pub fn setup( +pub fn setup( path: &str, - storage: &mut Storage, + storage: &mut Storage, chunking_strategy: ChunkingStrategy, + reference_system: ReferenceSystem, ) { if File::open(path).is_err() { storage - .serialize(path, "resources/rdf.nt", chunking_strategy) + .serialize( + Backend::FileSystem(path), + "resources/rdf.nt", + chunking_strategy, + reference_system, + ) .unwrap(); } else { - storage.load(path).unwrap(); + storage.load(Backend::FileSystem(path)).unwrap(); } } @@ -65,8 +74,8 @@ pub enum Predicate { } impl Predicate { - pub fn get_idx(self, dictionary: &Dictionary) -> u8 { - dictionary.get_predicate_idx_unchecked(self.into()) as u8 + pub(crate) fn get_idx(self, dictionary: &Dictionary) -> usize { + dictionary.get_predicate_idx_unchecked(self.into()) } } @@ -98,7 +107,7 @@ pub enum Object { } impl Object { - pub fn get_idx(self, dictionary: &Dictionary) -> usize { + pub(crate) fn get_idx(self, dictionary: &Dictionary) -> usize { dictionary.get_object_idx_unchecked(self.into()) } } @@ -122,7 +131,7 @@ impl From for &str { pub struct Graph; impl Graph { - pub fn new(dictionary: &Dictionary) -> CsMat { + pub fn new(dictionary: &Dictionary) -> CsMat { let mut ans = TriMat::new((4, 9)); ans.add_triplet( @@ -183,3 +192,81 @@ impl Graph { ans.to_csc() } } + +pub fn set_expected_first_term_matrix( + expected: &mut Vec, + subject: Subject, + predicate: Predicate, + object: Object, + dictionary: &Dictionary, + reference_system: ReferenceSystem, +) { + let subject_idx = subject.get_idx(dictionary); + let predicate_idx = predicate.get_idx(dictionary); + let object_idx = object.get_idx(dictionary); + + match reference_system { + ReferenceSystem::SPO => expected[object_idx] = predicate_idx as u32, + ReferenceSystem::SOP => expected[predicate_idx] = object_idx as u32, + ReferenceSystem::PSO => expected[object_idx] = subject_idx as u32, + ReferenceSystem::POS => expected[subject_idx] = object_idx as u32, + ReferenceSystem::OSP => expected[predicate_idx] = subject_idx as u32, + ReferenceSystem::OPS => expected[subject_idx] = predicate_idx as u32, + } +} + +pub fn set_expected_second_term_matrix( + expected: &mut Vec, + subject: Subject, + predicate: Predicate, + object: Object, + dictionary: &Dictionary, + reference_system: ReferenceSystem, +) { + let subject_idx = subject.get_idx(dictionary); + let predicate_idx = predicate.get_idx(dictionary); + let object_idx = object.get_idx(dictionary); + + match reference_system { + ReferenceSystem::SPO => { + expected[subject_idx * dictionary.objects_size() + object_idx] = predicate_idx as u32 + } + ReferenceSystem::SOP => { + expected[subject_idx * dictionary.predicates_size() + predicate_idx] = object_idx as u32 + } + ReferenceSystem::PSO => { + expected[predicate_idx * dictionary.objects_size() + object_idx] = subject_idx as u32 + } + ReferenceSystem::POS => { + expected[predicate_idx * dictionary.subjects_size() + subject_idx] = object_idx as u32 + } + ReferenceSystem::OSP => { + expected[object_idx * dictionary.predicates_size() + predicate_idx] = subject_idx as u32 + } + ReferenceSystem::OPS => { + expected[object_idx * dictionary.subjects_size() + subject_idx] = predicate_idx as u32 + } + } +} + +pub fn set_expected_third_term_matrix( + expected: &mut Vec, + subject: Subject, + predicate: Predicate, + object: Object, + dictionary: &Dictionary, + reference_system: ReferenceSystem, +) { + let subject_idx = subject.get_idx(dictionary); + let predicate_idx = predicate.get_idx(dictionary); + let object_idx = object.get_idx(dictionary); + + match reference_system { + ReferenceSystem::SPO => expected[subject_idx] = predicate_idx as u32, + ReferenceSystem::SOP => expected[subject_idx] = object_idx as u32, + ReferenceSystem::PSO => expected[predicate_idx] = subject_idx as u32, + ReferenceSystem::POS => expected[predicate_idx] = object_idx as u32, + ReferenceSystem::OSP => expected[object_idx] = subject_idx as u32, + ReferenceSystem::OPS => expected[object_idx] = predicate_idx as u32, + } +} diff --git a/tests/get_object_test.rs b/tests/get_object_test.rs index 15bfd4e..984ab83 100644 --- a/tests/get_object_test.rs +++ b/tests/get_object_test.rs @@ -1,55 +1,86 @@ -use remote_hdt::{ - engine::EngineStrategy, - storage::{matrix::MatrixLayout, tabular::TabularLayout, ChunkingStrategy, LocalStorage}, -}; +use common::set_expected_third_term_matrix; +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::ops::Ops; +use remote_hdt::storage::ops::OpsFormat; +use remote_hdt::storage::params::Backend; +use remote_hdt::storage::params::ChunkingStrategy; +use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Serialization; +use remote_hdt::storage::Storage; use sprs::TriMat; +use std::error::Error; + mod common; #[test] -fn get_object_matrix_chunk_test() { - let mut storage = LocalStorage::new(MatrixLayout); - common::setup(common::MATRIX_ZARR, &mut storage, ChunkingStrategy::Chunk); - - let actual = storage - .load(common::MATRIX_ZARR) - .unwrap() - .get_object(common::Object::Alan.get_idx(&storage.get_dictionary())) - .unwrap(); - - assert_eq!(actual, vec![0, 3, 0, 0, 0]) -} +fn get_object_matrix_sharding_test() -> Result<(), Box> { + let mut storage = Storage::new(MatrixLayout, Serialization::Zarr); -#[test] -fn get_object_matrix_sharding_test() { - let mut storage = LocalStorage::new(MatrixLayout); common::setup( common::SHARDING_ZARR, &mut storage, ChunkingStrategy::Sharding(3), + ReferenceSystem::SPO, ); - let actual = storage - .load(common::SHARDING_ZARR) - .unwrap() - .get_object(0) - .unwrap(); + let actual = match storage + .load(Backend::FileSystem(common::SHARDING_ZARR))? + .get_object(common::Object::Date.into())? + { + OpsFormat::Zarr(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = vec![0u32; storage.get_dictionary().subjects_size()]; + set_expected_third_term_matrix( + &mut expected, + common::Subject::Alan, + common::Predicate::DateOfBirth, + common::Object::Date, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); - assert_eq!(actual, vec![2, 0, 0, 0, 0]) + if actual == expected { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } } #[test] -fn get_object_tabular_test() { - let mut storage = LocalStorage::new(TabularLayout); - common::setup(common::TABULAR_ZARR, &mut storage, ChunkingStrategy::Chunk); - - let actual = storage - .load_sparse(common::TABULAR_ZARR) - .unwrap() - .get_object(common::Object::Alan.get_idx(&storage.get_dictionary())) - .unwrap(); - - let mut expected = TriMat::new((4, 9)); - expected.add_triplet(1, 3, 3); - let expected = expected.to_csc(); - assert_eq!(actual, expected) +fn get_object_tabular_test() -> Result<(), Box> { + let mut storage = Storage::new(TabularLayout, Serialization::Sparse); + + common::setup( + common::TABULAR_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); + + let actual = match storage + .load(Backend::FileSystem(common::TABULAR_ZARR))? + .get_object(common::Object::Alan.into())? + { + OpsFormat::SparseArray(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = TriMat::new(( + storage.get_dictionary().subjects_size(), + storage.get_dictionary().objects_size(), + )); + expected.add_triplet( + common::Subject::Bombe.get_idx(&storage.get_dictionary()), + common::Object::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::Discoverer.get_idx(&storage.get_dictionary()), + ); + + if actual == expected.to_csc() { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } } diff --git a/tests/get_predicate_test.rs b/tests/get_predicate_test.rs new file mode 100644 index 0000000..be46da1 --- /dev/null +++ b/tests/get_predicate_test.rs @@ -0,0 +1,117 @@ +use common::set_expected_second_term_matrix; +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::ops::Ops; +use remote_hdt::storage::ops::OpsFormat; +use remote_hdt::storage::params::Backend; +use remote_hdt::storage::params::ChunkingStrategy; +use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Serialization; +use remote_hdt::storage::Storage; +use sprs::TriMat; +use std::error::Error; + +mod common; + +#[test] +fn get_predicate_matrix_chunk_test() -> Result<(), Box> { + let mut storage = Storage::new(MatrixLayout, Serialization::Zarr); + + common::setup( + common::MATRIX_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); + + let actual = match storage + .load(Backend::FileSystem(common::MATRIX_ZARR))? + .get_predicate(common::Predicate::InstanceOf.into())? + { + OpsFormat::Zarr(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = vec![ + 0u32; + storage.get_dictionary().subjects_size() + * storage.get_dictionary().objects_size() + ]; + set_expected_second_term_matrix( + &mut expected, + common::Subject::Alan, + common::Predicate::InstanceOf, + common::Object::Human, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); + set_expected_second_term_matrix( + &mut expected, + common::Subject::Wilmslow, + common::Predicate::InstanceOf, + common::Object::Town, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); + set_expected_second_term_matrix( + &mut expected, + common::Subject::Bombe, + common::Predicate::InstanceOf, + common::Object::Computer, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); + + if actual == expected { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } +} + +#[test] +fn get_predicate_tabular_test() -> Result<(), Box> { + let mut storage = Storage::new(TabularLayout, Serialization::Sparse); + + common::setup( + common::TABULAR_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); + + let actual = match storage + .load(Backend::FileSystem(common::TABULAR_ZARR))? + .get_predicate(common::Predicate::InstanceOf.into())? + { + OpsFormat::SparseArray(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = TriMat::new(( + storage.get_dictionary().subjects_size(), + storage.get_dictionary().objects_size(), + )); + expected.add_triplet( + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Object::Human.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Subject::Wilmslow.get_idx(&storage.get_dictionary()), + common::Object::Town.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Subject::Bombe.get_idx(&storage.get_dictionary()), + common::Object::Computer.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + let expected = expected.to_csc(); + + if actual == expected { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } +} diff --git a/tests/get_subject_test.rs b/tests/get_subject_test.rs index 4a8d5fb..fdc8adc 100644 --- a/tests/get_subject_test.rs +++ b/tests/get_subject_test.rs @@ -1,59 +1,168 @@ -use remote_hdt::{ - engine::EngineStrategy, - storage::{matrix::MatrixLayout, tabular::TabularLayout, ChunkingStrategy, LocalStorage}, -}; +use common::set_expected_first_term_matrix; +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::ops::Ops; +use remote_hdt::storage::ops::OpsFormat; +use remote_hdt::storage::params::Backend; +use remote_hdt::storage::params::ChunkingStrategy; +use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Serialization; +use remote_hdt::storage::Storage; use sprs::TriMat; +use std::error::Error; + mod common; #[test] -fn get_subject_matrix_chunk_test() { - let mut storage = LocalStorage::new(MatrixLayout); - common::setup(common::MATRIX_ZARR, &mut storage, ChunkingStrategy::Chunk); +fn get_subject_matrix_chunk_test() -> Result<(), Box> { + let mut storage = Storage::new(MatrixLayout, Serialization::Zarr); - let actual = storage - .load(common::MATRIX_ZARR) - .unwrap() - .get_subject(common::Subject::Alan.get_idx(&storage.get_dictionary())) - .unwrap(); + common::setup( + common::MATRIX_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); - assert_eq!(actual, vec![2, 4, 5, 0, 0, 0, 0, 7, 8]) + let actual = match storage + .load(Backend::FileSystem(common::MATRIX_ZARR))? + .get_subject(common::Subject::Alan.into())? + { + OpsFormat::Zarr(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = vec![0u32; storage.get_dictionary().objects_size()]; + set_expected_first_term_matrix( + &mut expected, + common::Subject::Alan, + common::Predicate::InstanceOf, + common::Object::Human, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); + set_expected_first_term_matrix( + &mut expected, + common::Subject::Alan, + common::Predicate::PlaceOfBirth, + common::Object::Warrington, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); + set_expected_first_term_matrix( + &mut expected, + common::Subject::Alan, + common::Predicate::PlaceOfDeath, + common::Object::Wilmslow, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); + set_expected_first_term_matrix( + &mut expected, + common::Subject::Alan, + common::Predicate::DateOfBirth, + common::Object::Date, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); + set_expected_first_term_matrix( + &mut expected, + common::Subject::Alan, + common::Predicate::Employer, + common::Object::GCHQ, + &storage.get_dictionary(), + ReferenceSystem::SPO, + ); + + if actual == expected { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } } #[test] -fn get_subject_matrix_sharding_test() { - let mut storage = LocalStorage::new(MatrixLayout); +fn get_subject_matrix_sharding_test() -> Result<(), Box> { + let mut storage = Storage::new(MatrixLayout, Serialization::Zarr); + common::setup( common::SHARDING_ZARR, &mut storage, - ChunkingStrategy::Sharding(3), + ChunkingStrategy::Sharding(4), + ReferenceSystem::SPO, ); - let actual = storage - .load(common::SHARDING_ZARR) - .unwrap() - .get_subject(3) - .unwrap(); + let actual = match storage + .load(Backend::FileSystem(common::SHARDING_ZARR))? + .get_subject(common::Subject::Wilmslow.into())? + { + OpsFormat::Zarr(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = vec![0u32; storage.get_dictionary().objects_size()]; + expected[5] = common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()) as u32; + expected[6] = common::Predicate::Country.get_idx(&storage.get_dictionary()) as u32; - assert_eq!(actual, vec![0, 0, 0, 0, 0, 5, 1, 0, 0]) + if actual == expected { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } } #[test] -fn get_subject_tabular_test() { - let mut storage = LocalStorage::new(TabularLayout); - common::setup(common::TABULAR_ZARR, &mut storage, ChunkingStrategy::Chunk); - - let actual = storage - .load_sparse(common::TABULAR_ZARR) - .unwrap() - .get_subject(common::Subject::Alan.get_idx(&storage.get_dictionary())) - .unwrap(); - - let mut result = TriMat::new((4, 9)); - result.add_triplet(0, 0, 2); - result.add_triplet(0, 1, 4); - result.add_triplet(0, 2, 5); - result.add_triplet(0, 7, 7); - result.add_triplet(0, 8, 8); - let result = result.to_csc(); - assert_eq!(actual, result) +fn get_subject_tabular_test() -> Result<(), Box> { + let mut storage = Storage::new(TabularLayout, Serialization::Sparse); + + common::setup( + common::TABULAR_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); + + let actual = match storage + .load(Backend::FileSystem(common::TABULAR_ZARR))? + .get_subject(common::Subject::Alan.into())? + { + OpsFormat::SparseArray(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = TriMat::new(( + storage.get_dictionary().subjects_size(), + storage.get_dictionary().objects_size(), + )); + expected.add_triplet( + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Object::Human.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Object::Warrington.get_idx(&storage.get_dictionary()), + common::Predicate::PlaceOfBirth.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Object::Wilmslow.get_idx(&storage.get_dictionary()), + common::Predicate::PlaceOfDeath.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Object::Date.get_idx(&storage.get_dictionary()), + common::Predicate::DateOfBirth.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Object::GCHQ.get_idx(&storage.get_dictionary()), + common::Predicate::Employer.get_idx(&storage.get_dictionary()), + ); + + if actual == expected.to_csc() { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } } diff --git a/tests/orientation.rs b/tests/orientation.rs new file mode 100644 index 0000000..7b56d25 --- /dev/null +++ b/tests/orientation.rs @@ -0,0 +1,206 @@ +use common::set_expected_first_term_matrix; +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::ops::Ops; +use remote_hdt::storage::ops::OpsFormat; +use remote_hdt::storage::params::Backend; +use remote_hdt::storage::params::ChunkingStrategy; +use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Serialization; +use remote_hdt::storage::Storage; +use sprs::TriMat; +use std::error::Error; + +mod common; + +#[test] +fn orientation_pso_matrix_test() -> Result<(), Box> { + let mut storage = Storage::new(MatrixLayout, Serialization::Zarr); + + common::setup( + common::PSO_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::PSO, + ); + + let actual = match storage + .load(Backend::FileSystem(common::PSO_ZARR))? + .get_predicate(common::Predicate::InstanceOf.into())? + { + OpsFormat::Zarr(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = vec![0u32; storage.get_dictionary().objects_size()]; + set_expected_first_term_matrix( + &mut expected, + common::Subject::Alan, + common::Predicate::InstanceOf, + common::Object::Human, + &storage.get_dictionary(), + ReferenceSystem::PSO, + ); + set_expected_first_term_matrix( + &mut expected, + common::Subject::Wilmslow, + common::Predicate::InstanceOf, + common::Object::Town, + &storage.get_dictionary(), + ReferenceSystem::PSO, + ); + set_expected_first_term_matrix( + &mut expected, + common::Subject::Bombe, + common::Predicate::InstanceOf, + common::Object::Computer, + &storage.get_dictionary(), + ReferenceSystem::PSO, + ); + + if actual == expected { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } +} + +#[test] +fn orientation_ops_matrix_test() -> Result<(), Box> { + let mut storage = Storage::new(MatrixLayout, Serialization::Zarr); + + common::setup( + common::OPS_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::OPS, + ); + + let actual = match storage + .load(Backend::FileSystem(common::OPS_ZARR))? + .get_object(common::Object::Alan.into())? + { + OpsFormat::Zarr(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = vec![0u32; storage.get_dictionary().subjects_size()]; + set_expected_first_term_matrix( + &mut expected, + common::Subject::Bombe, + common::Predicate::Discoverer, + common::Object::Alan, + &storage.get_dictionary(), + ReferenceSystem::OPS, + ); + + if actual == expected { + Ok(()) + } else { + println!("{:?}", actual); + Err(String::from("Expected and actual results are not equals").into()) + } +} + +#[test] +fn orientation_pso_tabular_test() -> Result<(), Box> { + let mut storage = Storage::new(TabularLayout, Serialization::Sparse); + + common::setup( + common::TABULAR_PSO_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::PSO, + ); + + let actual = match storage + .load(Backend::FileSystem(common::TABULAR_PSO_ZARR))? + .get_predicate(common::Predicate::InstanceOf.into())? + { + OpsFormat::SparseArray(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = TriMat::new(( + storage.get_dictionary().predicates_size(), + storage.get_dictionary().objects_size(), + )); + expected.add_triplet( + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + common::Object::Human.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + common::Object::Town.get_idx(&storage.get_dictionary()), + common::Subject::Wilmslow.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + common::Object::Computer.get_idx(&storage.get_dictionary()), + common::Subject::Bombe.get_idx(&storage.get_dictionary()), + ); + + if actual == expected.to_csc() { + Ok(()) + } else { + println!("{:?}", actual); + Err(String::from("Expected and actual results are not equals").into()) + } +} + +#[test] +fn orientation_ops_tabular_test() -> Result<(), Box> { + let mut storage = Storage::new(TabularLayout, Serialization::Sparse); + + common::setup( + common::TABULAR_OPS_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::OPS, + ); + + let actual = match storage + .load(Backend::FileSystem(common::TABULAR_OPS_ZARR))? + .get_subject(common::Subject::Alan.into())? + { + OpsFormat::SparseArray(actual) => actual, + _ => unreachable!(), + }; + + let mut expected = TriMat::new(( + storage.get_dictionary().objects_size(), + storage.get_dictionary().subjects_size(), + )); + expected.add_triplet( + common::Object::Human.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::InstanceOf.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Object::Warrington.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::PlaceOfBirth.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Object::Wilmslow.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::PlaceOfDeath.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Object::Date.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::DateOfBirth.get_idx(&storage.get_dictionary()), + ); + expected.add_triplet( + common::Object::GCHQ.get_idx(&storage.get_dictionary()), + common::Subject::Alan.get_idx(&storage.get_dictionary()), + common::Predicate::Employer.get_idx(&storage.get_dictionary()), + ); + + if actual == expected.to_csc() { + Ok(()) + } else { + Err(String::from("Expected and actual results are not equals").into()) + } +} diff --git a/tests/write_read_test.rs b/tests/write_read_test.rs index cebabe8..6ee9550 100644 --- a/tests/write_read_test.rs +++ b/tests/write_read_test.rs @@ -1,57 +1,93 @@ -use remote_hdt::storage::{ - matrix::MatrixLayout, tabular::TabularLayout, ChunkingStrategy, LocalStorage, -}; +use remote_hdt::storage::layout::matrix::MatrixLayout; +use remote_hdt::storage::layout::tabular::TabularLayout; +use remote_hdt::storage::params::Backend; +use remote_hdt::storage::params::ChunkingStrategy; +use remote_hdt::storage::params::ReferenceSystem; +use remote_hdt::storage::params::Serialization; +use remote_hdt::storage::Storage; mod common; #[test] fn write_read_tabular_test() { - let mut storage = LocalStorage::new(TabularLayout); - common::setup(common::MATRIX_ZARR, &mut storage, ChunkingStrategy::Chunk); + let mut storage = Storage::new(TabularLayout, Serialization::Sparse); + + common::setup( + common::TABULAR_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); + + storage + .load(Backend::FileSystem(common::TABULAR_ZARR)) + .unwrap(); + assert_eq!( - storage.load_sparse(common::TABULAR_ZARR).unwrap(), + storage.get_sparse_array().unwrap(), common::Graph::new(&storage.get_dictionary()) ) } #[test] fn write_read_matrix_test() { - let mut storage = LocalStorage::new(MatrixLayout); - common::setup(common::MATRIX_ZARR, &mut storage, ChunkingStrategy::Chunk); + let mut storage = Storage::new(MatrixLayout, Serialization::Sparse); + + common::setup( + common::MATRIX_ZARR, + &mut storage, + ChunkingStrategy::Chunk, + ReferenceSystem::SPO, + ); + + storage + .load(Backend::FileSystem(common::MATRIX_ZARR)) + .unwrap(); + assert_eq!( - storage.load_sparse(common::MATRIX_ZARR).unwrap(), + storage.get_sparse_array().unwrap(), common::Graph::new(&storage.get_dictionary()) ) } #[test] fn write_read_matrix_sharding_test() { - let mut storage = LocalStorage::new(MatrixLayout); + let mut storage = Storage::new(MatrixLayout, Serialization::Sparse); common::setup( common::SHARDING_ZARR, &mut storage, ChunkingStrategy::Sharding(3), + ReferenceSystem::SPO, ); + storage + .load(Backend::FileSystem(common::SHARDING_ZARR)) + .unwrap(); + assert_eq!( - storage.load_sparse(common::SHARDING_ZARR).unwrap(), + storage.get_sparse_array().unwrap(), common::Graph::new(&storage.get_dictionary()) ) } #[test] fn write_read_larger_than_triples_shard_test() { - let mut storage = LocalStorage::new(MatrixLayout); + let mut storage = Storage::new(MatrixLayout, Serialization::Sparse); common::setup( common::LARGER_ZARR, &mut storage, ChunkingStrategy::Sharding(10000), + ReferenceSystem::SPO, ); + storage + .load(Backend::FileSystem(common::LARGER_ZARR)) + .unwrap(); + assert_eq!( - storage.load_sparse(common::LARGER_ZARR).unwrap(), + storage.get_sparse_array().unwrap(), common::Graph::new(&storage.get_dictionary()) ) }