[Issue 362] Improve Performance constructing the asofJoin() (#363)
* WIP checkpoint commit - testing the performance of running the as-of join on very large DFs (large number of columns)

* simplified building of expressions to find last right-hand value in __getLastRightRow
all test-code passing

* refactored __addPrefixToColumns and __addColumnsFromOtherDF to improve performance

* cleaning up test instrumentation

* satisfying linters: black, flake8, type-check

* trying to fix type-check

* more black formatting

* update linter commands

* fixing type-check issues

* fixing type-check issues

* fixing type-check issues

* removing unnecessary lambda function

Co-authored-by: Lorin Dawson <[email protected]>

* Clarifying suggestion for the future with additional commentary

* Include TODO in comment suggestion

Co-authored-by: Lorin Dawson <[email protected]>

* fixing typo

* fixing type-check

---------

Co-authored-by: Lorin Dawson <[email protected]>
tnixon and R7L208 authored Sep 22, 2023
1 parent 9f2bbe4 commit b531dac
Showing 3 changed files with 66 additions and 92 deletions.
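
The theme running through the diff below: every withColumn/withColumnRenamed call adds a projection node to the logical plan, so building a wide DataFrame by folding over its columns makes plan analysis cost grow with the column count. Expressing all the renames or additions as a single select collapses them into one projection. A minimal sketch of the before/after pattern (the toy DataFrame and "right_" prefix are illustrative, not taken from the commit):

    from functools import reduce

    import pyspark.sql.functions as sfn
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, 2.0, "x")], ["a", "b", "c"])  # toy stand-in for a wide DF

    # before: one new plan projection per column
    slow = reduce(
        lambda acc, c: acc.withColumnRenamed(c, "right_" + c),
        df.columns,
        df,
    )

    # after: a single projection renames every column at once
    fast = df.select(*[sfn.col(c).alias("right_" + c) for c in df.columns])
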
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
@@ -41,11 +41,12 @@ jobs:
         with:
           python-version: "3.9"
       - name: Type check
+        working-directory: ./python
         run: |
           pip install tox
           tox -e type-check
   test:
-    name: Buid and Test Module
+    name: Build and Test Module
     runs-on: ${{ matrix.os }}
     strategy:
       matrix:
151 changes: 62 additions & 89 deletions python/tempo/tsdf.py
@@ -3,20 +3,18 @@
 import logging
 import operator
 from abc import ABCMeta, abstractmethod
-from functools import reduce
 from typing import Any, Callable, List, Optional, Sequence, TypeVar, Union
 
 import numpy as np
 import pandas as pd
-from scipy.fft import fft, fftfreq  # type: ignore
+import pyspark.sql.functions as sfn
 from IPython.core.display import HTML
 from IPython.display import display as ipydisplay
-
-import pyspark.sql.functions as sfn
 from pyspark.sql import SparkSession
 from pyspark.sql.column import Column
 from pyspark.sql.dataframe import DataFrame
 from pyspark.sql.window import Window, WindowSpec
+from scipy.fft import fft, fftfreq  # type: ignore
 
 import tempo.interpol as t_interpolation
 import tempo.io as t_io
@@ -112,7 +110,7 @@ def __validate_ts_string(ts_text: str) -> None:
 
     @staticmethod
     def __validated_column(df: DataFrame, colname: str) -> str:
-        if type(colname) != str:
+        if not isinstance(colname, str):
             raise TypeError(
                 f"Column names must be of type str; found {type(colname)} instead!"
             )
@@ -124,12 +122,12 @@ def __validated_columns(
         self, df: DataFrame, colnames: Optional[Union[str, List[str]]]
     ) -> List[str]:
         # if provided a string, treat it as a single column
-        if type(colnames) == str:
+        if isinstance(colnames, str):
             colnames = [colnames]
         # otherwise we really should have a list or None
         elif colnames is None:
             colnames = []
-        elif type(colnames) != list:
+        elif not isinstance(colnames, list):
             raise TypeError(
                 f"Columns must be of type list, str, or None; found {type(colnames)} instead!"
             )
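
For reference, the switch from type(x) == str to isinstance(x, str) is behavioral as well as stylistic: isinstance accepts subclasses, and exact-type comparisons are flagged by flake8 (E721). A quick illustration:

    class ColumnName(str):  # hypothetical str subclass
        pass

    name = ColumnName("event_ts")
    print(type(name) == str)      # False - exact-type check rejects the subclass
    print(isinstance(name, str))  # True  - still a perfectly valid column name
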
@@ -157,38 +155,38 @@ def __addPrefixToColumns(self, col_list: list[str], prefix: str) -> "TSDF":
         """
         Add prefix to all specified columns.
         """
-        if prefix != "":
-            prefix = prefix + "_"
+        # no-op if no prefix
+        if not prefix:
+            return self
 
-        df = reduce(
-            lambda df, idx: df.withColumnRenamed(
-                col_list[idx], "".join([prefix, col_list[idx]])
-            ),
-            range(len(col_list)),
-            self.df,
-        )
+        # build a column rename map
+        col_map = {col: "_".join([prefix, col]) for col in col_list}
+        # TODO - In the future (when Spark 3.4+ is standard) we should implement batch rename using:
+        # df = self.df.withColumnsRenamed(col_map)
 
-        if prefix == "":
-            ts_col = self.ts_col
-            seq_col = self.sequence_col if self.sequence_col else self.sequence_col
-        else:
-            ts_col = "".join([prefix, self.ts_col])
-            seq_col = (
-                "".join([prefix, self.sequence_col])
-                if self.sequence_col
-                else self.sequence_col
-            )
-        return TSDF(df, ts_col, self.partitionCols, sequence_col=seq_col)
+        # build a list of column expressions to rename columns in a select
+        select_exprs = [
+            sfn.col(col).alias(col_map[col]) if col in col_map else sfn.col(col)
+            for col in self.df.columns
+        ]
+        # select the renamed columns
+        renamed_df = self.df.select(*select_exprs)
+
+        # find the structural columns
+        ts_col = col_map.get(self.ts_col, self.ts_col)
+        partition_cols = [col_map.get(c, c) for c in self.partitionCols]
+        sequence_col = col_map.get(self.sequence_col, self.sequence_col)
+        return TSDF(renamed_df, ts_col, partition_cols, sequence_col=sequence_col)
 
     def __addColumnsFromOtherDF(self, other_cols: Sequence[str]) -> "TSDF":
         """
         Add columns from some other DF as lit(None), as pre-step before union.
         """
-        new_df = reduce(
-            lambda df, idx: df.withColumn(other_cols[idx], sfn.lit(None)),
-            range(len(other_cols)),
-            self.df,
-        )
+        # build a list of column expressions to add the new columns in a select
+        current_cols = [sfn.col(col) for col in self.df.columns]
+        new_cols = [sfn.lit(None).alias(col) for col in other_cols]
+        new_df = self.df.select(current_cols + new_cols)
 
         return TSDF(new_df, self.ts_col, self.partitionCols)
@@ -227,54 +225,41 @@ def __getLastRightRow(
             .rowsBetween(Window.unboundedPreceding, Window.currentRow)
         )
 
+        # generate expressions to find the last value of each right-hand column
         if ignoreNulls is False:
             if tsPartitionVal is not None:
                 raise ValueError(
                     "Disabling null skipping with a partition value is not supported yet."
                 )
-            df = reduce(
-                lambda df, idx: df.withColumn(
-                    right_cols[idx],
-                    sfn.last(
-                        sfn.when(
-                            sfn.col("rec_ind") == -1, sfn.struct(right_cols[idx])
-                        ).otherwise(None),
-                        True,  # ignore nulls because it indicates rows from the left side
-                    ).over(window_spec),
-                ),
-                range(len(right_cols)),
-                self.df,
-            )
-            df = reduce(
-                lambda df, idx: df.withColumn(
-                    right_cols[idx], sfn.col(right_cols[idx])[right_cols[idx]]
-                ),
-                range(len(right_cols)),
-                df,
-            )
+            mod_right_cols = [
+                sfn.last(
+                    sfn.when(sfn.col("rec_ind") == -1, sfn.struct(col)).otherwise(None),
+                    True,
+                )
+                .over(window_spec)[col]
+                .alias(col)
+                for col in right_cols
+            ]
         elif tsPartitionVal is None:
-            # splitting off the condition as we want different columns in the reduce if implementing the skew AS OF join
-            df = reduce(
-                lambda df, idx: df.withColumn(
-                    right_cols[idx],
-                    sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
-                ),
-                range(len(right_cols)),
-                self.df,
-            )
+            mod_right_cols = [
+                sfn.last(col, ignoreNulls).over(window_spec).alias(col)
+                for col in right_cols
+            ]
         else:
-            df = reduce(
-                lambda df, idx: df.withColumn(
-                    right_cols[idx],
-                    sfn.last(right_cols[idx], ignoreNulls).over(window_spec),
-                ).withColumn(
-                    "non_null_ct" + right_cols[idx],
-                    sfn.count(right_cols[idx]).over(window_spec),
-                ),
-                range(len(right_cols)),
-                self.df,
-            )
+            mod_right_cols = [
+                sfn.last(col, ignoreNulls).over(window_spec).alias(col)
+                for col in right_cols
+            ]
+            # non-null count columns, these will be dropped below
+            mod_right_cols += [
+                sfn.count(col).over(window_spec).alias("non_null_ct" + col)
+                for col in right_cols
+            ]
 
+        # select the left-hand side columns, and the modified right-hand side columns
+        non_right_cols = list(set(self.df.columns) - set(right_cols))
+        df = self.df.select(non_right_cols + mod_right_cols)
+        # drop the null left-hand side rows
         df = (df.filter(sfn.col(left_ts_col).isNotNull()).drop(self.ts_col)).drop(
             "rec_ind"
         )
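
The trickiest expression above is the ignoreNulls-is-False branch. Wrapping each right-hand value in a struct guarded by rec_ind == -1 turns rows that came from the left side into null structs, which last(..., ignorenulls=True) skips, while a genuine null inside a right-hand row survives as a non-null struct holding a null field; indexing the window result with [col] then unwraps it. A sketch of one such expression under assumed column names (symbol, event_ts, bid):

    import pyspark.sql.functions as sfn
    from pyspark.sql.window import Window

    win = (
        Window.partitionBy("symbol")
        .orderBy("event_ts")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    last_bid = (
        sfn.last(
            # null out left-side rows; keep right-side rows even if bid is null
            sfn.when(sfn.col("rec_ind") == -1, sfn.struct("bid")).otherwise(None),
            True,  # ignorenulls: skip the nulled-out left-side rows
        )
        .over(win)["bid"]  # unwrap the struct back to a plain column
        .alias("bid")
    )
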
@@ -764,16 +749,8 @@ def asofJoin(
         left_cols = list(set(left_df.columns) - set(self.partitionCols))
         right_cols = list(set(right_df.columns) - set(right_tsdf.partitionCols))
 
-        left_prefix = (
-            ""
-            if ((left_prefix is None) | (left_prefix == ""))
-            else left_prefix + "_"
-        )
-        right_prefix = (
-            ""
-            if ((right_prefix is None) | (right_prefix == ""))
-            else right_prefix + "_"
-        )
+        left_prefix = left_prefix + "_" if left_prefix else ""
+        right_prefix = right_prefix + "_" if right_prefix else ""
 
         w = Window.partitionBy(*partition_cols).orderBy(
             right_prefix + right_tsdf.ts_col
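
The prefix simplification leans on the fact that both None and the empty string are falsy, which also retires the odd bitwise | between boolean expressions in favor of plain truthiness. Equivalent behavior, illustrated standalone (helper name is mine):

    from typing import Optional

    def normalize_prefix(prefix: Optional[str]) -> str:
        # None and "" both fall through to the empty default
        return prefix + "_" if prefix else ""

    assert normalize_prefix(None) == ""
    assert normalize_prefix("") == ""
    assert normalize_prefix("right") == "right_"
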
@@ -843,17 +820,13 @@ def asofJoin(
             [right_tsdf.ts_col] + orig_right_col_diff, right_prefix
         )
 
-        left_nonpartition_cols = list(
+        left_columns = list(
             set(left_tsdf.df.columns).difference(set(self.partitionCols))
         )
-        right_nonpartition_cols = list(
+        right_columns = list(
             set(right_tsdf.df.columns).difference(set(self.partitionCols))
         )
 
-        # For both dataframes get all non-partition columns (including ts_col)
-        left_columns = [left_tsdf.ts_col] + left_nonpartition_cols
-        right_columns = [right_tsdf.ts_col] + right_nonpartition_cols
-
         # Union both dataframes, and create a combined TS column
         combined_ts_col = "combined_ts"
         combined_df = left_tsdf.__addColumnsFromOtherDF(right_columns).__combineTSDF(
@@ -1541,7 +1514,7 @@ def null_safe_equals(col1: Column, col2: Column) -> Column:
             )
 
             def state_comparison_fn(a: CT, b: CT) -> Callable[[Column, Column], Column]:
-                return operator_dict[state_definition](a, b)  # type: ignore
+                return operator_dict[state_definition](a, b)
 
         elif callable(state_definition):
             state_comparison_fn = state_definition  # type: ignore
4 changes: 2 additions & 2 deletions python/tox.ini
@@ -52,8 +52,8 @@ deps =
     flake8
     black
 commands =
-    black --check {toxinidir}
-    flake8
+    black --check {toxinidir}/tempo
+    flake8 --config {toxinidir}/.flake8 {toxinidir}/tempo
 
 [testenv:type-check]
 description = run type checks
