[mypy, coverage] Complete Type Checking for the Project and updates to Coverage configuration (#324)

* update gitignore for pyenv files

* ignore tox files

* hardcode version instead of pulling from gh

importing semver in setup caused an error because it was being imported before the requirements file could install it

* move coverage requirement to tox config

* remove version py file since it's hardcoded in setup

* initial working tox commit

* update required coverage versions to be more flexible

* switch the order the formatters run in

* ignore env specific coverage files

* update tox config to use coverage run instead of base python to execute tests

* add coverage report env to tox config

* update coverage-report description

* update an env name for type checking

* move black and flake8 from the requirements file into tox envs

* update contributing md file with tox instructions

* add pyenv and tox instructions to CONTRIBUTING md

* Update test.yml

* adding comment to force testing on GH actions

* removing comment to force testing on GH actions

* Update test.yml to fix error caused by typo

* Revert "remove version py file since it's hardcoded in setup"

This reverts commit c41d140.

* Revert "hardcode version instead of pulling from gh"

This reverts commit e0f42e6.

* add semver as a dep for testenv in tox ini

* add pyproject toml to install semver as build requirement

* update type default env to type-check

this matches the actual env name since type was renamed to type-check

* move all flake8 configs to tox.ini file

flake8 was returning different results locally in a virtual env and when using tox. This is because the configuration in tox.ini takes precedence over the .flake8 config file. In addition, flake8 no longer supports a global config file after version 4.

* remove toxinidir from flake8 command

* move all flake8 config from the tox.ini file back into the .flake8 config so that GH Actions aren't broken

* add third party stubs for pandas and pytz for mypy as dependencies

* initial mypy ini file with config to ignore pyspark imports

* update mypy ini with global configs

* interval.py passes mypy checks

* resample.py passes mypy checks

* io.py passes mypy checks

* update type for freq_dict in resample.py

* working changes to utils.py

* formatting

* changes to resample.py required for correct typing in utils.py

* utils.py passes mypy checks

* changes to interpol so that tsdf can type check

* changes to resample so that tsdf can type check

* tsdf passes mypy checks

* changes to tsdf so that resample can type check successfully

* changes to utils so that resample can type check successfully

* resample passes mypy checks

* interpol passes mypy checks

* pin mypy version

* update test.yml to include type check for GH Actions

* remove ignore for F403 for flake8 config and violations

* formatting

* linting

* add noqa for utils import

* fix circular dependencies

* interpol unit tests pass

* resample passes unit tests

* tsdf passing unit tests

* utils passing unit tests

updated test_display_unavailable to not take an arg for display_unavailable

arg was never used in the implementation

* fix a resample test case that failed

* formatting

* update workflow to install tox before running it :)

* using pip instead of sudo

* using pip instead of sudo

* fix error causing a test to fail with py39

* add coverage for asOfJoin in tempo/tsdf

* increase coverage for interpol

* excluding actual test files from coverage report

* format and lint

* update gitignore for mypy cache

* update the coverage file to actually omit test files

* trying to use tox to run coverage report

* move coverage run configs from tox.ini to .coveragerc

* potential fix for GH action fatal error

* Revert "potential fix for GH action fatal error"

This reverts commit b2661fc.

* revert changes to use tox in gh actions for coverage

* omit version.py from the coverage report since it's used for the build and not the project, and omit the line calling unittest.main

* move check for literals to resample from interpol to increase coverage

* add seconds to typed freq dict

* add minutes to typed freq dict

* add second to freq dict and force unit to lowercase before checking

* add additional valid frequencies to the typed freq dict (see the sketch after this list)

* update checkAllowableFreq function to pass type checks

* add test to check raise when invalid freq is passed to _appendAggKey

* remove unneeded check because valid freq is handled in checkAllowableFreq now

* remove another unneeded check since checking for valid freq values is fully handled by checkAllowableFreq

* fixes for type checking

* remove unreachable pass after return statement

* add ... as an exclusion for non-implemented methods

* update typing because partition cols is optional for TSDF

* new unit tests for tsdf

* remove unneeded check since literals are handled by checkAllowableFreq

* formatting

* formatting

* new unit tests for tsdf

* fix coverage report error

parallel mode includes system info in the data file names so reports don't conflict with each other

* see if parallel mode works again if codecov combine is used

* the updated GH workflow for previous commit

* move coveragerc file to top dir so it's discoverable by IDEs

* Revert "move coveragerc file to top dir so it's discoverable by IDEs"

This reverts commit a4ca9b3.

* move coveragerc file to project root

* Revert "move coveragerc file to project root"

This reverts commit eac55e5.
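To make the "typed freq dict" and checkAllowableFreq commits above concrete, here is a rough, hypothetical sketch of the idea — the keys, values, and helper logic are illustrative and not the exact tempo implementation:

```python
from typing import Tuple, TypedDict, Union


class FreqDict(TypedDict):
    # Literal keys let mypy verify lookups such as freq_dict["sec"].
    sec: str
    second: str
    seconds: str
    min: str
    minute: str
    minutes: str
    hr: str
    hour: str
    hours: str


freq_dict: FreqDict = {
    "sec": "seconds",
    "second": "seconds",
    "seconds": "seconds",
    "min": "minutes",
    "minute": "minutes",
    "minutes": "minutes",
    "hr": "hours",
    "hour": "hours",
    "hours": "hours",
}


def checkAllowableFreq(freq: str) -> Tuple[Union[int, str], str]:
    """Split a frequency string such as '30 seconds' into (period, unit),
    lower-casing the unit before validating it against the typed dict (sketch only)."""
    parts = freq.strip().lower().split(" ")
    if len(parts) == 1:
        period: Union[int, str] = 1
        unit = parts[0]
    else:
        period, unit = parts[0], parts[1]
    if unit not in freq_dict:
        raise ValueError(f"Allowable freq values do not include: {unit}")
    return period, unit
```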

---------

Co-authored-by: Lorin Dawson <[email protected]>
Co-authored-by: Tristan Nixon <[email protected]>
3 people authored May 31, 2023
1 parent 27f2478 commit cabe0c4
Showing 20 changed files with 1,321 additions and 372 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/test.yml
@@ -32,6 +32,18 @@ jobs:
with:
args: "--config python/.flake8"
path: "./python"
type-checks:
runs-on: ubuntu-latest
name: Type Checks
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
with:
python-version: "3.9"
- name: Type check
run: |
pip install tox
tox -e type-check
test:
name: Buid and Test Module
runs-on: ${{ matrix.os }}
@@ -56,6 +68,7 @@ jobs:
run: |
python -I -m pip install 'coverage<8,>=7' pyspark==3.2.1 -r requirements.txt
coverage run -m unittest discover -s tests -p '*_tests.py'
coverage combine
coverage xml
- name: Publish test coverage
uses: codecov/codecov-action@v1
6 changes: 5 additions & 1 deletion .gitignore
@@ -44,4 +44,8 @@ venv
/docs/_build/

# sphinx build folder
docs/_build
docs/_build

# ignore mypy cache
.mypy_cache
python/.mypy_cache
10 changes: 4 additions & 6 deletions python/.coveragerc
@@ -1,19 +1,17 @@
[run]
parallel = True
branch = True
source = tempo
omit = *_tests.py

[report]
exclude_lines =
if self.debug:
pragma: no cover
raise NotImplementedError
if __name__ == .__main__.:
unittest.main()
import
from
if TYPE_CHECKING:

ignore_errors = True

omit =
setup.py
*tests*
pass
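As a rough illustration (generic code, not from tempo), these are the kinds of lines that exclude_lines patterns like the ones above are intended to drop from the report:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:  # excluded: only evaluated by the type checker, never at runtime
    from pyspark.sql.dataframe import DataFrame


class Writer:
    def write(self, df: "DataFrame") -> None:
        raise NotImplementedError  # excluded: placeholder meant to be overridden

    def flush(self) -> None:
        ...  # excluded: not-yet-implemented method body


if __name__ == "__main__":  # excluded: matches the __main__ pattern
    import unittest

    unittest.main()  # excluded: matches the unittest.main() pattern
```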
11 changes: 11 additions & 0 deletions python/mypy.ini
@@ -0,0 +1,11 @@
[mypy]
strict_optional = True
no_implicit_optional = True
disallow_untyped_defs = True
show_error_codes = True
warn_unused_ignores = True
warn_redundant_casts = True
namespace_packages = True

[mypy-pyspark.*]
ignore_missing_imports = True
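As a quick, generic illustration (not tempo code) of what disallow_untyped_defs above demands, every function needs full parameter and return annotations, including an explicit `-> None` for functions that return nothing:

```python
from pyspark.sql.dataframe import DataFrame


# Rejected under disallow_untyped_defs = True: no parameter or return annotations.
# def log_count(df, verbose=False):
#     print(df.count())


def log_count(df: DataFrame, verbose: bool = False) -> None:
    # Fully annotated, mirroring the explicit `-> None` return types added
    # throughout this PR for functions that return nothing.
    if verbose:
        print(df.count())
```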
115 changes: 74 additions & 41 deletions python/tempo/interpol.py
@@ -1,10 +1,14 @@
from typing import List
from __future__ import annotations

from typing import List, Optional, Union, Callable

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, expr, last, lead, lit, when
from pyspark.sql.window import Window
from tempo.utils import calculate_time_horizon
from tempo.resample import checkAllowableFreq, freq_dict

import tempo.utils as t_utils
import tempo.resample as t_resample
import tempo.tsdf as t_tsdf

# Interpolation fill options
method_options = ["zero", "null", "bfill", "ffill", "linear"]
@@ -15,7 +19,7 @@ class Interpolation:
def __init__(self, is_resampled: bool):
self.is_resampled = is_resampled

def __validate_fill(self, method: str):
def __validate_fill(self, method: str) -> None:
"""
Validate if the fill provided is within the allowed list of values.
@@ -29,10 +33,11 @@ def __validate_col(
def __validate_col(
self,
df: DataFrame,
partition_cols: List[str],
partition_cols: Optional[List[str]],
target_cols: List[str],
ts_col: str,
):
ts_col_dtype: Optional[str] = None, # NB: added for testing purposes only
) -> None:
"""
Validate if target column exists and is of numeric type, and validates if partition column exists.
@@ -42,11 +47,12 @@ def __validate_col(
:param ts_col: Timestamp column to be validated
"""

for column in partition_cols:
if column not in str(df.columns):
raise ValueError(
f"Partition Column: '{column}' does not exist in DataFrame."
)
if partition_cols is not None:
for column in partition_cols:
if column not in str(df.columns):
raise ValueError(
f"Partition Column: '{column}' does not exist in DataFrame."
)
for column in target_cols:
if column not in str(df.columns):
raise ValueError(
@@ -62,10 +68,14 @@ def __validate_col(
f"Timestamp Column: '{ts_col}' does not exist in DataFrame."
)

if df.select(ts_col).dtypes[0][1] != "timestamp":
if ts_col_dtype is None:
ts_col_dtype = df.select(ts_col).dtypes[0][1]
if ts_col_dtype != "timestamp":
raise ValueError("Timestamp Column needs to be of timestamp type.")

def __calc_linear_spark(self, df: DataFrame, ts_col: str, target_col: str):
def __calc_linear_spark(
self, df: DataFrame, ts_col: str, target_col: str
) -> DataFrame:
"""
Native Spark function for calculating linear interpolation on a DataFrame.
@@ -184,7 +194,7 @@ def __interpolate_column(
return output_df

def __generate_time_series_fill(
self, df: DataFrame, partition_cols: List[str], ts_col: str
self, df: DataFrame, partition_cols: Optional[List[str]], ts_col: str
) -> DataFrame:
"""
Create additional timeseries columns for previous and next timestamps
@@ -202,7 +212,11 @@ def __generate_time_series_fill(
)

def __generate_column_time_fill(
self, df: DataFrame, partition_cols: List[str], ts_col: str, target_col: str
self,
df: DataFrame,
partition_cols: Optional[List[str]],
ts_col: str,
target_col: str,
) -> DataFrame:
"""
Create timeseries columns for previous and next timestamps for a specific target column
@@ -212,24 +226,30 @@ def __generate_column_time_fill(
:param ts_col: timestamp column name
:param target_col: target column name
"""
window = Window
if partition_cols is not None:
window = Window.partitionBy(*partition_cols)

return df.withColumn(
f"previous_timestamp_{target_col}",
last(col(f"{ts_col}_{target_col}"), ignorenulls=True).over(
Window.partitionBy(*partition_cols)
.orderBy(ts_col)
.rowsBetween(Window.unboundedPreceding, 0)
window.orderBy(ts_col).rowsBetween(Window.unboundedPreceding, 0)
),
).withColumn(
f"next_timestamp_{target_col}",
last(col(f"{ts_col}_{target_col}"), ignorenulls=True).over(
Window.partitionBy(*partition_cols)
.orderBy(col(ts_col).desc())
.rowsBetween(Window.unboundedPreceding, 0)
window.orderBy(col(ts_col).desc()).rowsBetween(
Window.unboundedPreceding, 0
)
),
)

def __generate_target_fill(
self, df: DataFrame, partition_cols: List[str], ts_col: str, target_col: str
self,
df: DataFrame,
partition_cols: Optional[List[str]],
ts_col: str,
target_col: str,
) -> DataFrame:
"""
Create columns for previous and next value for a specific target column
@@ -239,39 +259,39 @@ def __generate_target_fill(
:param ts_col: timestamp column name
:param target_col: target column name
"""
window = Window

if partition_cols is not None:
window = Window.partitionBy(*partition_cols)
return (
df.withColumn(
f"previous_{target_col}",
last(df[target_col], ignorenulls=True).over(
Window.partitionBy(*partition_cols)
.orderBy(ts_col)
.rowsBetween(Window.unboundedPreceding, 0)
window.orderBy(ts_col).rowsBetween(Window.unboundedPreceding, 0)
),
)
# Handle if subsequent value is null
.withColumn(
f"next_null_{target_col}",
last(df[target_col], ignorenulls=True).over(
Window.partitionBy(*partition_cols)
.orderBy(col(ts_col).desc())
.rowsBetween(Window.unboundedPreceding, 0)
window.orderBy(col(ts_col).desc()).rowsBetween(
Window.unboundedPreceding, 0
)
),
).withColumn(
f"next_{target_col}",
lead(df[target_col]).over(
Window.partitionBy(*partition_cols).orderBy(ts_col)
),
lead(df[target_col]).over(window.orderBy(ts_col)),
)
)

def interpolate(
self,
tsdf,
tsdf: t_tsdf.TSDF,
ts_col: str,
partition_cols: List[str],
partition_cols: Optional[List[str]],
target_cols: List[str],
freq: str,
func: str,
freq: Optional[str],
func: Optional[Union[Callable | str]],
method: str,
show_interpolated: bool,
perform_checks: bool = True,
@@ -294,21 +314,34 @@ def interpolate(
self.__validate_fill(method)
self.__validate_col(tsdf.df, partition_cols, target_cols, ts_col)

if freq is None:
raise ValueError("freq cannot be None")

if func is None:
raise ValueError("func cannot be None")

if callable(func):
raise ValueError("func must be a string")

# Convert Frequency using resample dictionary
parsed_freq = checkAllowableFreq(freq)
freq = f"{parsed_freq[0]} {freq_dict[parsed_freq[1]]}"
parsed_freq = t_resample.checkAllowableFreq(freq)
period, unit = parsed_freq[0], parsed_freq[1]
freq = f"{period} {t_resample.freq_dict[unit]}" # type: ignore[literal-required]

# Throw warning for user to validate that the expected number of output rows is valid.
if perform_checks:
calculate_time_horizon(tsdf.df, ts_col, freq, partition_cols)
t_utils.calculate_time_horizon(tsdf.df, ts_col, freq, partition_cols)

# Only select required columns for interpolation
input_cols: List[str] = [*partition_cols, ts_col, *target_cols]
input_cols: List[str] = [ts_col, *target_cols]
if partition_cols is not None:
input_cols += [*partition_cols]

sampled_input: DataFrame = tsdf.df.select(*input_cols)

if self.is_resampled is False:
# Resample and Normalize Input
sampled_input: DataFrame = tsdf.resample(
sampled_input = tsdf.resample(
freq=freq, func=func, metricCols=target_cols
).df

@@ -367,7 +400,7 @@ def interpolate(
interpolated_result: DataFrame = flagged_series
for target_col in target_cols:
# Interpolate target columns
interpolated_result: DataFrame = self.__interpolate_column(
interpolated_result = self.__interpolate_column(
interpolated_result, ts_col, target_col, method
)

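The optional partition-column handling introduced in interpol.py above reduces to the following standalone pattern; this is a sketch with placeholder names rather than the exact tempo helper:

```python
from typing import List, Optional

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import col, last
from pyspark.sql.window import Window


def fill_previous(
    df: DataFrame,
    target_col: str,
    ts_col: str,
    partition_cols: Optional[List[str]] = None,
) -> DataFrame:
    # Fall back to an unpartitioned window when no partition columns are supplied,
    # mirroring the `window = Window` / `Window.partitionBy(*partition_cols)` switch above.
    window = Window
    if partition_cols is not None:
        window = Window.partitionBy(*partition_cols)
    return df.withColumn(
        f"previous_{target_col}",
        last(col(target_col), ignorenulls=True).over(
            window.orderBy(ts_col).rowsBetween(Window.unboundedPreceding, 0)
        ),
    )
```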
11 changes: 8 additions & 3 deletions python/tempo/intervals.py
@@ -3,6 +3,7 @@
from typing import Optional
from functools import cached_property

import pyspark.sql
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import NumericType, BooleanType, StructField
import pyspark.sql.functions as f
@@ -31,7 +32,11 @@ class IntervalsDF:
"""

def __init__(
self, df: DataFrame, start_ts: str, end_ts: str, series_ids: list[str] = None
self,
df: DataFrame,
start_ts: str,
end_ts: str,
series_ids: Optional[list[str]] = None,
) -> None:
"""
Constructor for :class:`IntervalsDF`.
@@ -100,7 +105,7 @@ def metric_columns(self) -> list[str]:
return [col.name for col in self.df.schema.fields if is_metric_col(col)]

@cached_property
def window(self):
def window(self) -> pyspark.sql.window:
return Window.partitionBy(*self.series_ids).orderBy(*self.interval_boundaries)

@classmethod
@@ -181,7 +186,7 @@ def fromStackedMetrics(

df = (
df.groupBy(start_ts, end_ts, *series)
.pivot(metrics_name_col, values=metric_names) # type: ignore
.pivot(metrics_name_col, values=metric_names)
.max(metrics_value_col)
)

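The constructor signature change above is the standard fix for mypy's no_implicit_optional: a parameter that defaults to None must be declared Optional. A generic sketch (not the real IntervalsDF):

```python
from typing import Optional


class SeriesHolder:
    # Rejected under no_implicit_optional: `series_ids: list[str] = None`,
    # since list[str] does not admit the default value None.
    def __init__(self, series_ids: Optional[list[str]] = None) -> None:
        # Normalize to a concrete list for this sketch; the real class may
        # handle a missing value differently.
        self.series_ids: list[str] = series_ids if series_ids is not None else []
```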
10 changes: 6 additions & 4 deletions python/tempo/io.py
@@ -3,7 +3,9 @@
import os
import logging
from collections import deque
import tempo
from typing import Optional

import tempo.tsdf as t_tsdf
import pyspark.sql.functions as f
from pyspark.sql import SparkSession
from pyspark.sql.utils import ParseException
@@ -12,11 +14,11 @@


def write(
tsdf: tempo.TSDF,
tsdf: t_tsdf.TSDF,
spark: SparkSession,
tabName: str,
optimizationCols: list[str] = None,
):
optimizationCols: Optional[list[str]] = None,
) -> None:
"""
param: tsdf: input TSDF object to write
param: tabName Delta output table name
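The module-alias imports above (`import tempo.tsdf as t_tsdf`, `import tempo.utils as t_utils`) are a common way to untangle circular imports: importing the module rather than the class defers attribute lookup until call time. A hypothetical two-file sketch with placeholder module names, not tempo's real modules:

```python
# file: holder.py (hypothetical)
from __future__ import annotations

import writer as w  # module alias, not `from writer import save`


class Holder:
    def __init__(self, values: list[int]) -> None:
        self.values = values

    def save(self, path: str) -> None:
        # w.save is resolved here, at call time, after both modules have loaded,
        # so holder.py and writer.py can import each other without an ImportError.
        w.save(self, path)


# file: writer.py (hypothetical) — the reverse edge of the cycle:
# from __future__ import annotations
# import holder as h
#
# def save(obj: h.Holder, path: str) -> None:
#     with open(path, "w") as fh:
#         fh.write(",".join(str(v) for v in obj.values))
```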