
Commit

want to try something, just in case i fk up
DiegoMfer committed Feb 28, 2024
1 parent 8d27df5 commit 1e48773
Showing 7 changed files with 386 additions and 105 deletions.
12 changes: 11 additions & 1 deletion examples/complement_bench.rs
@@ -2,8 +2,18 @@ use remote_hdt::error::RemoteHDTError;
use remote_hdt::complement::layout::default::DefaultComplementLayout;
use remote_hdt::complement::ops::Ops;
use remote_hdt::complement::ComplementStorage;
use remote_hdt::storage::params::Backend;

use std::env;

fn main() -> Result<(), RemoteHDTError> {
    let args: Vec<String> = env::args().collect();
    if args.len() <= 1 {
        panic!("Usage: cargo run --example complement_bench <number_of_universities>");
    }

    let number_of_universities: &String = &args[1];
    let zarr_path = format!("{}-lubm", number_of_universities);

    // Load the complement array for the <N>-lubm dataset from the local Zarr store.
    let mut binding = ComplementStorage::new(DefaultComplementLayout);
    let _arr = binding.load(Backend::FileSystem(format!("{}.zarr", zarr_path).as_str()))?;

    Ok(())
}
62 changes: 61 additions & 1 deletion src/complement/layout/default.rs
@@ -1 +1,61 @@
use super::ComplementLayout;

use crate::complement::Dimensionality;
use crate::complement::layout::DataType;
use crate::complement::ChunkingStrategy;
use crate::complement::layout::ComplementStorageResult;
use crate::complement::layout::ComplementLayoutOps;

use std::num::NonZeroU64;

use zarrs::array::ChunkGrid;
use zarrs::array::FillValue;
use zarrs::array::DimensionName;
use zarrs::array::codec::ArrayToBytesCodecTraits;
use zarrs::array::codec::array_to_bytes::sharding::ShardingCodecBuilder;
use zarrs::array::codec::GzipCodec;

pub struct DefaultComplementLayout;

impl ComplementLayout for DefaultComplementLayout {

Check failure on line 21 in src/complement/layout/default.rs (GitHub Actions, stable: Test Suite, Clippy, Check): missing generics for trait `ComplementLayout`
    fn shape(&self, dimensionality: &Dimensionality) -> Vec<u64> {
        vec![
            dimensionality.get_graph_size(),
            dimensionality.get_nodes_size(),
        ]
    }

    fn data_type(&self) -> DataType {
        DataType::UInt64
    }

    fn chunk_shape(&self, chunking_strategy: ChunkingStrategy, _: &Dimensionality) -> ChunkGrid {
        vec![chunking_strategy.into(), NonZeroU64::new(3).unwrap()].into() // TODO: make this a constant value
    }

    fn fill_value(&self) -> FillValue {
        FillValue::from(0u64)
    }

    fn dimension_names(&self) -> Option<Vec<DimensionName>> {
        Some(vec![
            DimensionName::new("Triples"),
            DimensionName::new("Fields"),
        ])
    }

    // TODO
    fn array_to_bytes_codec(
        &self,
        _: &Dimensionality,
    ) -> ComplementStorageResult<Box<dyn ArrayToBytesCodecTraits>> {
        let mut sharding_codec_builder = ShardingCodecBuilder::new(vec![1, 3].try_into()?);
        sharding_codec_builder.bytes_to_bytes_codecs(vec![Box::new(GzipCodec::new(5)?)]);
        Ok(Box::new(sharding_codec_builder.build()))
    }
}

impl ComplementLayoutOps for DefaultComplementLayout {

Check failure on line 59 in src/complement/layout/default.rs (GitHub Actions, stable: Test Suite, Clippy, Check): missing generics for trait `ComplementLayoutOps`

}
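
The failures above come from `ComplementLayout` and `ComplementLayoutOps` now being generic over an element type `C` (see their definitions in src/complement/layout/mod.rs below), while both impl blocks here name the traits without a type argument. A minimal, self-contained sketch of the error and the usual fix, using a made-up `Layout<C>` trait and an assumed `u64` element type rather than the real ones:

// Illustrative sketch only: `Layout<C>` stands in for the generic trait,
// and `u64` is an assumed element type, not something fixed by this commit.
trait Layout<C> {
    fn fill_value(&self) -> C;
}

struct DefaultLayout;

// Writing `impl Layout for DefaultLayout` would fail with
// "missing generics for trait `Layout`"; supplying the type argument compiles:
impl Layout<u64> for DefaultLayout {
    fn fill_value(&self) -> u64 {
        0
    }
}

fn main() {
    println!("fill value: {}", DefaultLayout.fill_value());
}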
142 changes: 45 additions & 97 deletions src/complement/layout/mod.rs
@@ -2,41 +2,33 @@ pub mod default;

use zarrs::storage::store::OpendalStore;
use zarrs::array::Array;
pub trait ComplementLayout {
fn retrieve_attributes(&mut self, arr: &Array<OpendalStore>) {
// 4. We get the attributes so we can obtain some values that we will need
let attributes = arr.attributes();

let subjects = &value_to_term(match attributes.get("subjects") {
Some(subjects) => subjects,
None => return Err(RemoteHDTError::SubjectsNotInJSON),
});
let predicates = &value_to_term(match attributes.get("predicates") {
Some(predicates) => predicates,
None => return Err(RemoteHDTError::PredicatesNotInJSON),
});
let objects = &value_to_term(match attributes.get("objects") {
Some(objects) => objects,
None => return Err(RemoteHDTError::ObjectsNotInJSON),
});

let reference_system: ReferenceSystem = match attributes.get("reference_system") {
Some(reference_system) => reference_system,
None => return Err(RemoteHDTError::ReferenceSystemNotInJSON),
}
.as_str()
.unwrap()
.into();

Ok(Dictionary::from_vec_str(
reference_system,
subjects,
predicates,
objects,
))
}
use zarrs::array::DataType;
use zarrs::array::ChunkGrid;
use zarrs::array::FillValue;
use zarrs::array::DimensionName;
use zarrs::array::codec::ArrayToBytesCodecTraits;
use zarrs::array_subset::ArraySubset;

use std::sync::atomic::AtomicU64;

use std::sync::atomic::Ordering;

use crate::complement::Dimensionality;
use crate::complement::params::ChunkingStrategy;
use crate::complement::RemoteHDTError;
use crate::complement::Graph;

use crate::utils::columns_per_shard;
use crate::utils::rows_per_shard;
use crate::utils::value_to_term;

Check warning on line 23 in src/complement/layout/mod.rs (GitHub Actions, stable: warning in Test Suite and Check, failure in Clippy): unused import: `crate::utils::value_to_term`

fn serialize(&mut self, arr: Array<OpendalStore>, graph: Graph) -> StorageResult<()> {

pub type ComplementStorageResult<T> = Result<T, RemoteHDTError>;
type ArrayToBytesCodec = Box<dyn ArrayToBytesCodecTraits>;


pub trait ComplementLayoutOps<C> {
    fn serialize(&mut self, arr: Array<OpendalStore>, graph: Graph) -> ComplementStorageResult<()> {
        let columns = arr.shape()[1] as usize;
        let count = AtomicU64::new(0);
        let binding = self.graph_iter(graph.to_owned());
@@ -64,70 +56,26 @@ pub trait ComplementLayout {
        Ok(())
    }

fn parse(
&mut self,
arr: &Array<OpendalStore>,
dimensionality: &Dimensionality,
) -> StorageResult<ZarrArray> {
// First, we create the 2D matrix in such a manner that the number of
// rows is the same as the size of the first terms; i.e, in the SPO
// orientation, that will be equals to the number of subjects, while
// the number of columns is equals to the size of the third terms; i.e,
// following the same example as before, it will be equals to the number
// of objects. In our case the dimensionality abstracts the process
// of getting the size of the concrete dimension
let matrix = Mutex::new(TriMat::new((
dimensionality.first_term_size, // we obtain the size of the first terms
dimensionality.third_term_size, // we obtain the size of the third terms
)));

// We compute the number of shards; for us to achieve so, we have to obtain
// first dimension of the chunk grid
let number_of_shards = match arr.chunk_grid_shape() {
Some(chunk_grid) => chunk_grid[0],

None => 0,
};

let number_of_columns = arr.shape()[1] as usize;

// For each chunk in the Zarr array we retrieve it and parse it into a
// matrix, inserting the triplet in its corresponding position. The idea
// of parsing the array chunk-by-chunk allows us to keep the RAM usage
// low, as instead of parsing the whole array, we process smaller pieces
// of it. Once we have all the pieces processed, we will have parsed the
// whole array
for shard in 0..number_of_shards {
arr.retrieve_chunk_elements(&[shard, 0])?
// We divide each shard by the number of columns, as a shard is
// composed of chunks having the size of [1, number of cols]
.chunks(number_of_columns)
.enumerate()
.for_each(|(first_term_idx, chunk)| {
self.retrieve_chunk_elements(
&matrix,
first_term_idx + (shard * rows_per_shard(arr)) as usize,
chunk,
);
})
}

// We use a CSC Matrix because typically, RDF knowledge graphs tend to
// have more rows than columns; as such, CSC matrices are optimized
// for that precise scenario
let x = matrix.lock();
Ok(x.to_csc())
}

    fn graph_iter(&self, graph: Graph) -> Vec<C>;
    fn store_chunk_elements(&self, chunk: &[C], columns: usize) -> Vec<u64>;
    fn retrieve_chunk_elements(
        &mut self,
        matrix: &Mutex<TriMat<usize>>,
        first_term_idx: usize,
        chunk: &[usize],
    );
    fn sharding_factor(&self, dimensionality: &Dimensionality) -> usize;
}

pub trait ComplementLayout<C>: ComplementLayoutOps<C> {
    fn shape(&self, dimensionality: &Dimensionality) -> Vec<u64>;
    fn data_type(&self) -> DataType;
    fn chunk_shape(
        &self,
        chunking_strategy: ChunkingStrategy,
        dimensionality: &Dimensionality,
    ) -> ChunkGrid;
    fn fill_value(&self) -> FillValue;
    fn dimension_names(&self) -> Option<Vec<DimensionName>>;
    fn array_to_bytes_codec(
        &self,
        dimensionality: &Dimensionality,
    ) -> ComplementStorageResult<ArrayToBytesCodec>;
}
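
For context on the new trait layout: `ComplementLayoutOps<C>` supplies a provided `serialize` built on top of a handful of required methods, and `ComplementLayout<C>` lists it as a supertrait, so any layout must implement both for the same element type. A minimal sketch of that pattern with invented names (`Ops`, `Shape`, `DefaultLayout`) and `u64` standing in for `C`:

// Illustrative sketch only: `Ops`/`Shape` mimic the ComplementLayoutOps<C> /
// ComplementLayout<C> split; all names and the `u64` element type are assumptions.
trait Ops<C> {
    // Required method: flatten a "graph" into elements of type C.
    fn graph_iter(&self, graph: &[C]) -> Vec<C>;

    // Provided method built on the required one, playing the role of `serialize`.
    fn element_count(&self, graph: &[C]) -> usize {
        self.graph_iter(graph).len()
    }
}

// The supertrait bound means a layout can only exist for types that also implement Ops<C>.
trait Shape<C>: Ops<C> {
    fn shape(&self) -> Vec<u64>;
}

struct DefaultLayout;

impl Ops<u64> for DefaultLayout {
    fn graph_iter(&self, graph: &[u64]) -> Vec<u64> {
        graph.to_vec()
    }
}

impl Shape<u64> for DefaultLayout {
    fn shape(&self) -> Vec<u64> {
        vec![1, 3]
    }
}

fn main() {
    let graph = vec![1_u64, 2, 3];
    let layout = DefaultLayout;
    println!("{} elements, shape {:?}", layout.element_count(&graph), layout.shape());
}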



