diff --git a/.DS_Store b/.DS_Store index d8f70d2..5757afe 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/vayu-common/src/lib.rs b/vayu-common/src/lib.rs index 25aba60..d7370c8 100644 --- a/vayu-common/src/lib.rs +++ b/vayu-common/src/lib.rs @@ -30,13 +30,13 @@ pub enum SchedulerSinkType { PrintOutput, } -pub struct DatafusionPipelineWithSource { - pub source: Arc, +pub struct DatafusionPipeline { pub plan: Arc, pub sink: Option, + pub id: i32, } - -pub struct DatafusionPipeline { +pub struct DatafusionPipelineWithSource { + pub source: Arc, pub plan: Arc, pub sink: Option, } @@ -45,7 +45,6 @@ pub struct DatafusionPipelineWithData { pub pipeline: DatafusionPipeline, pub data: RecordBatch, } - pub struct VayuPipeline { pub operators: Vec>, pub sink: Option, diff --git a/vayu-common/src/store.rs b/vayu-common/src/store.rs index f65917e..ff3b7a0 100644 --- a/vayu-common/src/store.rs +++ b/vayu-common/src/store.rs @@ -1,12 +1,6 @@ -use arrow::array::RecordBatch; -use crossbeam_skiplist::SkipMap; -use crossbeam_utils::thread::scope; -use datafusion::physical_plan::joins::hash_join::JoinLeftData; - use core::panic; +use datafusion::physical_plan::joins::hash_join::JoinLeftData; use std::collections::HashMap; -use std::hash::Hash; -use std::sync::Arc; pub enum Blob { // RecordBatchBlob(Vec), HashMapBlob(JoinLeftData), @@ -66,31 +60,3 @@ impl Store { // Some(x) } } -pub struct Store1 { - store: SkipMap, -} -impl Store1 { - pub fn new() -> Self { - Store1 { - store: SkipMap::new(), - } - } - pub fn insert(&mut self, key: i32, value: i32) { - self.store.insert(key, value); - } - // pub fn append(&mut self, key: i32, value: Vec) { - // let blob = self.remove(key); - // let mut blob = match blob { - // Some(r) => r, - // None => Blob::RecordBatchBlob(Vec::new()), - // }; - // blob.append_records(value); - // self.store.insert(key, blob); - // } - // pub fn remove(&mut self, key: i32) -> &Option> { - // let value = self.store.remove(&key).unwrap().value(); - // Some(value.un) - // // let x = self.store.remove(&key).unwrap().value(); - // // Some(x) - // } -} diff --git a/vayu/src/df2vayu.rs b/vayu/src/df2vayu.rs index 6270dce..f80613a 100644 --- a/vayu/src/df2vayu.rs +++ b/vayu/src/df2vayu.rs @@ -2,6 +2,8 @@ use crate::operators::filter::FilterOperator; use crate::operators::join::HashProbeOperator; use crate::operators::projection::ProjectionOperator; use crate::Store; +use ahash::random_state::RandomSource; +use ahash::RandomState; use arrow::array::BooleanBufferBuilder; use datafusion::datasource::physical_plan::CsvExec; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; @@ -17,7 +19,7 @@ use datafusion::prelude::SessionContext; use std::sync::Arc; use vayu_common::VayuPipeline; -pub fn df2vayu(plan: Arc, store: &mut Store) -> VayuPipeline { +pub fn df2vayu(plan: Arc, store: &mut Store, pipeline_id: i32) -> VayuPipeline { let p = plan.as_any(); let batch_size = 1024; let config = SessionConfig::new().with_batch_size(batch_size); @@ -33,14 +35,14 @@ pub fn df2vayu(plan: Arc, store: &mut Store) -> VayuPipeline }; } if let Some(exec) = p.downcast_ref::() { - let mut pipeline = df2vayu(exec.input().clone(), store); + let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id); let tt = Box::new(FilterOperator::new(exec.predicate().clone())); println!("adding filter"); pipeline.operators.push(tt); return pipeline; } if let Some(exec) = p.downcast_ref::() { - let mut pipeline = df2vayu(exec.input().clone(), store); + let mut pipeline = df2vayu(exec.input().clone(), store, pipeline_id); println!("adding projection"); let expr = exec.expr().iter().map(|x| x.0.clone()).collect(); let schema = exec.schema().clone(); @@ -53,29 +55,30 @@ pub fn df2vayu(plan: Arc, store: &mut Store) -> VayuPipeline // build side wont have hashjoinexec in make_pipeline call // let dummy = exec.left().execute(0, context.clone()); - let mut pipeline = df2vayu(exec.right().clone(), store); + let mut pipeline = df2vayu(exec.right().clone(), store, pipeline_id); println!("adding hashprobe"); let mut hashjoinstream = exec.get_hash_join_stream(0, context).unwrap(); println!("got joinstream"); // using uuid but this value would be present in HashProbeExec itself - let build_map = store.remove(1).unwrap(); + // TODO: remove from the correct key + let build_map = store.remove(42).unwrap(); let left_data = Arc::new(build_map.get_map()); let visited_left_side = BooleanBufferBuilder::new(0); hashjoinstream.build_side = BuildSide::Ready(BuildSideReadyState { left_data, visited_left_side, }); - let tt = Box::new(HashProbeOperator::new(1, hashjoinstream)); + let tt = Box::new(HashProbeOperator::new(hashjoinstream)); pipeline.operators.push(tt); return pipeline; } if let Some(exec) = p.downcast_ref::() { - return df2vayu(exec.input().clone(), store); + return df2vayu(exec.input().clone(), store, pipeline_id); } if let Some(exec) = p.downcast_ref::() { - return df2vayu(exec.input().clone(), store); + return df2vayu(exec.input().clone(), store, pipeline_id); } panic!("should never reach the end"); } @@ -86,6 +89,7 @@ pub fn df2vayu(plan: Arc, store: &mut Store) -> VayuPipeline */ pub fn get_hash_build_pipeline( plan: Arc, + uuid: i32, ) -> (Arc, Arc) { let plan1 = plan.clone(); let p = plan.as_any(); @@ -97,16 +101,16 @@ pub fn get_hash_build_pipeline( panic!("should never reach csvexec in get_hash_build_pipeline "); } if let Some(exec) = p.downcast_ref::() { - return get_hash_build_pipeline(exec.input().clone()); + return get_hash_build_pipeline(exec.input().clone(), uuid); } if let Some(exec) = p.downcast_ref::() { - return get_hash_build_pipeline(exec.input().clone()); + return get_hash_build_pipeline(exec.input().clone(), uuid); } if let Some(exec) = p.downcast_ref::() { - return get_hash_build_pipeline(exec.input().clone()); + return get_hash_build_pipeline(exec.input().clone(), uuid); } if let Some(exec) = p.downcast_ref::() { - return get_hash_build_pipeline(exec.input().clone()); + return get_hash_build_pipeline(exec.input().clone(), uuid); } panic!("No join node found"); } diff --git a/vayu/src/lib.rs b/vayu/src/lib.rs index 6eae71c..0fa494d 100644 --- a/vayu/src/lib.rs +++ b/vayu/src/lib.rs @@ -3,7 +3,6 @@ use arrow::util::pretty; use vayu_common::DatafusionPipelineWithData; use vayu_common::VayuPipeline; pub mod operators; -pub mod pipeline; use std::sync::{Arc, Mutex}; pub mod sinks; @@ -55,7 +54,8 @@ impl VayuExecutionEngine { let sink = pipeline.pipeline.sink; let mut store = self.global_store.lock().unwrap(); - let mut pipeline: VayuPipeline = df2vayu::df2vayu(pipeline.pipeline.plan, &mut store); + let mut pipeline: VayuPipeline = + df2vayu::df2vayu(pipeline.pipeline.plan, &mut store, pipeline.pipeline.id); drop(store); pipeline.sink = sink; diff --git a/vayu/src/operators/join.rs b/vayu/src/operators/join.rs index 559aeae..8e55c3d 100644 --- a/vayu/src/operators/join.rs +++ b/vayu/src/operators/join.rs @@ -8,14 +8,10 @@ use vayu_common::{IntermediateOperator, PhysicalOperator}; pub struct HashProbeOperator { probe: HashJoinStream, - build_map: Option, } impl HashProbeOperator { - pub fn new(build_uuid: i32, probe: HashJoinStream) -> Self { - Self { - probe, - build_map: None, - } + pub fn new(probe: HashJoinStream) -> Self { + Self { probe } } } @@ -33,6 +29,9 @@ impl IntermediateOperator for HashProbeOperator { &random_state, )?; probe.hashes_buffer = hashes_buffer.clone(); + + // build side has already been added in df2vayu + // TODO: move that code in here probe.state = HashJoinStreamState::ProcessProbeBatch(ProcessProbeBatchState { batch: input.clone(), offset: (0, None), diff --git a/vayu/src/pipeline.rs b/vayu/src/pipeline.rs deleted file mode 100644 index 8b13789..0000000 --- a/vayu/src/pipeline.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/vayuDB/README.md b/vayuDB/README.md index 9e10f34..190d255 100644 --- a/vayuDB/README.md +++ b/vayuDB/README.md @@ -6,20 +6,10 @@ This crates has basic implementation of a database using vayu execution engine. ## Common Vayu Structures ``` -pub enum SchedulerSinkType { - PrintOutput, -} -pub struct VayuPipeline { - pub operators: Vec>, - pub sink: Option, -} -pub struct Pipeline { - pub source: Option, - pub vayu_pipeline: VayuPipeline, -} - -pub struct Task { - pub pipelines: Vec, +pub struct DatafusionPipelineWithSource { + pub source: Arc, + pub plan: Arc, + pub sink: SchedulerSinkType, } ``` @@ -27,15 +17,27 @@ pub struct Task { ## How to add a custom scheduler/io_engine ? 1. Scheduler ``` -pub fn new() -> Self; -pub fn get_task(&mut self) -> Poll; +pub fn new() -> Self +pub fn get_pipeline(&mut self) -> Poll +pub fn ack_pipeline(&mut self, pipeline_id: i32); ``` + 2. IO - IO takes an SendableRecordBatchStream as an input and returns RecordBatch. - Note: for now each source operator has to return data in one Record batch only. If polling again returns Some(RecordBatch), database would panic. + IO takes an ExecutionPlan as an input and returns RecordBatch. + Note: for now each source operator has to return data in one Record batch only. ``` -pub fn new() -> Self; -pub fn submit_request(&mut self, stream: SendableRecordBatchStream) -> i32; -pub fn poll_response(&mut self) -> Poll<(i32, RecordBatch)>; -``` \ No newline at end of file +pub fn new() -> Self +pub fn submit_request(&mut self, source: Arc) -> i32 +pub fn poll_response(&mut self) -> Poll<(i32, RecordBatch)> +``` + +rest would be done by vayu execution engine + +TODO: +[] make each worker and main thread run on separate CPU +[] script which checks if only those processes are running on assigned cores +[] script to check context switch and system calls from each threads (perf) +[] support to handler multiple pipeline breakers in one task for eg multiple joins +[] benchmarking +[] tests \ No newline at end of file diff --git a/vayuDB/src/dummy_tasks.rs b/vayuDB/src/dummy_tasks.rs index afcccaf..cb51013 100644 --- a/vayuDB/src/dummy_tasks.rs +++ b/vayuDB/src/dummy_tasks.rs @@ -1,10 +1,13 @@ +use core::panic; use datafusion::error::Result; use datafusion::execution::context::SessionState; +use datafusion::physical_plan::joins::HashJoinExec; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::CsvReadOptions; use datafusion::prelude::SessionContext; use std::sync::Arc; use vayu::df2vayu; +use vayu::operators::join; use vayu_common::DatafusionPipelineWithSource; use vayu_common::Task; @@ -49,27 +52,27 @@ pub async fn test_hash_join() -> Result { CsvReadOptions::new(), ) .await?; - // get executor - let uuid = 1; // get execution plan from th sql query let sql = "SELECT * FROM a,b WHERE a.a1 = b.b1 "; let plan = get_execution_plan_from_sql(&ctx, sql).await?; let mut task = Task::new(); - let (join_node, build_plan) = df2vayu::get_hash_build_pipeline(plan.clone()); + let uuid = 42; + let (join_node, build_plan) = df2vayu::get_hash_build_pipeline(plan.clone(), uuid); + let build_source_pipeline = df2vayu::get_source_node(build_plan.clone()); + let sink = vayu_common::SchedulerSinkType::BuildAndStoreHashMap(uuid, join_node); let build_pipeline = DatafusionPipelineWithSource { source: build_source_pipeline, plan: build_plan, - sink: Some(vayu_common::SchedulerSinkType::BuildAndStoreHashMap( - 1, join_node, - )), + sink: Some(sink), }; task.add_pipeline(build_pipeline); - + // TODO: set this uuid in probe also let probe_plan = plan.clone(); let probe_source_node = df2vayu::get_source_node(probe_plan.clone()); + let probe_pipeline = DatafusionPipelineWithSource { source: probe_source_node, plan: probe_plan, diff --git a/vayuDB/src/io_service.rs b/vayuDB/src/io_service.rs index fc2227c..e03db11 100644 --- a/vayuDB/src/io_service.rs +++ b/vayuDB/src/io_service.rs @@ -16,14 +16,17 @@ impl IOService { uuid: 0, } } - pub fn submit_request(&mut self, plan: Arc) -> i32 { + pub fn submit_request(&mut self, source: Arc) -> i32 { let context = SessionContext::new().task_ctx(); - let stream = plan.execute(0, context).unwrap(); + let stream = source.execute(0, context).unwrap(); self.stream = Some(stream); self.uuid = 1; self.uuid } pub fn poll_response(&mut self) -> Poll<(i32, RecordBatch)> { + if self.stream.is_none() { + return Poll::Pending; + } let stream = self.stream.take(); let data = futures::executor::block_on(stream.unwrap().next()) diff --git a/vayuDB/src/main.rs b/vayuDB/src/main.rs index 9eef091..6dac326 100644 --- a/vayuDB/src/main.rs +++ b/vayuDB/src/main.rs @@ -1,5 +1,4 @@ use crossbeam_channel::{bounded, Receiver, Sender}; -use futures::stream::Skip; use std::collections::HashMap; use std::task::Poll; use std::thread; @@ -7,30 +6,25 @@ use vayu_common::{DatafusionPipeline, DatafusionPipelineWithData}; mod dummy_tasks; mod io_service; mod scheduler; -use crossbeam_skiplist::SkipMap; -use crossbeam_utils::thread::scope; -use lockfree; -use std::sync::mpsc; +use std::collections::LinkedList; use std::sync::Arc; use std::sync::Mutex; use vayu_common; + use vayu_common::store::Store; -fn round_robin(worker_id: usize, num_threads: usize) -> usize { - (worker_id + 1) % num_threads -} fn start_worker( receiver: Receiver, - sender: Sender, + sender: Sender<(usize, i32)>, global_store: Arc>, + thread_id: usize, ) { - // TODO: set cpu affinity let mut executor = vayu::VayuExecutionEngine::new(global_store); // Receive structs sent over the channel - sender.send(0).unwrap(); while let Ok(pipeline) = receiver.recv() { - println!("got a pipeline for the thread, executing ..."); + let pipeline_id = pipeline.pipeline.id; + println!("{thread_id}:got a pipeline for the thread, executing ..."); executor.execute(pipeline); - sender.send(0).unwrap(); + sender.send((thread_id, pipeline_id)).unwrap(); } } @@ -49,10 +43,12 @@ fn main() { // this MPSC queue would be used by workers to inform - // 1. pipeline has finished // 2. request for new work - let (informer_sender, informer_receiver): (Sender, Receiver) = bounded(0); + let (informer_sender, informer_receiver): (Sender<(usize, i32)>, Receiver<(usize, i32)>) = + bounded(0); // vector to store main_thread->worker channels let mut senders: Vec> = Vec::new(); + let mut free_threads: LinkedList = LinkedList::new(); for thread_num in 0..num_threads { // channel to send pipeline and data from main thread the to worker thread @@ -67,8 +63,15 @@ fn main() { // start worker thread which will keep looking for new entries in the channel thread::spawn(move || { - start_worker(receiver, informer_sender_clone, global_store_clone); + // TODO: set cpu affinity + start_worker( + receiver, + informer_sender_clone, + global_store_clone, + thread_num, + ); }); + free_threads.push_back(thread_num); } println!("total number of workers {}", senders.len()); @@ -77,58 +80,63 @@ fn main() { let mut io_service = io_service::IOService::new(); // TODO: create task_queue - buffer tasks - let mut worker_id = 0; let mut request_pipeline_map: HashMap = HashMap::new(); // right now a pipeline would be assigned to a worker only when it is free // but we will poll some extra pipelines from the scheduler and send it to the io service // so that we can start working on it once any worker is free - let mut non_assigned_pipelines = 0; + let mut next_id = 0; loop { // poll scheduler for a new task - if non_assigned_pipelines < 10 { - let pipeline = scheduler.get_pipeline(); - if let Poll::Ready(pipeline) = pipeline { - non_assigned_pipelines += 1; - // TODO: add support for multiple dependent pipeline - println!("got a pipeline from scheduler"); - assert!(pipeline.sink.is_some()); - - // submit the source request to io service - let request_num = io_service.submit_request(pipeline.source); - println!("sent the request to the io_service"); - - // insert the pipeline into the local map - request_pipeline_map.insert( - request_num, - DatafusionPipeline { - plan: pipeline.plan, - sink: pipeline.sink, - }, - ); + let pipeline = scheduler.get_pipeline(next_id); + if let Poll::Ready(pipeline) = pipeline { + // TODO: add support for multiple dependent pipeline + println!("got a pipeline from scheduler"); + + // submit the source request to io service + let request_num = io_service.submit_request(pipeline.source); + println!("sent the request to the io_service"); + + // insert the pipeline into the local map + request_pipeline_map.insert( + request_num, + DatafusionPipeline { + plan: pipeline.plan, + sink: pipeline.sink, + id: next_id, + }, + ); + next_id += 1; + } + + if let Ok((thread_id, finished_pipeline_id)) = informer_receiver.try_recv() { + if finished_pipeline_id != -1 { + scheduler.ack_pipeline(finished_pipeline_id); } + // add in the queue + free_threads.push_back(thread_id); } - if let Ok(value) = informer_receiver.recv() { + if let Some(&thread_id) = free_threads.front() { // poll io_service for a response let response = io_service.poll_response(); if let Poll::Ready((request_num, data)) = response { + free_threads.pop_front(); println!("got a response from the io_service"); // TODO: handle when a source gives multiple record batches // get the pipeline from the local map - let pipeline = request_pipeline_map.remove(&request_num).unwrap(); + let pipeline = request_pipeline_map.remove(&request_num); + assert!(pipeline.is_some()); + let pipeline = pipeline.unwrap(); // send over channel let msg = DatafusionPipelineWithData { pipeline, data }; - senders[worker_id].send(msg).expect("Failed to send struct"); + senders[thread_id].send(msg).expect("Failed to send struct"); println!("sent the pipeline and the data to the worker"); - non_assigned_pipelines -= 1; // assign the next pipeline to some other worker - worker_id = round_robin(worker_id, num_threads); + // worker_id = round_robin(worker_id, num_threads); } - } else { - panic!("what is this?") } } } diff --git a/vayuDB/src/scheduler.rs b/vayuDB/src/scheduler.rs index b3ec5ef..8343eb9 100644 --- a/vayuDB/src/scheduler.rs +++ b/vayuDB/src/scheduler.rs @@ -1,34 +1,66 @@ use crate::dummy_tasks::{test_filter_project, test_hash_join}; -use std::task::Poll; +use std::{hash::Hash, task::Poll}; +use vayu_common::DatafusionPipelineWithSource; +#[derive(PartialEq)] +enum HashJoinState { + CanSendBuild, + BuildSent(i32), + CanSendProbe, + ProbeSent(i32), +} pub struct Scheduler { - next_pipeline: usize, + turn: usize, + // stored_id: i32, + state: HashJoinState, + probe_pipeline: Option, } impl Scheduler { pub fn new() -> Self { - Scheduler { next_pipeline: 0 } + Scheduler { + turn: 0, + state: HashJoinState::CanSendBuild, + probe_pipeline: None, + } } - pub fn get_pipeline(&mut self) -> Poll { - if self.next_pipeline % 3 == 0 { - // filter and project - self.next_pipeline += 1; - let mut task = futures::executor::block_on(test_filter_project()).unwrap(); - let pipeline = task.pipelines.remove(0); - return Poll::Ready(pipeline); - } else if self.next_pipeline % 3 == 1 { - // hash build - self.next_pipeline += 1; + pub fn get_pipeline(&mut self, id: i32) -> Poll { + self.turn = 1 - self.turn; + if self.turn == 0 && self.state == HashJoinState::CanSendBuild { let mut task = futures::executor::block_on(test_hash_join()).unwrap(); + self.probe_pipeline = Some(task.pipelines.remove(1)); + let build_pipeline = task.pipelines.remove(0); + + self.state = HashJoinState::BuildSent(id); + return Poll::Ready(build_pipeline); + } else if self.turn == 0 && self.state == HashJoinState::CanSendProbe { + self.state = HashJoinState::ProbeSent(id); + assert!(self.probe_pipeline.is_some()); + let probe_pipeline = self.probe_pipeline.take().unwrap(); + return Poll::Ready(probe_pipeline); + } else { + let mut task = futures::executor::block_on(test_filter_project()).unwrap(); let pipeline = task.pipelines.remove(0); return Poll::Ready(pipeline); - } else if self.next_pipeline % 3 == 2 { - // hash probe - self.next_pipeline += 1; - let mut task = futures::executor::block_on(test_hash_join()).unwrap(); - let pipeline = task.pipelines.remove(1); - return Poll::Ready(pipeline); - } else { - panic!("magic its magic") + // return Poll::Pending; } } + pub fn ack_pipeline(&mut self, ack_id: i32) { + match self.state { + HashJoinState::BuildSent(id) => { + if id == ack_id { + self.state = HashJoinState::CanSendProbe; + } + } + HashJoinState::ProbeSent(id) => { + if id == ack_id { + self.state = HashJoinState::CanSendBuild; + } + } + _ => {} + } + + // if pipeline_id == self.build_id { + // self.next_pipeline = 2; + // } + } }