Skip to content

Commit

Permalink
TupleSet storage now working. On to Column Storage!
Browse files Browse the repository at this point in the history
  • Loading branch information
jhellerstein committed Sep 21, 2024
1 parent 775e367 commit ca08cc2
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 122 deletions.
23 changes: 19 additions & 4 deletions hydroflow/tests/surface_lattice_bimorphism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ use std::collections::{HashMap, HashSet};

use hydroflow::util::collect_ready;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
use lattices::ght::GeneralizedHashTrieNode;
use lattices::ght::{GeneralizedHashTrieNode, GhtInner};
use lattices::ght_lattice::{DeepJoinLatticeBimorphism, GhtBimorphism};
use lattices::map_union::{KeyedBimorphism, MapUnionHashMap, MapUnionSingletonMap};
use lattices::set_union::{CartesianProductBimorphism, SetUnionHashSet, SetUnionSingletonSet};
use lattices::GhtType;
use multiplatform_test::multiplatform_test;
use variadics::hash_set::VariadicHashSet;
use variadics::{var_expr, CloneVariadic};

#[multiplatform_test]
Expand Down Expand Up @@ -146,14 +147,28 @@ fn test_ght_join_bimorphism() {
// type MyGhtATrie = <MyGhtA as GeneralizedHashTrie>::Trie;
// type MyGhtBTrie = <MyGhtB as GeneralizedHashTrie>::Trie;
type MyGhtATrie = GhtType!(u32, u64, u16 => &'static str);
let ght_a = MyGhtATrie::default();
type MyGhtBTrie = GhtType!(u32, u64, u16 => &'static str);

type Output = variadics::var_type!(u32, u64, u16, &'static str, &'static str);

type MyNodeBim =
<(MyGhtATrie, MyGhtBTrie) as DeepJoinLatticeBimorphism>::DeepJoinLatticeBimorphism;
type MyNodeBim = <(MyGhtATrie, MyGhtBTrie) as DeepJoinLatticeBimorphism<
VariadicHashSet<Output>,
>>::DeepJoinLatticeBimorphism;
let me_node_bim = MyNodeBim::default();
type MyBim = GhtBimorphism<MyNodeBim>;
// let me_bim = MyBim::default();
let me_bim = MyBim::default();

let mut hf = hydroflow_syntax! {
lhs = source_iter_delta([
var_expr!(123, 2, 5, "hello"),
var_expr!(50, 1, 1, "hi"),
var_expr!(5, 1, 7, "hi"),
var_expr!(5, 1, 7, "bye"),
])
-> map(|row| MyGhtATrie::new_from([row]))
-> state::<'tick, MyGhtATrie>();
};

let mut hf = hydroflow_syntax! {
lhs = source_iter_delta([
Expand Down
7 changes: 5 additions & 2 deletions hydroflow/tests/surface_lattice_generalized_hash_trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use hydroflow::lattices::ght::GeneralizedHashTrieNode;
use hydroflow::lattices::ght_lattice::{DeepJoinLatticeBimorphism, GhtBimorphism};
use hydroflow::lattices::GhtType;
use hydroflow::util::collect_ready;
use hydroflow::variadics::{var_expr, var_type}; // Import the Insert trait
use hydroflow::variadics::{var_expr, var_type};
use variadics::hash_set::VariadicHashSet; // Import the Insert trait

#[test]
fn test_basic() {
Expand Down Expand Up @@ -45,7 +46,9 @@ fn test_join() {
];
let s = vec![var_expr!(1, 10), var_expr!(5, 50)];

type MyNodeBim = <(MyGht, MyGht) as DeepJoinLatticeBimorphism>::DeepJoinLatticeBimorphism;
type MyNodeBim = <(MyGht, MyGht) as DeepJoinLatticeBimorphism<
VariadicHashSet<var_type!(u8, u16, u16)>,
>>::DeepJoinLatticeBimorphism;
type MyBim = GhtBimorphism<MyNodeBim>;

let mut df = hydroflow_syntax! {
Expand Down
55 changes: 32 additions & 23 deletions lattices/src/ght.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::hash::Hash;
use std::marker::PhantomData;

use sealed::sealed;
use variadics::hash_set::VariadicHashSet;
use variadics::{
var_args, var_type, PartialEqVariadic, RefVariadic, Split, SplitBySuffix, VariadicExt,
};
Expand All @@ -23,7 +24,7 @@ pub trait GeneralizedHashTrieNode: Default {
/// This type is the same in all nodes of the trie.
type ValType: VariadicExt + Eq + Hash + Clone;
/// The type that holds the data in the leaves
type Storage: TupleSet<Schema = Self::Schema> + Default;
type Storage: TupleSet<Schema = Self::Schema> + Default + IntoIterator<Item = Self::Schema>;

/// SuffixSchema variadic: the suffix of the schema *from this node of the trie
/// downward*. The first entry in this variadic is of type Head.
Expand Down Expand Up @@ -65,10 +66,10 @@ pub trait GeneralizedHashTrieNode: Default {
// ) -> impl Iterator<Item = <Self::Schema as VariadicExt>::AsRefVar<'_>>;

/// Bimorphism for joining on full tuple keys (all GhtInner keys) in the trie
type DeepJoin<Other>
type DeepJoin<Other, Storage>
where
Other: GeneralizedHashTrieNode,
(Self, Other): DeepJoinLatticeBimorphism;
(Self, Other): DeepJoinLatticeBimorphism<Storage>;

// /// For Inner nodes only, this is the type of the Child node
// type ChildNode: GeneralizedHashTrieNode;
Expand Down Expand Up @@ -171,10 +172,10 @@ where
// .flat_map(|(_k, vs)| vs.recursive_iter_keys().map(move |v| v))
// }

type DeepJoin<Other> = <(Self, Other) as DeepJoinLatticeBimorphism>::DeepJoinLatticeBimorphism
type DeepJoin<Other, Storage> = <(Self, Other) as DeepJoinLatticeBimorphism<Storage>>::DeepJoinLatticeBimorphism
where
Other: GeneralizedHashTrieNode,
(Self, Other): DeepJoinLatticeBimorphism;
(Self, Other): DeepJoinLatticeBimorphism<Storage>;

fn find_containing_leaf(
&self,
Expand Down Expand Up @@ -231,38 +232,46 @@ pub trait TupleSet {
fn len(&self) -> usize;

/// Return true if empty
fn is_empty(&self) -> bool;
fn is_empty(&self) -> bool {
self.len() == 0
}

fn drain(&mut self) -> impl Iterator<Item = Self::Schema>;

Check failure on line 239 in lattices/src/ght.rs

View workflow job for this annotation

GitHub Actions / Docs (rustdoc)

missing documentation for a method

/// Check for containment
fn contains(&self, value: &Self::Schema) -> bool;
fn contains(&self, value: <Self::Schema as VariadicExt>::AsRefVar<'_>) -> bool;
}

impl<Schema> TupleSet for HashSet<Schema>
impl<Schema> TupleSet for VariadicHashSet<Schema>
where
Schema: 'static + Eq + Hash + PartialEqVariadic,
for<'a> <Schema as VariadicExt>::AsRefVar<'a>: Hash,
{
type Schema = Schema;

fn insert(&mut self, element: Self::Schema) -> bool {
HashSet::insert(self, element)
self.insert(element)
}

fn iter(&self) -> impl Iterator<Item = <Self::Schema as VariadicExt>::AsRefVar<'_>> {
self.iter().map(Self::Schema::as_ref_var)
self.iter()
}

fn len(&self) -> usize {
HashSet::len(self)
self.len()
}

/// Return true if empty
fn is_empty(&self) -> bool {
self.len() == 0
}

fn contains(&self, value: &Self::Schema) -> bool {
let t = value;
HashSet::contains(self, &t)
fn drain(&mut self) -> impl Iterator<Item = Self::Schema> {
self.drain()
}

fn contains(&self, value: <Self::Schema as VariadicExt>::AsRefVar<'_>) -> bool {
self.get(value).is_some()
}
}

Expand Down Expand Up @@ -308,7 +317,7 @@ where
var_type!(ValHead, ...ValRest): Clone + Eq + Hash + PartialEqVariadic,
<Schema as SplitBySuffix<var_type!(ValHead, ...ValRest)>>::Prefix: Eq + Hash + Clone,
// for<'a> Schema::AsRefVar<'a>: PartialEq,
Storage: TupleSet<Schema = Schema> + Default,
Storage: TupleSet<Schema = Schema> + Default + IntoIterator<Item = Schema>,
{
type Schema = Schema;
type SuffixSchema = var_type!(ValHead, ...ValRest);
Expand Down Expand Up @@ -346,10 +355,10 @@ where
self.elements.iter() // .map(Schema::as_ref_var)
}

type DeepJoin<Other> = <(Self, Other) as DeepJoinLatticeBimorphism>::DeepJoinLatticeBimorphism
type DeepJoin<Other, TheStorage> = <(Self, Other) as DeepJoinLatticeBimorphism<TheStorage>>::DeepJoinLatticeBimorphism
where
Other: GeneralizedHashTrieNode,
(Self, Other): DeepJoinLatticeBimorphism;
(Self, Other): DeepJoinLatticeBimorphism<TheStorage>;

fn find_containing_leaf(
&self,
Expand Down Expand Up @@ -378,7 +387,7 @@ where
+ Clone
// + SplitBySuffix<var_type!(ValHead, ...ValRest)>
+ PartialEqVariadic,
Storage: TupleSet<Schema = Schema> + Default,
Storage: TupleSet<Schema = Schema> + Default + IntoIterator<Item = Schema>,
// ValHead: Clone + Eq + Hash,
// var_type!(ValHead, ...ValRest): Clone + Eq + Hash + PartialEqVariadic,
// <Schema as SplitBySuffix<var_type!(ValHead, ...ValRest)>>::Prefix: Eq + Hash + Clone,
Expand Down Expand Up @@ -420,10 +429,10 @@ where
self.elements.iter() //.map(Schema::as_ref_var)
}

type DeepJoin<Other> = <(Self, Other) as DeepJoinLatticeBimorphism>::DeepJoinLatticeBimorphism
type DeepJoin<Other, TheStorage> = <(Self, Other) as DeepJoinLatticeBimorphism<TheStorage>>::DeepJoinLatticeBimorphism
where
Other: GeneralizedHashTrieNode,
(Self, Other): DeepJoinLatticeBimorphism;
(Self, Other): DeepJoinLatticeBimorphism<TheStorage>;

fn find_containing_leaf(
&self,
Expand Down Expand Up @@ -776,17 +785,17 @@ macro_rules! GhtTypeWithSchema {

// Empty key (Leaf)
(() => $( $z:ty ),* => $schema:ty ) => (
$crate::ght::GhtLeaf::<$schema, $crate::variadics::var_type!($( $z ),* ), HashSet<$schema> >
$crate::ght::GhtLeaf::<$schema, $crate::variadics::var_type!($( $z ),* ), $crate::variadics::hash_set::VariadicHashSet<$schema> >
);

// Singleton key & Empty val (Inner over Leaf)
($a:ty => () => $schema:ty ) => (
$crate::ght::GhtInner::<$a, $crate::ght::GhtLeaf::<$schema, (), HashSet<$schema> >>
$crate::ght::GhtInner::<$a, $crate::ght::GhtLeaf::<$schema, (), $crate::variadics::hash_set::VariadicHashSet<$schema> >>
);

// Singleton key (Inner over Leaf)
($a:ty => $( $z:ty ),* => $schema:ty ) => (
$crate::ght::GhtInner::<$a, $crate::ght::GhtLeaf::<$schema, $crate::variadics::var_type!($( $z ),*), HashSet<$schema> >>
$crate::ght::GhtInner::<$a, $crate::ght::GhtLeaf::<$schema, $crate::variadics::var_type!($( $z ),*), $crate::variadics::hash_set::VariadicHashSet<$schema> >>
);

// Recursive case with empty val
Expand Down
47 changes: 23 additions & 24 deletions lattices/src/ght_lattice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{IsBot, IsTop, LatticeBimorphism, LatticeOrd, Merge};

impl<Head, Node> Merge<GhtInner<Head, Node>> for GhtInner<Head, Node>
where
Node: GeneralizedHashTrieNode + Merge<Node> + Clone,
Node: GeneralizedHashTrieNode + Merge<Node>,
Self: GeneralizedHashTrieNode,
Head: Hash + Eq + Clone,
{
Expand All @@ -47,13 +47,13 @@ where
}
}

impl<Schema, SuffixSchema, Storage> Merge<GhtLeaf<Schema, SuffixSchema, Storage>>
for GhtLeaf<Schema, SuffixSchema, Storage>
impl<Schema, ValType, Storage> Merge<GhtLeaf<Schema, ValType, Storage>>
for GhtLeaf<Schema, ValType, Storage>
where
Schema: Eq + Hash,
Storage: TupleSet<Schema = Schema> + Extend<Schema> + Iterator<Item = Schema>,
Storage: TupleSet<Schema = Schema> + Extend<Schema> + IntoIterator<Item = Schema>,
{
fn merge(&mut self, other: GhtLeaf<Schema, SuffixSchema, Storage>) -> bool {
fn merge(&mut self, other: GhtLeaf<Schema, ValType, Storage>) -> bool {
let old_len = self.elements.len();
self.elements.extend(other.elements);
self.elements.len() > old_len
Expand Down Expand Up @@ -472,48 +472,47 @@ where
}

/// bimorphism trait for equijoin on full tuple (keys in all GhtInner nodes)
pub trait DeepJoinLatticeBimorphism {
pub trait DeepJoinLatticeBimorphism<Storage> {
/// bimorphism type for equijoin on full tuple (keys in all GhtInner nodes)
type DeepJoinLatticeBimorphism;
}
/// bimorphism implementation for equijoin on full tuple (keys in all GhtInner nodes)
impl<Head, NodeA, NodeB> DeepJoinLatticeBimorphism
impl<Head, NodeA, NodeB, Storage> DeepJoinLatticeBimorphism<Storage>
for (GhtInner<Head, NodeA>, GhtInner<Head, NodeB>)
where
Head: 'static + Hash + Eq + Clone,
NodeA: 'static + GeneralizedHashTrieNode,
NodeB: 'static + GeneralizedHashTrieNode,
(NodeA, NodeB): DeepJoinLatticeBimorphism,
(NodeA, NodeB): DeepJoinLatticeBimorphism<Storage>,
Storage: TupleSet<Schema = var_type!(...NodeA::Schema, ...NodeB::ValType)>,
{
type DeepJoinLatticeBimorphism = GhtNodeKeyedBimorphism<
<(NodeA, NodeB) as DeepJoinLatticeBimorphism>::DeepJoinLatticeBimorphism,
<(NodeA, NodeB) as DeepJoinLatticeBimorphism<Storage>>::DeepJoinLatticeBimorphism,
>;
}
impl<SchemaA, SuffixSchemaA, StorageA, SchemaB, SuffixSchemaB, StorageB> DeepJoinLatticeBimorphism
impl<SchemaA, ValTypeA, StorageA, SchemaB, ValTypeB, StorageB, StorageOut>
DeepJoinLatticeBimorphism<StorageOut>
for (
GhtLeaf<SchemaA, SuffixSchemaA, StorageA>,
GhtLeaf<SchemaB, SuffixSchemaB, StorageB>,
GhtLeaf<SchemaA, ValTypeA, StorageA>,
GhtLeaf<SchemaB, ValTypeB, StorageB>,
)
where
SchemaA: 'static
+ VariadicExt<Extend<SuffixSchemaB> = SchemaA>
+ Eq
+ Hash
+ SplitBySuffix<SuffixSchemaA>, // + AsRefVariadicPartialEq
SuffixSchemaA: 'static + VariadicExt + Eq + Hash, // + AsRefVariadicPartialEq
SchemaB: 'static + VariadicExt + Eq + Hash + SplitBySuffix<SuffixSchemaB>, /* + AsRefVariadicPartialEq */
SuffixSchemaB: 'static + VariadicExt + Eq + Hash, // + AsRefVariadicPartialEq
SchemaA: 'static + VariadicExt + Eq + Hash + SplitBySuffix<ValTypeA>, /* + AsRefVariadicPartialEq */
ValTypeA: 'static + VariadicExt + Eq + Hash, // + AsRefVariadicPartialEq
SchemaB: 'static + VariadicExt + Eq + Hash + SplitBySuffix<ValTypeB>, /* + AsRefVariadicPartialEq */
ValTypeB: 'static + VariadicExt + Eq + Hash, // + AsRefVariadicPartialEq
StorageA: TupleSet<Schema = SchemaA>,
StorageB: TupleSet<Schema = SchemaB>,
StorageOut: TupleSet<Schema = var_type!(...SchemaA, ...ValTypeB)>,
for<'x> SchemaA::AsRefVar<'x>: CloneVariadic,
for<'x> SchemaB::AsRefVar<'x>: CloneVariadic,
var_type!(...SchemaA, ...SuffixSchemaB): Eq + Hash,
var_type!(...SchemaA, ...ValTypeB): Eq + Hash,
{
type DeepJoinLatticeBimorphism = GhtValTypeProductBimorphism<
GhtLeaf<
var_type!(...SchemaA, ...SuffixSchemaB),
var_type!(...SuffixSchemaA, ...SuffixSchemaB),
StorageA,
var_type!(...SchemaA, ...ValTypeB),
var_type!(...ValTypeA, ...ValTypeB),
StorageOut,
>,
>;
}
Loading

0 comments on commit ca08cc2

Please sign in to comment.