Skip to content

Commit

Permalink
perf: use idxvec in join hashtables (#14133)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Jan 31, 2024
1 parent 504bfeb commit 2c5f4f3
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 16 deletions.
18 changes: 10 additions & 8 deletions crates/polars-ops/src/frame/join/hash_join/multiple_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use polars_core::hashing::{
use polars_core::utils::{_set_partition_size, split_df};
use polars_core::POOL;
use polars_utils::hashing::hash_to_partition;
use polars_utils::idx_vec::IdxVec;
use polars_utils::idxvec;

use super::*;

Expand All @@ -31,7 +33,7 @@ pub(crate) unsafe fn compare_df_rows2(
pub(crate) fn create_probe_table(
hashes: &[UInt64Chunked],
keys: &DataFrame,
) -> Vec<HashMap<IdxHash, Vec<IdxSize>, IdBuildHasher>> {
) -> Vec<HashMap<IdxHash, IdxVec, IdBuildHasher>> {
let n_partitions = _set_partition_size();

// We will create a hashtable in every thread.
Expand All @@ -41,7 +43,7 @@ pub(crate) fn create_probe_table(
(0..n_partitions)
.into_par_iter()
.map(|part_no| {
let mut hash_tbl: HashMap<IdxHash, Vec<IdxSize>, IdBuildHasher> =
let mut hash_tbl: HashMap<IdxHash, IdxVec, IdBuildHasher> =
HashMap::with_capacity_and_hasher(_HASHMAP_INIT_SIZE, Default::default());

let mut offset = 0;
Expand All @@ -59,7 +61,7 @@ pub(crate) fn create_probe_table(
idx,
*h,
keys,
|| vec![idx],
|| idxvec![idx],
|v| v.push(idx),
)
}
Expand All @@ -78,7 +80,7 @@ pub(crate) fn create_probe_table(
fn create_build_table_outer(
hashes: &[UInt64Chunked],
keys: &DataFrame,
) -> Vec<HashMap<IdxHash, (bool, Vec<IdxSize>), IdBuildHasher>> {
) -> Vec<HashMap<IdxHash, (bool, IdxVec), IdBuildHasher>> {
// Outer join equivalent of create_build_table() adds a bool in the hashmap values for tracking
// whether a value in the hash table has already been matched to a value in the probe hashes.
let n_partitions = _set_partition_size();
Expand All @@ -88,7 +90,7 @@ fn create_build_table_outer(
// Every thread traverses all keys/hashes and ignores the ones that don't fall in that partition.
POOL.install(|| {
(0..n_partitions).into_par_iter().map(|part_no| {
let mut hash_tbl: HashMap<IdxHash, (bool, Vec<IdxSize>), IdBuildHasher> =
let mut hash_tbl: HashMap<IdxHash, (bool, IdxVec), IdBuildHasher> =
HashMap::with_capacity_and_hasher(_HASHMAP_INIT_SIZE, Default::default());

let mut offset = 0;
Expand All @@ -106,7 +108,7 @@ fn create_build_table_outer(
idx,
*h,
keys,
|| (false, vec![idx]),
|| (false, idxvec![idx]),
|v| v.1.push(idx),
)
}
Expand All @@ -126,7 +128,7 @@ fn create_build_table_outer(
#[allow(clippy::too_many_arguments)]
fn probe_inner<F>(
probe_hashes: &UInt64Chunked,
hash_tbls: &[HashMap<IdxHash, Vec<IdxSize>, IdBuildHasher>],
hash_tbls: &[HashMap<IdxHash, IdxVec, IdBuildHasher>],
results: &mut Vec<(IdxSize, IdxSize)>,
local_offset: usize,
n_tables: usize,
Expand Down Expand Up @@ -492,7 +494,7 @@ pub fn _left_semi_multiple_keys(
#[allow(clippy::type_complexity)]
fn probe_outer<F, G, H>(
probe_hashes: &[UInt64Chunked],
hash_tbls: &mut [HashMap<IdxHash, (bool, Vec<IdxSize>), IdBuildHasher>],
hash_tbls: &mut [HashMap<IdxHash, (bool, IdxVec), IdBuildHasher>],
results: &mut (
MutablePrimitiveArray<IdxSize>,
MutablePrimitiveArray<IdxSize>,
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-ops/src/frame/join/hash_join/single_keys.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use polars_utils::hashing::{hash_to_partition, DirtyHash};
use polars_utils::idx_vec::IdxVec;
use polars_utils::idxvec;
use polars_utils::nulls::IsNull;
use polars_utils::sync::SyncPtr;

Expand Down Expand Up @@ -141,8 +142,7 @@ where
o.get_mut().push(idx as IdxSize);
},
Entry::Vacant(v) => {
let mut iv = IdxVec::new();
iv.push(idx as IdxSize);
let iv = idxvec![idx as IdxSize];
v.insert(iv);
},
};
Expand Down
10 changes: 6 additions & 4 deletions crates/polars-ops/src/frame/join/hash_join/single_keys_outer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use arrow::array::{MutablePrimitiveArray, PrimitiveArray};
use arrow::legacy::utils::CustomIterTools;
use polars_utils::hashing::hash_to_partition;
use polars_utils::idx_vec::IdxVec;
use polars_utils::idxvec;
use polars_utils::nulls::IsNull;

use super::*;
Expand Down Expand Up @@ -31,7 +33,7 @@ where

pub(crate) fn prepare_hashed_relation_threaded<T, I>(
iters: Vec<I>,
) -> Vec<PlHashMap<T, (bool, Vec<IdxSize>)>>
) -> Vec<PlHashMap<T, (bool, IdxVec)>>
where
I: Iterator<Item = T> + Send + TrustedLen,
T: Send + Hash + Eq + Sync + Copy,
Expand All @@ -48,7 +50,7 @@ where
.map(|partition_no| {
let build_hasher = build_hasher.clone();
let hashes_and_keys = &hashes_and_keys;
let mut hash_tbl: PlHashMap<T, (bool, Vec<IdxSize>)> =
let mut hash_tbl: PlHashMap<T, (bool, IdxVec)> =
PlHashMap::with_hasher(build_hasher);

let mut offset = 0;
Expand All @@ -70,7 +72,7 @@ where

match entry {
RawEntryMut::Vacant(entry) => {
entry.insert_hashed_nocheck(*h, *k, (false, vec![idx]));
entry.insert_hashed_nocheck(*h, *k, (false, idxvec![idx]));
},
RawEntryMut::Occupied(mut entry) => {
let (_k, v) = entry.get_key_value_mut();
Expand All @@ -92,7 +94,7 @@ where
#[allow(clippy::too_many_arguments)]
fn probe_outer<T, F, G, H>(
probe_hashes: &[Vec<(u64, T)>],
hash_tbls: &mut [PlHashMap<T, (bool, Vec<IdxSize>)>],
hash_tbls: &mut [PlHashMap<T, (bool, IdxVec)>],
results: &mut (
MutablePrimitiveArray<IdxSize>,
MutablePrimitiveArray<IdxSize>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use polars_utils::hashing::hash_to_partition;
use polars_utils::slice::GetSaferUnchecked;

use super::*;
use crate::executors::sinks::joins::inner_left::GenericJoinProbe;
use crate::executors::sinks::joins::generic_probe_inner_left::GenericJoinProbe;
use crate::executors::sinks::utils::{hash_rows, load_vec};
use crate::executors::sinks::HASHMAP_INIT_SIZE;
use crate::expressions::PhysicalPipedExpr;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sinks/joins/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(feature = "cross_join")]
mod cross;
mod generic_build;
mod inner_left;
mod generic_probe_inner_left;

#[cfg(feature = "cross_join")]
pub(crate) use cross::*;
Expand Down

0 comments on commit 2c5f4f3

Please sign in to comment.