This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit: multithread working
yashkothari42 committed Apr 3, 2024
1 parent 68a0f63 commit 0ac48ed
Showing 12 changed files with 172 additions and 157 deletions.
Binary file modified .DS_Store
9 changes: 4 additions & 5 deletions vayu-common/src/lib.rs
@@ -30,13 +30,13 @@ pub enum SchedulerSinkType {
PrintOutput,
}

pub struct DatafusionPipelineWithSource {
pub source: Arc<dyn ExecutionPlan>,
pub struct DatafusionPipeline {
pub plan: Arc<dyn ExecutionPlan>,
pub sink: Option<SchedulerSinkType>,
pub id: i32,
}

pub struct DatafusionPipeline {
pub struct DatafusionPipelineWithSource {
pub source: Arc<dyn ExecutionPlan>,
pub plan: Arc<dyn ExecutionPlan>,
pub sink: Option<SchedulerSinkType>,
}
@@ -45,7 +45,6 @@ pub struct DatafusionPipelineWithData {
pub pipeline: DatafusionPipeline,
pub data: RecordBatch,
}

pub struct VayuPipeline {
pub operators: Vec<Box<dyn IntermediateOperator>>,
pub sink: Option<SchedulerSinkType>,
36 changes: 1 addition & 35 deletions vayu-common/src/store.rs
@@ -1,12 +1,6 @@
use arrow::array::RecordBatch;
use crossbeam_skiplist::SkipMap;
use crossbeam_utils::thread::scope;
use datafusion::physical_plan::joins::hash_join::JoinLeftData;

use core::panic;
use datafusion::physical_plan::joins::hash_join::JoinLeftData;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
pub enum Blob {
// RecordBatchBlob(Vec<RecordBatch>),
HashMapBlob(JoinLeftData),
@@ -66,31 +60,3 @@ impl Store {
// Some(x)
}
}
pub struct Store1 {
store: SkipMap<i32, i32>,
}
impl Store1 {
pub fn new() -> Self {
Store1 {
store: SkipMap::new(),
}
}
pub fn insert(&mut self, key: i32, value: i32) {
self.store.insert(key, value);
}
// pub fn append(&mut self, key: i32, value: Vec<RecordBatch>) {
// let blob = self.remove(key);
// let mut blob = match blob {
// Some(r) => r,
// None => Blob::RecordBatchBlob(Vec::new()),
// };
// blob.append_records(value);
// self.store.insert(key, blob);
// }
// pub fn remove(&mut self, key: i32) -> &Option<Arc<Blob>> {
// let value = self.store.remove(&key).unwrap().value();
// Some(value.un)
// // let x = self.store.remove(&key).unwrap().value();
// // Some(x)
// }
}
28 changes: 16 additions & 12 deletions vayu/src/df2vayu.rs
@@ -2,6 +2,8 @@ use crate::operators::filter::FilterOperator;
use crate::operators::join::HashProbeOperator;
use crate::operators::projection::ProjectionOperator;
use crate::Store;
use ahash::random_state::RandomSource;
use ahash::RandomState;
use arrow::array::BooleanBufferBuilder;
use datafusion::datasource::physical_plan::CsvExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
@@ -17,7 +19,7 @@ use datafusion::prelude::SessionContext;
use std::sync::Arc;
use vayu_common::VayuPipeline;

pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store) -> VayuPipeline {
pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store, pipeline_id: i32) -> VayuPipeline {
let p = plan.as_any();
let batch_size = 1024;
let config = SessionConfig::new().with_batch_size(batch_size);
@@ -33,14 +35,14 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store) -> VayuPipeline
};
}
if let Some(exec) = p.downcast_ref::<FilterExec>() {
let mut pipeline = df2vayu(exec.input().clone(), store);
let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id);
let tt = Box::new(FilterOperator::new(exec.predicate().clone()));
println!("adding filter");
pipeline.operators.push(tt);
return pipeline;
}
if let Some(exec) = p.downcast_ref::<ProjectionExec>() {
let mut pipeline = df2vayu(exec.input().clone(), store);
let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id);
println!("adding projection");
let expr = exec.expr().iter().map(|x| x.0.clone()).collect();
let schema = exec.schema().clone();
@@ -53,29 +55,30 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store) -> VayuPipeline
// build side won't have a HashJoinExec in the make_pipeline call

// let dummy = exec.left().execute(0, context.clone());
let mut pipeline = df2vayu(exec.right().clone(), store);
let mut pipeline = df2vayu(exec.right().clone(), store, pipeline_id);
println!("adding hashprobe");

let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap();
println!("got joinstream");

// using uuid but this value would be present in HashProbeExec itself
let build_map = store.remove(1).unwrap();
// TODO: remove from the correct key
let build_map = store.remove(42).unwrap();
let left_data = Arc::new(build_map.get_map());
let visited_left_side = BooleanBufferBuilder::new(0);
hashjoinstream.build_side = BuildSide::Ready(BuildSideReadyState {
left_data,
visited_left_side,
});
let tt = Box::new(HashProbeOperator::new(1, hashjoinstream));
let tt = Box::new(HashProbeOperator::new(hashjoinstream));
pipeline.operators.push(tt);
return pipeline;
}
if let Some(exec) = p.downcast_ref::<RepartitionExec>() {
return df2vayu(exec.input().clone(), store);
return df2vayu(exec.input().clone(), store, pipeline_id);
}
if let Some(exec) = p.downcast_ref::<CoalesceBatchesExec>() {
return df2vayu(exec.input().clone(), store);
return df2vayu(exec.input().clone(), store, pipeline_id);
}
panic!("should never reach the end");
}
@@ -86,6 +89,7 @@ pub fn df2vayu(plan: Arc<dyn ExecutionPlan>, store: &mut Store) -> VayuPipeline
*/
pub fn get_hash_build_pipeline(
plan: Arc<dyn ExecutionPlan>,
uuid: i32,
) -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
let plan1 = plan.clone();
let p = plan.as_any();
@@ -97,16 +101,16 @@ pub fn get_hash_build_pipeline(
panic!("should never reach csvexec in get_hash_build_pipeline ");
}
if let Some(exec) = p.downcast_ref::<FilterExec>() {
return get_hash_build_pipeline(exec.input().clone());
return get_hash_build_pipeline(exec.input().clone(), uuid);
}
if let Some(exec) = p.downcast_ref::<ProjectionExec>() {
return get_hash_build_pipeline(exec.input().clone());
return get_hash_build_pipeline(exec.input().clone(), uuid);
}
if let Some(exec) = p.downcast_ref::<RepartitionExec>() {
return get_hash_build_pipeline(exec.input().clone());
return get_hash_build_pipeline(exec.input().clone(), uuid);
}
if let Some(exec) = p.downcast_ref::<CoalesceBatchesExec>() {
return get_hash_build_pipeline(exec.input().clone());
return get_hash_build_pipeline(exec.input().clone(), uuid);
}
panic!("No join node found");
}
4 changes: 2 additions & 2 deletions vayu/src/lib.rs
@@ -3,7 +3,6 @@ use arrow::util::pretty;
use vayu_common::DatafusionPipelineWithData;
use vayu_common::VayuPipeline;
pub mod operators;
pub mod pipeline;
use std::sync::{Arc, Mutex};

pub mod sinks;
@@ -55,7 +54,8 @@ impl VayuExecutionEngine {
let sink = pipeline.pipeline.sink;

let mut store = self.global_store.lock().unwrap();
let mut pipeline: VayuPipeline = df2vayu::df2vayu(pipeline.pipeline.plan, &mut store);
let mut pipeline: VayuPipeline =
df2vayu::df2vayu(pipeline.pipeline.plan, &mut store, pipeline.pipeline.id);
drop(store);

pipeline.sink = sink;
11 changes: 5 additions & 6 deletions vayu/src/operators/join.rs
@@ -8,14 +8,10 @@ use vayu_common::{IntermediateOperator, PhysicalOperator};

pub struct HashProbeOperator {
probe: HashJoinStream,
build_map: Option<JoinLeftData>,
}
impl HashProbeOperator {
pub fn new(build_uuid: i32, probe: HashJoinStream) -> Self {
Self {
probe,
build_map: None,
}
pub fn new(probe: HashJoinStream) -> Self {
Self { probe }
}
}

@@ -33,6 +29,9 @@ impl IntermediateOperator for HashProbeOperator {
&random_state,
)?;
probe.hashes_buffer = hashes_buffer.clone();

// build side has already been added in df2vayu
// TODO: move that code in here
probe.state = HashJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState {
batch: input.clone(),
offset: (0, None),
1 change: 0 additions & 1 deletion vayu/src/pipeline.rs

This file was deleted.

46 changes: 24 additions & 22 deletions vayuDB/README.md
@@ -6,36 +6,38 @@ This crate has a basic implementation of a database using the vayu execution engine.

## Common Vayu Structures
```
pub enum SchedulerSinkType {
    PrintOutput,
}
pub struct VayuPipeline {
    pub operators: Vec<Box<dyn IntermediateOperator>>,
    pub sink: Option<SchedulerSinkType>,
}
pub struct Pipeline {
    pub source: Option<SendableRecordBatchStream>,
    pub vayu_pipeline: VayuPipeline,
}
pub struct Task {
    pub pipelines: Vec<Pipeline>,
}
pub struct DatafusionPipelineWithSource {
    pub source: Arc<dyn ExecutionPlan>,
    pub plan: Arc<dyn ExecutionPlan>,
    pub sink: SchedulerSinkType,
}
```
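For orientation, building one of these pipelines could look like the sketch below — a minimal illustration that follows the `vayu-common/src/lib.rs` definitions from the diff above (the helper name is hypothetical, and the `source`/`plan` arguments are assumed to come from DataFusion's physical planner):

```
use std::sync::Arc;

use datafusion::physical_plan::ExecutionPlan;
use vayu_common::{DatafusionPipelineWithSource, SchedulerSinkType};

// Hypothetical helper: wrap a DataFusion plan into a pipeline the scheduler can
// hand out. `source` is the leaf scan node the IO engine will execute and
// `plan` is the full operator tree that df2vayu later translates.
fn print_pipeline(
    source: Arc<dyn ExecutionPlan>,
    plan: Arc<dyn ExecutionPlan>,
) -> DatafusionPipelineWithSource {
    DatafusionPipelineWithSource {
        source,
        plan,
        sink: Some(SchedulerSinkType::PrintOutput),
    }
}
```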


## How to add a custom scheduler/io_engine ?
1. Scheduler
```
pub fn new() -> Self;
pub fn get_task(&mut self) -> Poll<vayu_common::Task>;
pub fn new() -> Self
pub fn get_pipeline(&mut self) -> Poll<vayu_common::DatafusionPipelineWithSource>
pub fn ack_pipeline(&mut self, pipeline_id: i32);
```
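A trivial scheduler satisfying this interface could look like the sketch below (a minimal FIFO queue; the `submit` helper and the id bookkeeping are assumptions for illustration, not part of the crate):

```
use std::collections::VecDeque;
use std::task::Poll;

use vayu_common::DatafusionPipelineWithSource;

// Minimal FIFO scheduler sketch: pipelines are queued by the planner and
// handed to the execution engine in submission order.
pub struct FifoScheduler {
    ready: VecDeque<DatafusionPipelineWithSource>,
    next_id: i32,
}

impl FifoScheduler {
    pub fn new() -> Self {
        Self {
            ready: VecDeque::new(),
            next_id: 0,
        }
    }

    // Hypothetical planner-side helper: enqueue a pipeline and return its id.
    pub fn submit(&mut self, pipeline: DatafusionPipelineWithSource) -> i32 {
        let id = self.next_id;
        self.next_id += 1;
        self.ready.push_back(pipeline);
        id
    }

    // Hand the next runnable pipeline to the execution engine, if any.
    pub fn get_pipeline(&mut self) -> Poll<DatafusionPipelineWithSource> {
        match self.ready.pop_front() {
            Some(pipeline) => Poll::Ready(pipeline),
            None => Poll::Pending,
        }
    }

    // Called by the main loop once pipeline `pipeline_id` has finished.
    pub fn ack_pipeline(&mut self, pipeline_id: i32) {
        println!("pipeline {pipeline_id} finished");
    }
}
```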

2. IO
IO takes a SendableRecordBatchStream as input and returns a RecordBatch.
Note: for now, each source operator has to return its data in a single RecordBatch. If polling again returns Some(RecordBatch), the database will panic.
IO takes an ExecutionPlan as input and returns a RecordBatch.
Note: for now, each source operator has to return its data in a single RecordBatch.

```
pub fn new() -> Self;
pub fn submit_request(&mut self, stream: SendableRecordBatchStream) -> i32;
pub fn poll_response(&mut self) -> Poll<(i32, RecordBatch)>;
```
```
pub fn new() -> Self
pub fn submit_request(&mut self, source: Arc<dyn ExecutionPlan>) -> i32
pub fn poll_response(&mut self) -> Poll<(i32, RecordBatch)>
```
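Because each source currently has to come back as exactly one RecordBatch, a custom IO engine that reads several batches can coalesce them before responding. A minimal sketch, assuming at least one batch was produced (the helper name is hypothetical):

```
use arrow::compute::concat_batches;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;

// Hypothetical helper: collapse every batch read from one source into the
// single RecordBatch that the execution engine currently expects.
fn coalesce_into_one(batches: &[RecordBatch]) -> Result<RecordBatch> {
    // Batches from one source share a schema, so the first one's schema is used.
    let schema = batches[0].schema();
    Ok(concat_batches(&schema, batches)?)
}
```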

The rest is handled by the vayu execution engine.

TODO:
- [ ] make each worker and the main thread run on a separate CPU core
- [ ] script which checks that only the assigned processes are running on their cores
- [ ] script to check context switches and system calls from each thread (perf)
- [ ] support multiple pipeline breakers in one task, e.g. multiple joins
- [ ] benchmarking
- [ ] tests
17 changes: 10 additions & 7 deletions vayuDB/src/dummy_tasks.rs
@@ -1,10 +1,13 @@
use core::panic;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::CsvReadOptions;
use datafusion::prelude::SessionContext;
use std::sync::Arc;
use vayu::df2vayu;
use vayu::operators::join;
use vayu_common::DatafusionPipelineWithSource;
use vayu_common::Task;

@@ -49,27 +52,27 @@ pub async fn test_hash_join() -> Result<Task> {
CsvReadOptions::new(),
)
.await?;
// get executor

let uuid = 1;
// get the execution plan from the SQL query
let sql = "SELECT * FROM a,b WHERE a.a1 = b.b1 ";
let plan = get_execution_plan_from_sql(&ctx, sql).await?;
let mut task = Task::new();

let (join_node, build_plan) = df2vayu::get_hash_build_pipeline(plan.clone());
let uuid = 42;
let (join_node, build_plan) = df2vayu::get_hash_build_pipeline(plan.clone(), uuid);

let build_source_pipeline = df2vayu::get_source_node(build_plan.clone());
let sink = vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node);
let build_pipeline = DatafusionPipelineWithSource {
source: build_source_pipeline,
plan: build_plan,
sink: Some(vayu_common::SchedulerSinkType::BuildAndStoreHashMap(
1, join_node,
)),
sink: Some(sink),
};
task.add_pipeline(build_pipeline);

// TODO: set this uuid in probe also
let probe_plan = plan.clone();
let probe_source_node = df2vayu::get_source_node(probe_plan.clone());

let probe_pipeline = DatafusionPipelineWithSource {
source: probe_source_node,
plan: probe_plan,
7 changes: 5 additions & 2 deletions vayuDB/src/io_service.rs
@@ -16,14 +16,17 @@ impl IOService {
uuid: 0,
}
}
pub fn submit_request(&mut self, plan: Arc<dyn ExecutionPlan>) -> i32 {
pub fn submit_request(&mut self, source: Arc<dyn ExecutionPlan>) -> i32 {
let context = SessionContext::new().task_ctx();
let stream = plan.execute(0, context).unwrap();
let stream = source.execute(0, context).unwrap();
self.stream = Some(stream);
self.uuid = 1;
self.uuid
}
pub fn poll_response(&mut self) -> Poll<(i32, RecordBatch)> {
if self.stream.is_none() {
return Poll::Pending;
}
let stream = self.stream.take();

let data = futures::executor::block_on(stream.unwrap().next())