This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Multi thread with hash aggregates #6

Open · wants to merge 5 commits into base: main
32 changes: 24 additions & 8 deletions vayu-common/src/lib.rs
@@ -1,8 +1,10 @@
use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::physical_plan::aggregates::AggregateMode;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use std::sync::Arc;
pub mod store;
// use vayu::operators::aggregate::AggregateOperator;

pub trait PhysicalOperator {
fn name(&self) -> String;
@@ -24,21 +26,30 @@ pub enum SchedulerSourceType {
RecordBatchStore(i32),
}

#[derive(Clone)]
pub enum SchedulerSinkType {
// StoreRecordBatch(i32),
BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
StoreRecordBatch(i32),
// FinalAggregation(i32, AggregateOperator),
PrintOutput,
}

#[derive(Clone)]
pub enum FinalizeSinkType {
PrintFromStore(i32),
FinalAggregate(Arc<dyn ExecutionPlan>, i32),
BuildAndStoreHashMap(i32, Arc<dyn ExecutionPlan>),
}
#[derive(Clone)]
pub struct DatafusionPipeline {
pub plan: Arc<dyn ExecutionPlan>,
pub sink: Option<SchedulerSinkType>,
pub id: i32,
}
pub struct DatafusionPipelineWithSource {
pub source: Arc<dyn ExecutionPlan>,
pub plan: Arc<dyn ExecutionPlan>,
pub sink: Option<SchedulerSinkType>,
#[derive(Clone)]
pub struct SchedulerPipeline {
pub source: Option<Arc<dyn ExecutionPlan>>,
pub pipeline: DatafusionPipeline,
pub finalize: FinalizeSinkType,
}

pub struct DatafusionPipelineWithData {
@@ -55,13 +66,18 @@ pub struct VayuPipelineWithData {
pub data: RecordBatch,
}
pub struct Task {
pub pipelines: Vec<DatafusionPipelineWithSource>,
pub pipelines: Vec<SchedulerPipeline>,
}

pub enum VayuMessage {
Normal(DatafusionPipelineWithData),
Finalize((FinalizeSinkType, i32)),
}
impl Task {
pub fn new() -> Self {
Task { pipelines: vec![] }
}
pub fn add_pipeline(&mut self, pipeline: DatafusionPipelineWithSource) {
pub fn add_pipeline(&mut self, pipeline: SchedulerPipeline) {
self.pipelines.push(pipeline);
}
}
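
For orientation, here is a minimal sketch of how the new types fit together. The plan values, the sink choice, and key 1 are illustrative assumptions, not part of this diff:

use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use vayu_common::{DatafusionPipeline, FinalizeSinkType, SchedulerPipeline, SchedulerSinkType, Task};

// Hypothetical wiring of a single scan -> aggregate pipeline into a Task.
fn build_task(source: Arc<dyn ExecutionPlan>, plan: Arc<dyn ExecutionPlan>) -> Task {
    let mut task = Task::new();
    task.add_pipeline(SchedulerPipeline {
        // the scheduler drives record batches out of this source node
        source: Some(source),
        // per-batch work: run `plan`, storing each worker's partial output under key 1
        pipeline: DatafusionPipeline {
            plan: plan.clone(),
            sink: Some(SchedulerSinkType::StoreRecordBatch(1)),
            id: 1,
        },
        // once all workers are done, combine the partials stored under key 1
        finalize: FinalizeSinkType::FinalAggregate(plan, 1),
    });
    task
}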
95 changes: 54 additions & 41 deletions vayu-common/src/store.rs
@@ -1,62 +1,75 @@
use arrow::record_batch::RecordBatch;
use core::panic;
use datafusion::physical_plan::joins::hash_join::JoinLeftData;
use std::collections::HashMap;
#[derive(Clone)]
pub enum Blob {
// RecordBatchBlob(Vec<RecordBatch>),
RecordBatchBlob(RecordBatch),
HashMapBlob(JoinLeftData),
}

impl Blob {
pub fn get_map(self) -> JoinLeftData {
match self {
Blob::HashMapBlob(m) => m,
_ => panic!("error"),
}
}
// pub fn get_records(self) -> Vec<RecordBatch> {
// match self {
// Blob::RecordBatchBlob(records) => records,
// _ => panic!("error"),
// }
// }
// pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
// match self {
// Blob::RecordBatchBlob(records) => {
// // TODO: check if schema is same
// records.extend(batches)
// }
// _ => panic!("error"),
// }
// }
}
// impl Blob {
// // pub fn get_map(self) -> JoinLeftData {
// // match self {
// // Blob::HashMapBlob(m) => m,
// // _ => panic!("error"),
// // }
// // }
// pub fn get_records(self) -> Vec<RecordBatch> {
// match self {
// Blob::RecordBatchBlob(records) => records,
// _ => panic!("error"),
// }
// }
// pub fn append_records(&mut self, batches: Vec<RecordBatch>) {
// match self {
// Blob::RecordBatchBlob(records) => {
// // TODO: check if schema is same
// records.extend(batches)
// }
// _ => panic!("error"),
// }
// }
// }

// Right now this is a typedef of HashMap<i32, Vec<Blob>>,
// but we may need something else in the near future.
// The store keeps a vector of blobs per key;
// each blob is the output of one of the threads.
// The finalize step removes the vec of blobs, combines them,
// and stores the result again.

#[derive(Clone)]
pub struct Store {
store: HashMap<i32, Blob>,
pub store: HashMap<i32, Vec<Blob>>,
}
impl Store {
pub fn new() -> Store {
Store {
store: HashMap::new(),
}
}
pub fn insert(&mut self, key: i32, value: Blob) {
self.store.insert(key, value);
pub fn insert(&mut self, key: i32, value: Blob) {
// append to this key's vec, creating an empty one if absent
// (the entry API avoids a double mutable borrow of the map)
self.store.entry(key).or_insert_with(Vec::new).push(value);
}
// pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
// let blob = self.remove(key);
// let mut blob = match blob {
// Some(r) => r,
// None => Blob::RecordBatchBlob(Vec::new()),
// };
// blob.append_records(value);
// self.store.insert(key, blob);
// }
pub fn remove(&mut self, key: i32) -> Option<Blob> {
pub fn append(&mut self, key: i32, mut value: Vec<Blob>) {
let blob = self.remove(key);
let mut blob = match blob {
Some(r) => r,
None => vec![],
};
blob.append(&mut value);
self.store.insert(key, blob);
}
pub fn remove(&mut self, key: i32) -> Option<Vec<Blob>> {
self.store.remove(&key)
// let x = self.store.remove(&key).unwrap().value();
// Some(x)
}
pub fn get(&mut self, key: i32) -> Option<&Vec<Blob>> {
self.store.get(&key)
}
}
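
A small usage sketch of the new per-key Vec<Blob> semantics; key 7 and the input batches are assumed for illustration:

use arrow::record_batch::RecordBatch;
use vayu_common::store::{Blob, Store};

// Hypothetical demo: `batch_a` and `batch_b` stand in for partial results
// produced by two worker threads.
fn demo(batch_a: RecordBatch, batch_b: RecordBatch) {
    let mut store = Store::new();
    store.insert(7, Blob::RecordBatchBlob(batch_a)); // worker 0's output
    store.insert(7, Blob::RecordBatchBlob(batch_b)); // worker 1's output
    // finalize: drain every partial stored under key 7 and combine them
    let partials = store.remove(7).unwrap();
    assert_eq!(partials.len(), 2);
}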
96 changes: 89 additions & 7 deletions vayu/src/df2vayu.rs
@@ -1,12 +1,20 @@
use crate::dummy;
use crate::operators::aggregate::AggregateOperator;
use crate::operators::filter::FilterOperator;
use crate::operators::join::HashProbeOperator;
use crate::operators::projection::ProjectionOperator;
use crate::Store;
use ahash::random_state::RandomSource;
use ahash::RandomState;
use arrow::array::BooleanBufferBuilder;
use arrow::compute::kernels::concat_elements;
use datafusion::datasource::physical_plan::CsvExec;
use datafusion::datasource::physical_plan::ParquetExec;
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::aggregates::AggregateMode;
use datafusion::physical_plan::aggregates::StreamType;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::hash_join::BuildSide;
use datafusion::physical_plan::joins::hash_join::BuildSideReadyState;
@@ -20,7 +28,9 @@ use std::sync::Arc;
use vayu_common::VayuPipeline;

pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32) -> VayuPipeline {
let plan2 = plan.clone();
let p = plan.as_any();

let batch_size = 1024;
let config = SessionConfig::new().with_batch_size(batch_size);
let ctx = Arc::new(SessionContext::new_with_config(config));
@@ -34,6 +44,24 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32
sink: None,
};
}
if let Some(_) = p.downcast_ref::<ParquetExec>() {
return VayuPipeline {
operators: vec![],
sink: None,
};
}
if let Some(exec) = p.downcast_ref::<AggregateExec>() {
let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id);
// only aggregates without a group by are handled for now
if !exec.group_by().expr().is_empty() {
panic!("group by present - not handled");
}

let tt = Box::new(AggregateOperator::new(exec));
println!("adding aggregate");
pipeline.operators.push(tt);
return pipeline;
}
if let Some(exec) = p.downcast_ref::<FilterExec>() {
let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id);
let tt = Box::new(FilterOperator::new(exec.predicate().clone()));
@@ -51,25 +79,53 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32
return pipeline;
}
if let Some(exec) = p.downcast_ref::<HashJoinExec>() {
// this function will only be called for the probe side;
// the build side won't have a HashJoinExec in the make_pipeline call

// let dummy = exec.left().execute(0, context.clone());
let mut pipeline = df2vayu(exec.right().clone(), store, pipeline_id);
println!("adding hashprobe");

let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap();
println!("got joinstream");

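// Rebuild the HashJoinExec with DummyExec children that carry only the
// original children's properties, statistics, and schema; the rebuilt
// node's probe stream is patched below with the build-side hash map
// taken from the store.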
let tt = dummy::DummyExec::new(
exec.properties().clone(),
exec.statistics().unwrap(),
exec.left().schema(),
);
let tt2 = dummy::DummyExec::new(
exec.properties().clone(),
exec.statistics().unwrap(),
exec.right().schema(),
);
let x = plan2
.with_new_children(vec![Arc::new(tt), Arc::new(tt2)])
.unwrap();
let x1 = x.as_any();
let exec = if let Some(exec) = x1.downcast_ref::<HashJoinExec>() {
exec
} else {
panic!("expected HashJoinExec");
};
// using a hard-coded key (42) for now; this value should come from the
// HashProbeExec itself
// TODO: remove from the correct key
let build_map = store.remove(42).unwrap();
let left_data = Arc::new(build_map.get_map());
println!("{:?}", store.store.keys());
let mut build_map = store.remove(42).unwrap();
// put a clone back so the build side stays available in the store
store.append(42, build_map.clone());
let map = build_map.remove(0);
let build_map = match map {
vayu_common::store::Blob::HashMapBlob(map) => map,
_ => panic!("expected HashMapBlob"),
};
let left_data = Arc::new(build_map);
let visited_left_side = BooleanBufferBuilder::new(0);

let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap();
hashjoinstream.build_side = BuildSide::Ready(BuildSideReadyState {
left_data,
visited_left_side,
});
println!("got joinstream");
let tt = Box::new(HashProbeOperator::new(hashjoinstream));
pipeline.operators.push(tt);
return pipeline;
@@ -80,6 +136,9 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32
if let Some(exec) = p.downcast_ref::<CoalesceBatchesExec>() {
return df2vayu(exec.input().clone(), store, pipeline_id);
}
if let Some(exec) = p.downcast_ref::<CoalescePartitionsExec>() {
return df2vayu(exec.input().clone(), store, pipeline_id);
}
panic!("should never reach the end");
}

@@ -128,6 +187,15 @@ pub fn get_source_node(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
if let Some(_) = p.downcast_ref::<CsvExec>() {
return plan;
}
if let Some(_) = p.downcast_ref::<ParquetExec>() {
return plan;
}
if let Some(exec) = p.downcast_ref::<AggregateExec>() {
return get_source_node(exec.input().clone());
}
if let Some(exec) = p.downcast_ref::<CoalescePartitionsExec>() {
return get_source_node(exec.input().clone());
}
if let Some(exec) = p.downcast_ref::<FilterExec>() {
return get_source_node(exec.input().clone());
}
@@ -140,5 +208,19 @@ pub fn get_source_node(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
if let Some(exec) = p.downcast_ref::<CoalesceBatchesExec>() {
return get_source_node(exec.input().clone());
}
panic!("No join node found");
panic!("No source node found");
}

pub fn aggregate(exec: Arc<dyn ExecutionPlan>) -> AggregateOperator {
let p = exec.as_any();
let final_aggregate = if let Some(exec) = p.downcast_ref::<AggregateExec>() {
// only aggregates without a group by are handled for now
if !exec.group_by().expr().is_empty() {
panic!("group by present - not handled");
}
AggregateOperator::new(exec)
} else {
panic!("not an aggregate");
};
final_aggregate
}
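
Both df2vayu and get_source_node use the same dispatch idiom: downcast the dyn ExecutionPlan to a concrete node type, then either stop at a source or recurse into the node's input. A reduced sketch of that walk, assuming only CSV scans and filters for brevity:

use std::sync::Arc;
use datafusion::datasource::physical_plan::CsvExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;

// Reduced sketch of the recursive downcast walk; the real functions above
// handle many more node types (Parquet, aggregates, joins, coalesce, ...).
fn find_source(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    let p = plan.as_any();
    if p.downcast_ref::<CsvExec>().is_some() {
        // leaf scan: this is the pipeline's source
        return plan;
    }
    if let Some(exec) = p.downcast_ref::<FilterExec>() {
        // unary operator: keep walking down its input
        return find_source(exec.input().clone());
    }
    panic!("No source node found");
}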