Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Got schema built and columns indices
Browse files Browse the repository at this point in the history
  • Loading branch information
SarveshOO7 committed Apr 6, 2024
1 parent ff966c9 commit 58a5636
Showing 1 changed file with 60 additions and 17 deletions.
77 changes: 60 additions & 17 deletions eggstrain/src/execution/operators/hash_join.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use super::{BinaryOperator, Operator};
use crate::execution::record_table::RecordTable;
use arrow::array::ArrayRef;
use arrow::datatypes::{Schema, SchemaRef};
use arrow::datatypes::{Schema, SchemaBuilder, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion::common::arrow::row::{Row, RowConverter, Rows};
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr::{left, ColumnarValue};
use datafusion::physical_expr::PhysicalExprRef;
use datafusion::physical_plan::joins::utils::{build_join_schema, ColumnIndex};
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{DataFusionError, Result};
use datafusion_common::{DataFusionError, JoinSide, JoinType, Result};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
Expand All @@ -24,17 +26,17 @@ pub struct HashJoin {

/// TODO docs
impl HashJoin {
/// Creates a `HashJoin` operator.
///
/// NOTE(review): this span appears to come from a rendered diff with the +/- markers stripped,
/// so it contains two `new` signatures and two sets of field initializers back to back.
/// Presumably the first of each pair is the removed version and the second is the added one —
/// confirm against the commit before relying on either.
// First signature: schemas, join-key pairs and child plans are passed in individually.
pub(crate) fn new(
left_schema: SchemaRef,
right_schema: SchemaRef,
equate_on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Self {
// Second signature: every field is read from a DataFusion `HashJoinExec` plan node.
pub(crate) fn new(hash_exec: &HashJoinExec) -> Self {
Self {
// Shorthand initializers that only resolve against the first signature's parameters.
children,
left_schema,
right_schema,
equate_on,
// Initializers that only resolve against the second signature's `hash_exec` parameter.
children: hash_exec.children().clone(),
left_schema: hash_exec.left().schema(),
right_schema: hash_exec.right().schema(),
// Clone each (left, right) join-key pair from `on()` and collect them into the value
// stored in `equate_on`.
// NOTE(review): the second `.clone()` on `left` looks redundant — confirm and drop it.
equate_on: hash_exec
.on()
.iter()
.map(|(left, right)| (left.clone().clone(), right.clone()))
.collect(),
}
}

Expand Down Expand Up @@ -97,6 +99,44 @@ impl HashJoin {
Ok(record_table)
}

/// Builds the schema produced by joining `left` with `right`.
///
/// The output fields are every field of `left`, in order, followed by every field of `right`,
/// in order; each field is carried over unchanged. Alongside the schema, one [`ColumnIndex`] is
/// returned per output column, in the same order as the output fields. Each records the
/// column's position within its *own* input schema (so indices restart at 0 for the right
/// side) and the [`JoinSide`] it came from.
pub fn build_join_schema(left: &Schema, right: &Schema) -> (Schema, Vec<ColumnIndex>) {
    let left_fields = left
        .fields()
        .iter()
        .cloned()
        .enumerate()
        .map(|(index, field)| {
            (
                field,
                ColumnIndex {
                    index,
                    side: JoinSide::Left,
                },
            )
        });
    let right_fields = right
        .fields()
        .iter()
        .cloned()
        .enumerate()
        .map(|(index, field)| {
            (
                field,
                ColumnIndex {
                    index,
                    side: JoinSide::Right,
                },
            )
        });

    // Left columns first, then right. `unzip` routes the fields into a `SchemaBuilder` and the
    // per-column provenance records into a `Vec` in a single pass.
    let (fields, column_indices): (SchemaBuilder, Vec<ColumnIndex>) =
        left_fields.chain(right_fields).unzip();
    (fields.finish(), column_indices)
}

/// Given a single batch (coming from the right child), probes the hash table and outputs a
/// [`RecordBatch`] for every tuple on the right that gets matched with a tuple in the hash table.
///
Expand Down Expand Up @@ -132,18 +172,21 @@ impl HashJoin {
let left_schema: Schema = (*self.left_schema).clone();
let right_schema: Schema = (*self.right_schema).clone();

let new_schema = Schema::try_merge(vec![left_schema, right_schema])?;
// let new_schema = Schema::try_merge(vec![left_schema, right_schema])?;
let (new_schema, column_indices) =
HashJoin::build_join_schema(&left_schema, &right_schema);
let joined_schema: SchemaRef = Arc::new(new_schema);

let row_converter = RowConverter::new(new_schema);
// There are records associated with this hash value, so we need to emit things
for &record in records {
let left_tuple = table.buffer.get(record).unwrap();
let right_tuple: Row = right_rows.row(row);

let joined_tuple = todo!("Join the two tuples in some way");

todo!("Join the two tuples in some way, then append to a `Rows`")
// let cols = vec[cols]
for col in column_indices {}
self.column_index =
todo!("Join the two tuples in some way, then append to a `Rows`")
}
todo!("Convert the `Rows` back into a `RecordBatch` with `RowConverter::convert_rows`");
let out_columns: Vec<ArrayRef> = RowConverter::convert_rows(joined_schema, rows)?;
Expand Down

0 comments on commit 58a5636

Please sign in to comment.