improvements
angelip2303 committed Dec 26, 2023
1 parent 2bb6763 commit 03bb447
Showing 12 changed files with 245 additions and 54 deletions.
33 changes: 30 additions & 3 deletions src/dictionary.rs
@@ -2,10 +2,13 @@ use std::collections::HashSet;

use fcsd::Set;

use crate::storage::params::ReferenceSystem;

use super::utils::hash_to_set;

#[derive(Clone)]
pub struct Dictionary {
reference_system: ReferenceSystem,
subjects: Set,
predicates: Set,
objects: Set,
@@ -14,6 +17,7 @@ pub struct Dictionary {
impl Default for Dictionary {
fn default() -> Self {
Dictionary {
reference_system: ReferenceSystem::SPO,
subjects: Set::new(vec!["PlaceHolder"]).unwrap(),
predicates: Set::new(vec!["PlaceHolder"]).unwrap(),
objects: Set::new(vec!["PlaceHolder"]).unwrap(),
@@ -23,23 +27,27 @@ impl Default for Dictionary {

impl Dictionary {
pub(crate) fn from_vec_str(
reference_system: ReferenceSystem,
subjects: &Vec<String>,
predicates: &Vec<String>,
objects: &Vec<String>,
) -> Self {
Dictionary {
reference_system,
subjects: Set::new(subjects).unwrap(),
predicates: Set::new(predicates).unwrap(),
objects: Set::new(objects).unwrap(),
}
}

pub(crate) fn from_set_terms(
reference_system: ReferenceSystem,
subjects: HashSet<String>,
predicates: HashSet<String>,
objects: HashSet<String>,
) -> Self {
Dictionary {
reference_system,
subjects: Set::new(hash_to_set(subjects)).unwrap(),
predicates: Set::new(hash_to_set(predicates)).unwrap(),
objects: Set::new(hash_to_set(objects)).unwrap(),
@@ -70,9 +78,18 @@ impl Dictionary {
self.objects.to_owned()
}

pub fn get_reference_system(&self) -> ReferenceSystem {
self.reference_system.to_owned()
}

pub fn get_subject_idx(&self, subject: &str) -> Option<usize> {
let mut locator = self.subjects.locator();
locator.run(subject)
match self.reference_system {
ReferenceSystem::PSO | ReferenceSystem::OSP => {
locator.run(subject).map(|value| value + 1)
}
_ => locator.run(subject),
}
}

pub fn get_subject_idx_unchecked(&self, subject: &str) -> usize {
@@ -81,7 +98,12 @@

pub fn get_predicate_idx(&self, predicate: &str) -> Option<usize> {
let mut locator = self.predicates.locator();
locator.run(predicate).map(|value| value + 1)
match self.reference_system {
ReferenceSystem::SPO | ReferenceSystem::OPS => {
locator.run(predicate).map(|value| value + 1)
}
_ => locator.run(predicate),
}
}

pub fn get_predicate_idx_unchecked(&self, predicate: &str) -> usize {
@@ -90,7 +112,12 @@

pub fn get_object_idx(&self, object: &str) -> Option<usize> {
let mut locator = self.objects.locator();
locator.run(object)
match self.reference_system {
ReferenceSystem::SOP | ReferenceSystem::POS => {
locator.run(object).map(|value| value + 1)
}
_ => locator.run(object),
}
}

pub fn get_object_idx_unchecked(&self, object: &str) -> usize {
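Note on the dictionary change above: the index lookups are now offset by one whenever the queried term is the second term of the configured reference system (the subject under PSO/OSP, the predicate under SPO/OPS, the object under SOP/POS). A plausible reason, given the matrix layout's fill value of 0, is that 0 stays reserved for "no triple". A minimal sketch of the pattern, with a stand-in enum and a plain slice instead of the fcsd set (these names are illustrative, not the crate's API):

```rust
// Stand-in for the crate's ReferenceSystem; only the lookup pattern matters here.
enum ReferenceSystem { SPO, SOP, PSO, POS, OSP, OPS }

fn subject_idx(sorted_subjects: &[&str], subject: &str, rs: &ReferenceSystem) -> Option<usize> {
    let pos = sorted_subjects.iter().position(|s| *s == subject)?;
    match rs {
        // Under PSO and OSP the subject is the *second* term of the ordering,
        // so its identifier is shifted by one, leaving 0 free as the fill value.
        ReferenceSystem::PSO | ReferenceSystem::OSP => Some(pos + 1),
        _ => Some(pos),
    }
}

fn main() {
    let subjects = ["alice", "bob"];
    assert_eq!(subject_idx(&subjects, "bob", &ReferenceSystem::SPO), Some(1));
    assert_eq!(subject_idx(&subjects, "bob", &ReferenceSystem::PSO), Some(2));
}
```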
3 changes: 2 additions & 1 deletion src/engine/array.rs
@@ -2,7 +2,8 @@ use sprs::TriMat;

use crate::storage::ZarrArray;

use super::{EngineResult, EngineStrategy};
use super::EngineResult;
use super::EngineStrategy;

impl EngineStrategy<ZarrArray> for ZarrArray {
fn get_first_term(&self, index: usize) -> EngineResult<ZarrArray> {
29 changes: 15 additions & 14 deletions src/engine/chunk.rs
@@ -2,32 +2,33 @@ use zarrs::array::Array;
use zarrs::array_subset::ArraySubset;
use zarrs::storage::ReadableStorageTraits;

use crate::error::EngineError;
use crate::storage::ZarrType;
use crate::utils::columns_per_shard;
use crate::utils::rows_per_shard;

use super::EngineResult;
use super::EngineStrategy;

impl<T: ReadableStorageTraits> EngineStrategy<Vec<u8>> for Array<T> {
fn get_first_term(&self, index: usize) -> EngineResult<Vec<u8>> {
impl<T: ReadableStorageTraits> EngineStrategy<Vec<ZarrType>> for Array<T> {
fn get_first_term(&self, index: usize) -> EngineResult<Vec<ZarrType>> {
let index_to_chunk = index as u64 / rows_per_shard(self);
let chunk_to_index = index % rows_per_shard(self) as usize;
match self
.retrieve_chunk(&[index_to_chunk, 0])?
.chunks(columns_per_shard(self) as usize)
.nth(chunk_to_index)
{
Some(ans) => Ok(ans.to_owned()),
None => Err(EngineError::Operation),
}
let chunk_to_index = index as u64 % rows_per_shard(self);
Ok(self
.retrieve_chunk_subset_elements(
&[index_to_chunk, 0],
&ArraySubset::new_with_start_end_inc(
vec![chunk_to_index, 0],
vec![chunk_to_index, columns_per_shard(self) - 1],
)?,
)?
.to_vec())
}

fn get_second_term(&self, _index: usize) -> EngineResult<Vec<u8>> {
fn get_second_term(&self, _index: usize) -> EngineResult<Vec<ZarrType>> {
unimplemented!()
}

fn get_third_term(&self, index: usize) -> EngineResult<Vec<u8>> {
fn get_third_term(&self, index: usize) -> EngineResult<Vec<ZarrType>> {
let start = vec![0, index as u64];
let end = vec![self.shape()[0], index as u64];
let shape = &ArraySubset::new_with_start_end_inc(start, end)?;
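Note on the chunk change above: get_first_term now fetches a single-row subset of the chunk via retrieve_chunk_subset_elements rather than retrieving the whole chunk and slicing it by hand. The index arithmetic is unchanged: the chunk is index / rows_per_shard and the row inside it is index % rows_per_shard. A self-contained sketch of that arithmetic, with a Vec<Vec<u64>> standing in for the sharded Zarr array (sizes and names are invented):

```rust
// Each inner Vec mimics one Zarr chunk holding rows_per_shard rows of
// `columns` values in row-major order.
fn first_term_row(chunks: &[Vec<u64>], columns: usize, rows_per_shard: usize, index: usize) -> Vec<u64> {
    let chunk = index / rows_per_shard;        // which chunk holds the requested row
    let row_in_chunk = index % rows_per_shard; // the row's offset inside that chunk
    let start = row_in_chunk * columns;
    chunks[chunk][start..start + columns].to_vec() // one full row, every column
}

fn main() {
    let chunks = vec![vec![1, 2, 3, 4, 5, 6], vec![7, 8, 9, 10, 11, 12]];
    // rows_per_shard = 2, columns = 3, so global row 3 is the second row of chunk 1.
    assert_eq!(first_term_row(&chunks, 3, 2, 3), vec![10, 11, 12]);
}
```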
12 changes: 10 additions & 2 deletions src/io/mod.rs
@@ -42,8 +42,16 @@ trait Backend<T: TriplesParser, E: From<<T>::Error>> {
return Err(ParserError::Dictionary(err));
}

let mut graph = vec![Vec::new(); subjects.len()];
let dictionary = Dictionary::from_set_terms(subjects, predicates, objects);
let mut graph = vec![
Vec::new();
match reference_system {
ReferenceSystem::SPO | ReferenceSystem::SOP => subjects.len(),
ReferenceSystem::PSO | ReferenceSystem::POS => predicates.len(),
ReferenceSystem::OSP | ReferenceSystem::OPS => objects.len(),
}
];
let dictionary =
Dictionary::from_set_terms(reference_system.to_owned(), subjects, predicates, objects);

if let Err(err) = Self::parser_fn(path, &mut |triple: Triple| {
{
13 changes: 6 additions & 7 deletions src/storage/layout.rs
@@ -27,10 +27,7 @@ use super::ZarrArray;
type ArrayToBytesCodec = Box<dyn ArrayToBytesCodecTraits>;

pub trait LayoutOps<R, T: TriviallyTransmutable, C> {
fn retrieve_attributes(
&mut self,
arr: &Array<R>,
) -> StorageResult<(Dictionary, ReferenceSystem)> {
fn retrieve_attributes(&mut self, arr: &Array<R>) -> StorageResult<Dictionary> {
// 4. We get the attributes so we can obtain some values that we will need
let attributes = arr.attributes();

@@ -55,16 +52,18 @@ pub trait LayoutOps<R, T: TriviallyTransmutable, C> {
.unwrap()
.into();

Ok((
Dictionary::from_vec_str(subjects, predicates, objects),
Ok(Dictionary::from_vec_str(
reference_system,
subjects,
predicates,
objects,
))
}

fn serialize(&mut self, arr: Array<FilesystemStore>, graph: Graph) -> StorageResult<()> {
let columns = arr.shape()[1] as usize;
let count = AtomicU64::new(0);
let binding = self.graph_iter(graph);
let binding = self.graph_iter(graph.to_owned());
let iter = binding.chunks_exact(rows_per_shard(&arr) as usize);
let remainder = iter.remainder();

12 changes: 6 additions & 6 deletions src/storage/matrix.rs
@@ -1,6 +1,5 @@
use parking_lot::Mutex;
use sprs::TriMat;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering;
use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder;
use zarrs::array::codec::ArrayToBytesCodecTraits;
@@ -14,16 +13,17 @@ use zarrs::storage::ReadableStorageTraits;

use super::layout::Layout;
use super::layout::LayoutOps;
use super::AtomicZarrType;
use super::ChunkingStrategy;
use super::Dimensionality;
use super::ReferenceSystem;
use super::StorageResult;
use super::ZarrArray;
use super::ZarrType;

use crate::io::Graph;
use crate::utils::rows_per_shard;

type ZarrType = u8;
type Chunk = Vec<(u32, u32)>;

pub struct MatrixLayout;
@@ -40,7 +40,7 @@
}

fn data_type(&self) -> DataType {
DataType::UInt8
DataType::UInt64
}

fn chunk_shape(
@@ -56,7 +56,7 @@
}

fn fill_value(&self) -> FillValue {
FillValue::from(0u8)
FillValue::from(0 as ZarrType)
}

fn dimension_names(&self, reference_system: &ReferenceSystem) -> Option<Vec<DimensionName>> {
@@ -112,9 +112,9 @@
// having the size of the shard; that is, number of rows, and a given
// number of columns. This value is converted into an AtomicU8 for us to
// be able to share it among threads
let slice: Vec<AtomicU8> = vec![0u8; chunk.len() * columns]
let slice: Vec<AtomicZarrType> = vec![0 as ZarrType; chunk.len() * columns]
.iter()
.map(|&n| AtomicU8::new(n))
.map(|&n| AtomicZarrType::new(n))
.collect();

for (first_term, triples) in chunk.iter().enumerate() {
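Note on the matrix change above: the cell type widens from u8 to the new ZarrType alias (u64), with AtomicZarrType = AtomicU64 for the shared shard buffer, presumably because a u8 caps second-term identifiers at 255; the surrounding comment still mentions AtomicU8. A sketch of the shared-buffer pattern under the new type, with hypothetical sizes (this is not the crate's Layout trait):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

type ZarrType = u64;
type AtomicZarrType = AtomicU64;

// Builds one shard: a flat rows * columns buffer of atomics that parallel
// workers could write into, then frozen into a plain Vec<ZarrType>.
fn build_shard(rows: usize, columns: usize, cells: &[(usize, usize, ZarrType)]) -> Vec<ZarrType> {
    let shard: Vec<AtomicZarrType> = (0..rows * columns).map(|_| AtomicZarrType::new(0)).collect();
    for &(row, col, value) in cells {
        // Each cell is written at most once here, so a relaxed store is enough.
        shard[row * columns + col].store(value, Ordering::Relaxed);
    }
    shard.into_iter().map(AtomicZarrType::into_inner).collect()
}

fn main() {
    let shard = build_shard(2, 3, &[(0, 1, 42), (1, 2, 7)]);
    assert_eq!(shard, vec![0, 42, 0, 0, 0, 7]);
}
```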
17 changes: 9 additions & 8 deletions src/storage/mod.rs
@@ -3,6 +3,7 @@ use serde_json::Map;
use sprs::CsMat;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use zarrs::array::Array;
use zarrs::array::ArrayBuilder;
@@ -29,7 +30,9 @@ pub mod ops;
pub mod params;
pub mod tabular;

pub type ZarrArray = CsMat<u8>;
pub type ZarrArray = CsMat<ZarrType>;
pub type ZarrType = u64;
type AtomicZarrType = AtomicU64;
pub type StorageResult<T> = Result<T, RemoteHDTError>;
pub type LocalStorage<T, C> = Storage<FilesystemStore, T, C>;
pub type HTTPStorage<T, C> = Storage<HTTPStore, T, C>;
@@ -70,11 +73,10 @@ impl<R: ReadableStorageTraits, T: TriviallyTransmutable, C> Storage<R, T, C> {
fn process_zarr(&mut self, storage: R) -> StorageResult<&Self> {
let store = Arc::new(storage);
let arr = Array::new(store, ARRAY_NAME)?;
let (dictionary, ref_system) = self.layout.retrieve_attributes(&arr)?;
let dictionary = self.layout.retrieve_attributes(&arr)?;
self.dictionary = dictionary;
self.reference_system = ref_system;
self.dimensionality =
Dimensionality::new(&self.reference_system, &self.dictionary, &Graph::default());
self.reference_system = self.dictionary.get_reference_system();
self.dimensionality = Dimensionality::new(&self.dictionary, &Graph::default());

match self.serialization {
Serialization::Zarr => self.array = Some(arr),
@@ -126,8 +128,7 @@ impl<T: TriviallyTransmutable, C> LocalStorage<T, C> {
let graph = match RdfParser::parse(rdf_path, &reference_system) {
Ok((graph, dictionary)) => {
self.dictionary = dictionary;
self.dimensionality =
Dimensionality::new(&reference_system, &self.dictionary, &graph);
self.dimensionality = Dimensionality::new(&self.dictionary, &graph);
graph
}
Err(_) => todo!(),
@@ -155,7 +156,7 @@ impl<T: TriviallyTransmutable, C> LocalStorage<T, C> {
attributes.insert("objects".into(), rdf_to_value(objects));
attributes.insert("reference_system".into(), reference_system.as_ref().into());
attributes
}) // TODO: one attribute should be the Layout
})
.build(store, ARRAY_NAME)?;

arr.store_metadata()?;
3 changes: 2 additions & 1 deletion src/storage/ops.rs
@@ -8,12 +8,13 @@ use super::params::ReferenceSystem;
use super::params::Serialization;
use super::Storage;
use super::ZarrArray;
use super::ZarrType;

pub type OpsResult = Result<OpsFormat, OpsError>;

pub enum OpsFormat {
SparseArray(ZarrArray),
Zarr(Vec<u8>),
Zarr(Vec<ZarrType>),
}

pub trait Ops {
13 changes: 5 additions & 8 deletions src/storage/params.rs
@@ -17,6 +17,7 @@ pub enum ThreadingStrategy {
Multi,
}

#[derive(Clone)]
pub enum ReferenceSystem {
SPO,
SOP,
@@ -72,27 +73,23 @@ impl From<&str> for ReferenceSystem {
}

impl Dimensionality {
pub(crate) fn new(
reference_system: &ReferenceSystem,
dictionary: &Dictionary,
graph: &Graph,
) -> Self {
pub(crate) fn new(dictionary: &Dictionary, graph: &Graph) -> Self {
Dimensionality {
graph_size: graph
.iter()
.map(|triples| triples.len())
.reduce(|acc, a| acc + a),
first_term_size: match reference_system {
first_term_size: match dictionary.get_reference_system() {
ReferenceSystem::SPO | ReferenceSystem::SOP => dictionary.subjects_size(),
ReferenceSystem::POS | ReferenceSystem::PSO => dictionary.predicates_size(),
ReferenceSystem::OPS | ReferenceSystem::OSP => dictionary.objects_size(),
},
second_term_size: match reference_system {
second_term_size: match dictionary.get_reference_system() {
ReferenceSystem::PSO | ReferenceSystem::OSP => dictionary.subjects_size(),
ReferenceSystem::SPO | ReferenceSystem::OPS => dictionary.predicates_size(),
ReferenceSystem::SOP | ReferenceSystem::POS => dictionary.objects_size(),
},
third_term_size: match reference_system {
third_term_size: match dictionary.get_reference_system() {
ReferenceSystem::POS | ReferenceSystem::OPS => dictionary.subjects_size(),
ReferenceSystem::SOP | ReferenceSystem::OSP => dictionary.predicates_size(),
ReferenceSystem::SPO | ReferenceSystem::PSO => dictionary.objects_size(),
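Note on the params change above: Dimensionality::new now reads the reference system from the dictionary instead of taking it as an argument, and the three matches pick which vocabulary size becomes the first, second, and third dimension. A compact sketch of the same mapping with stand-alone types (not the crate's):

```rust
#[derive(Clone, Copy)]
enum ReferenceSystem { SPO, SOP, PSO, POS, OSP, OPS }

/// Returns (first, second, third) dimension sizes for the given vocabulary
/// counts, mirroring how Dimensionality::new picks them.
fn dimensions(rs: ReferenceSystem, subjects: usize, predicates: usize, objects: usize) -> (usize, usize, usize) {
    match rs {
        ReferenceSystem::SPO => (subjects, predicates, objects),
        ReferenceSystem::SOP => (subjects, objects, predicates),
        ReferenceSystem::PSO => (predicates, subjects, objects),
        ReferenceSystem::POS => (predicates, objects, subjects),
        ReferenceSystem::OSP => (objects, subjects, predicates),
        ReferenceSystem::OPS => (objects, predicates, subjects),
    }
}

fn main() {
    // 5 subjects, 3 predicates, 8 objects laid out predicate-first.
    assert_eq!(dimensions(ReferenceSystem::PSO, 5, 3, 8), (3, 5, 8));
}
```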
4 changes: 3 additions & 1 deletion src/storage/tabular.rs
@@ -107,16 +107,18 @@ where
// as we load elements by row
if let Ok(chunk_elements) = arr.retrieve_chunk_elements::<usize>(&[i as ZarrType, 0]) {
chunk_elements.chunks(3).for_each(|triple| {
println!("{} {} {}", triple[0], triple[2], triple[1] as ZarrType);
matrix
.lock()
.add_triplet(triple[0], triple[2], triple[1] as u8);
.add_triplet(triple[0], triple[2], triple[1] as ZarrType);
})
}
});

// We use a CSC Matrix because typically, RDF knowledge graphs tend to
// have more rows than columns
let x = matrix.lock();

Ok(x.to_csc())
}

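Note on the tabular change above: the sparse matrix is now built with ZarrType (u64) values; each stored row (first, second, third) becomes a cell at (first, third) holding the second term's identifier, and the result is converted to CSC, which the code comments justify by RDF graphs typically having more rows than columns. A hedged sketch of that conversion with sprs, the crate the file already uses (shapes here are invented):

```rust
use sprs::{CsMat, TriMat};

// Each row of `triples` is (first, second, third) in reference-system order;
// the second term's id becomes the value stored at (first, third).
fn to_sparse(triples: &[[u64; 3]], rows: usize, cols: usize) -> CsMat<u64> {
    let mut matrix = TriMat::new((rows, cols));
    for t in triples {
        matrix.add_triplet(t[0] as usize, t[2] as usize, t[1]);
    }
    // Compress by column, following the comment about rows outnumbering columns.
    matrix.to_csc()
}

fn main() {
    let m = to_sparse(&[[0, 3, 1], [1, 2, 1]], 2, 2);
    assert_eq!(m.nnz(), 2);
}
```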