diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 0e473a4..8910e5e 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -45,6 +45,7 @@ jobs: poetry run python -m pytest -n auto --disable-warnings --cov=edvart tests/ - name: Lint run: | + poetry run mypy edvart/ poetry run ruff check . poetry run ruff format --check . diff --git a/edvart/export_utils.py b/edvart/export_utils.py index 97a087f..d5283c5 100644 --- a/edvart/export_utils.py +++ b/edvart/export_utils.py @@ -29,6 +29,10 @@ def embed_image_base64(image_path: str, mime: str = "image/png") -> str: # Look up directory where currently executed template is located # Jinja's @environmentfilter or @contextfilter does not seem to provide # any information about the path of the template. - template_dir = os.path.dirname(inspect.getfile(inspect.currentframe().f_back)) + current_frame = inspect.currentframe() + assert current_frame is not None + frame_back = current_frame.f_back + assert frame_back is not None + template_dir = os.path.dirname(inspect.getfile(frame_back)) with open(os.path.join(template_dir, image_path), "rb") as img: return f"data:{mime};base64," + str(base64.b64encode(img.read()).decode("utf-8")) diff --git a/edvart/plots.py b/edvart/plots.py index 4bca094..2013968 100644 --- a/edvart/plots.py +++ b/edvart/plots.py @@ -107,7 +107,7 @@ def _scatter_plot_2d_noninteractive( color_categorical = pd.Categorical(df[color_col]) color_codes = color_categorical.codes else: - color_codes = df[color_col] + color_codes = df[color_col].values.astype(np.signedinteger) scatter = ax.scatter(x, y, c=color_codes, alpha=opacity) if is_color_categorical: diff --git a/edvart/report.py b/edvart/report.py index c0f565f..081c245 100755 --- a/edvart/report.py +++ b/edvart/report.py @@ -5,9 +5,10 @@ import sys import warnings from abc import ABC +from collections.abc import Sized from copy import copy from enum import auto -from typing import List, Optional, Tuple, Union 
+from typing import Any, Dict, List, Optional, Tuple, Union import isort import nbconvert @@ -83,7 +84,7 @@ def __init__( self.df = dataframe self.sections: list[Section] = [] self.verbosity = Verbosity(verbosity) - self._table_of_contents = None + self._table_of_contents: Optional[TableOfContents] = None def _warn_if_empty(self) -> None: """Warns if the report contains no sections.""" @@ -132,7 +133,7 @@ def _export_data( return ( code_dedent( f""" - df_parquet = BytesIO(base64.b85decode({buffer}.decode())) + df_parquet = BytesIO(base64.b85decode({buffer!r}.decode())) df = pd.read_parquet(df_parquet)""" ), ["import base64", "import pandas as pd", "from io import BytesIO"], @@ -143,7 +144,9 @@ def export_notebook( notebook_filepath: Union[str, os.PathLike], dataset_name: str = "[INSERT DATASET NAME]", dataset_description: str = "[INSERT DATASET DESCRIPTION]", - export_data_mode: ExportDataMode = ExportDataMode.NONE, + # mypy assumes that the type of `ExportDataMode.NONE` is `auto` instead of `ExportDataMode` + # since `auto()` is assigned to it in the enum + export_data_mode: ExportDataMode = ExportDataMode.NONE, # type: ignore ) -> None: """Exports the report as an .ipynb file. @@ -280,7 +283,7 @@ def _export_html( Maximum number of seconds to wait for a cell to finish execution. 
""" # Execute notebook to produce output of cells - html_exp_kwargs = dict( + html_exp_kwargs: Dict[str, Any] = dict( preprocessors=[nbconvert.preprocessors.ExecutePreprocessor(timeout=timeout)] ) if template_name is not None: @@ -348,7 +351,7 @@ def export_html( # and unpickles the the whole report object from the decoded binary data unpickle_report = code_dedent( f""" - data = {buffer_base64} + data = {buffer_base64!r} report = pickle.loads(base64.b85decode(data), fix_imports=False) """ ) @@ -750,7 +753,7 @@ def __init__( columns_bivariate_analysis: Optional[List[str]] = None, columns_multivariate_analysis: Optional[List[str]] = None, columns_group_analysis: Optional[List[str]] = None, - groupby: Union[str, List[str]] = None, + groupby: Optional[Union[str, List[str]]] = None, ): super().__init__(dataframe, verbosity) @@ -773,7 +776,7 @@ def __init__( ) if isinstance(groupby, str): color_col = groupby - elif hasattr(groupby, "__len__") and len(groupby) == 1: + elif isinstance(groupby, Sized) and len(groupby) == 1: color_col = groupby[0] else: color_col = None @@ -814,7 +817,7 @@ def __init__( verbosity: Verbosity = Verbosity.LOW, ): super().__init__(dataframe, verbosity) - if not is_date(dataframe.index): + if not is_date(dataframe.index.to_series()): raise ValueError( "Input dataframe needs to be indexed by time." "Please reindex your data to be indexed by either a DatetimeIndex or a PeriodIndex." 
diff --git a/edvart/report_sections/bivariate_analysis.py b/edvart/report_sections/bivariate_analysis.py index a7e5651..dec6ed1 100644 --- a/edvart/report_sections/bivariate_analysis.py +++ b/edvart/report_sections/bivariate_analysis.py @@ -120,6 +120,8 @@ def __init__( raise ValueError("Either both or neither of columns_x, columns_y must be specified.") # For analyses which do not take columns_pairs, prepare columns_x and columns_y in case # columns_pairs is the only parameter specified + columns_x_no_pairs: Optional[List[str]] + columns_y_no_pairs: Optional[List[str]] if columns is None and columns_x is None and columns_pairs is not None: columns_x_no_pairs = [pair[0] for pair in columns_pairs] columns_y_no_pairs = [pair[1] for pair in columns_pairs] @@ -456,6 +458,7 @@ def _get_columns_x_y( if columns is None: columns = list(df.columns) columns_x = columns_y = columns + assert columns_y is not None columns_x = [col for col in columns_x if is_numeric(df[col])] columns_y = [col for col in columns_y if is_numeric(df[col])] @@ -722,6 +725,7 @@ def include_column(col: str) -> bool: columns_x = columns columns_y = columns if not allow_categorical: + assert columns_y is not None columns_x = list(filter(include_column, columns_x)) columns_y = list(filter(include_column, columns_y)) sns.pairplot(df, x_vars=columns_x, y_vars=columns_y, hue=color_col) @@ -908,6 +912,8 @@ def include_column(col: str) -> bool: if columns_x is None: columns_pairs = list(itertools.combinations(columns, 2)) else: + assert columns_x is not None + assert columns_y is not None columns_pairs = [ (col_x, col_y) for (col_x, col_y) in itertools.product(columns_x, columns_y) @@ -971,7 +977,7 @@ def contingency_table( annot = table.replace(0, "") if hide_zeros else table ax = sns.heatmap( - scaling_func(table), + scaling_func(table.values), annot=annot, fmt="", cbar=False, diff --git a/edvart/report_sections/dataset_overview.py b/edvart/report_sections/dataset_overview.py index 3cdcd10..0d02f13 100644 
--- a/edvart/report_sections/dataset_overview.py +++ b/edvart/report_sections/dataset_overview.py @@ -449,8 +449,11 @@ def data_types(df: pd.DataFrame, columns: Optional[List[str]] = None) -> None: """ if columns is not None: df = df[columns] - dtypes = df.apply( - func=lambda x_: str(infer_data_type(x_)), + + # Type ignored because the apply is not properly typed: the type hints for + # the parameter `func` do not cover the complete set of possible inputs. + dtypes: pd.Series[str] = df.apply( + func=lambda x_: str(infer_data_type(x_)), # type: ignore axis=0, result_type="expand", ) @@ -652,7 +655,7 @@ def missing_values( bar_plot_title: str = "Missing Values Percentage of Each Column", bar_plot_ylim: float = 0, bar_plot_color: str = "#FFA07A", - **bar_plot_args: Dict[str, Any], + **bar_plot_args: Any, ) -> None: """Displays a table of missing values percentages for each column of df and a bar plot of the percentages. @@ -675,7 +678,7 @@ def missing_values( Bar plot y axis bottom limit. bar_plot_color : str Color of bars in the bar plot in hex format. - bar_plot_args : Dict[str, Any] + bar_plot_args : Any Additional kwargs passed to pandas.Series.bar. 
""" if columns is not None: diff --git a/edvart/report_sections/group_analysis.py b/edvart/report_sections/group_analysis.py index 7ad113e..e184e98 100644 --- a/edvart/report_sections/group_analysis.py +++ b/edvart/report_sections/group_analysis.py @@ -1,4 +1,5 @@ -from typing import Any, Callable, Dict, List, Optional, Union +from collections.abc import Hashable +from typing import Any, Callable, Dict, Iterable, List, Optional, Union import colorlover as cl import nbformat.v4 as nbfv4 @@ -102,7 +103,7 @@ def required_imports(self) -> List[str]: "import plotly.graph_objects as go", "from edvart.data_types import infer_data_type, DataType", "from edvart import utils", - "from typing import List, Dict, Optional, Callable", + "from typing import List, Dict, Optional, Callable, Iterable", "from plotly.subplots import make_subplots", ] @@ -218,7 +219,7 @@ def add_cells(self, cells: List[Dict[str, Any]], df: pd.DataFrame) -> None: ) cells.append(nbfv4.new_code_cell(code)) - columns = self.columns if self.columns is not None else df.columns + columns = self.columns if self.columns is not None else df.columns.to_list() if not self.show_statistics and not self.show_dist: return @@ -362,7 +363,7 @@ def within_group_stats( df: pd.DataFrame, groupby: List[str], column: str, - stats: Dict[str, Callable[[pd.Series], float]] = None, + stats: Optional[Dict[str, Callable[[pd.Series], float]]] = None, round_decimals: int = 2, ) -> None: """Display withing group statistics for a column of df grouped by one or other more columns. 
@@ -448,7 +449,9 @@ def group_missing_values( df_grouped = df.groupby(groupby)[columns] # Calculate number of samples in each group - sizes = df_grouped.size().rename("Group Size") + sizes = df_grouped.size() + assert isinstance(sizes, pd.Series) + sizes = sizes.rename("Group Size") # Calculate missing values percentage of each column for each group missing = df_grouped.apply(lambda g: g.isna().sum(axis=0)) @@ -490,7 +493,7 @@ def color_cell(value): background-color: {bg_hex}; """ - render = final_table.style.applymap( + render = final_table.style.map( func=color_cell, subset=pd.IndexSlice[:, colored_columns] ).format(formatter="{0:.2f} %", subset=pd.IndexSlice[:, colored_columns]) else: @@ -553,7 +556,8 @@ def group_barplot( fig = go.Figure() for color_idx, (idx, row) in enumerate(pivot.iterrows()): - if hasattr(idx, "__len__") and not isinstance(idx, str): + group_name: Hashable + if isinstance(idx, Iterable) and not isinstance(idx, str): group_name = "_".join([str(i) for i in idx]) else: group_name = idx @@ -641,7 +645,8 @@ def overlaid_histograms( ) for color_idx, (name, group) in enumerate(df.groupby(groupby)): - if hasattr(name, "__len__") and not isinstance(name, str): + group_name: Hashable + if isinstance(name, Iterable) and not isinstance(name, str): group_name = "_".join([str(i) for i in name]) else: group_name = name diff --git a/edvart/report_sections/multivariate_analysis.py b/edvart/report_sections/multivariate_analysis.py index 63d194b..c94fbdc 100644 --- a/edvart/report_sections/multivariate_analysis.py +++ b/edvart/report_sections/multivariate_analysis.py @@ -1,5 +1,5 @@ from enum import IntEnum -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union import matplotlib.pyplot as plt import nbformat.v4 as nbfv4 @@ -487,7 +487,7 @@ def pca_explained_variance( plt.ylabel("Explained variance ratio") plt.xticks( ticks=range(len(pca.explained_variance_ratio_)), - labels=range(1, 
(len(pca.explained_variance_ratio_) + 1)), + labels=[str(label) for label in range(1, (len(pca.explained_variance_ratio_) + 1))], ) if show_grid: plt.grid() @@ -630,13 +630,15 @@ def parallel_coordinates( columns = [col for col in columns if col not in hide_columns] if drop_na: df = df.dropna() + + line: Optional[Dict[str, Any]] = None if color_col is not None: is_categorical_color = infer_data_type(df[color_col]) in ( DataType.CATEGORICAL, DataType.UNIQUE, DataType.BOOLEAN, ) - + colorscale: Union[List[Tuple[float, str]], str] if is_categorical_color: categories = df[color_col].unique() colorscale = get_default_discrete_colorscale(n_colors=len(categories)) @@ -669,8 +671,6 @@ def parallel_coordinates( "cmax": len(categories) - 0.5, } ) - else: - line = None # Add numeric columns to dimensions dimensions = [{"label": col_name, "values": df[col_name]} for col_name in numeric_columns] # Add categorical columns to dimensions @@ -818,12 +818,15 @@ def parallel_categories( columns = [col for col in columns if col not in hide_columns] if drop_na: df = df.dropna() + + line: Optional[Dict[str, Any]] = None if color_col is not None: categorical_color = infer_data_type(df[color_col]) in ( DataType.CATEGORICAL, DataType.UNIQUE, DataType.BOOLEAN, ) + colorscale: Union[List[Tuple[float, str]], str] if categorical_color: categories = df[color_col].unique() colorscale = get_default_discrete_colorscale(n_colors=len(categories)) @@ -833,14 +836,15 @@ def parallel_categories( color_series = df[color_col] colorscale = "Bluered_r" + colorbar: Dict[str, Any] = {"title": color_col} line = { "color": color_series, "colorscale": colorscale, - "colorbar": {"title": color_col}, + "colorbar": colorbar, } if categorical_color: - line["colorbar"].update( + colorbar.update( { "tickvals": color_series.unique(), "ticktext": categories, @@ -855,8 +859,6 @@ def parallel_categories( "cmax": len(categories) - 0.5, } ) - else: - line = None dimensions = [go.parcats.Dimension(values=df[col_name], 
label=col_name) for col_name in columns] diff --git a/edvart/report_sections/table_of_contents.py b/edvart/report_sections/table_of_contents.py index 198dcd5..b41117d 100644 --- a/edvart/report_sections/table_of_contents.py +++ b/edvart/report_sections/table_of_contents.py @@ -94,7 +94,7 @@ def show(self, sections: List[Section]) -> None: """ display(Markdown(self._title)) - lines = [] + lines: List[str] = [] for section in sections: self._add_section_lines(section, 1, lines, self._include_subsections) display(Markdown("\n".join(lines))) diff --git a/edvart/report_sections/timeseries_analysis/boxplots_over_time.py b/edvart/report_sections/timeseries_analysis/boxplots_over_time.py index 61f756d..6fe80ff 100644 --- a/edvart/report_sections/timeseries_analysis/boxplots_over_time.py +++ b/edvart/report_sections/timeseries_analysis/boxplots_over_time.py @@ -1,4 +1,3 @@ -from datetime import datetime from itertools import takewhile from typing import Any, Callable, Dict, List, Optional, Tuple @@ -48,7 +47,7 @@ def __init__( self, verbosity: Verbosity = Verbosity.LOW, columns: Optional[List[str]] = None, - grouping_function: Callable[[Any], str] = None, + grouping_function: Optional[Callable[[Any], str]] = None, grouping_function_imports: Optional[List[str]] = None, grouping_name: Optional[str] = None, default_nunique_max: int = 80, @@ -161,7 +160,7 @@ def show(self, df: pd.DataFrame) -> None: ) -def default_grouping_functions() -> Dict[str, Callable[[datetime], str]]: +def default_grouping_functions() -> Dict[str, Callable[[pd.Timestamp], str]]: """Return a dictionary of function names and functions. The function takes a pandas datetime and represents it as a rougher (in terms of time) @@ -170,7 +169,7 @@ def default_grouping_functions() -> Dict[str, Callable[[datetime], str]]: Returns ------- - Dict[str, Callable[[datetime], str]] + Dict[str, Callable[[pandas.Timestamp], str]] Dictionary from grouping function names to grouping functions. 
""" return { @@ -217,7 +216,7 @@ def get_default_grouping_func(df: pd.DataFrame, nunique_max: int = 80) -> Tuple[ def show_boxplots_over_time( df: pd.DataFrame, columns: Optional[List[str]] = None, - grouping_function: Callable[[Any], str] = None, + grouping_function: Optional[Callable[[Any], str]] = None, grouping_name: Optional[str] = None, default_nunique_max: int = 80, figsize: Tuple[float, float] = (20, 7), @@ -264,7 +263,7 @@ def show_boxplots_over_time( grouping_name, grouping_function = get_default_grouping_func( df, nunique_max=default_nunique_max ) - elif default_grouping_funcs.get(grouping_name) is not None: + elif grouping_name is not None and default_grouping_funcs.get(grouping_name) is not None: grouping_function = default_grouping_funcs[grouping_name] if columns is None: diff --git a/edvart/report_sections/timeseries_analysis/fourier_transform.py b/edvart/report_sections/timeseries_analysis/fourier_transform.py index f28df76..f625ec7 100644 --- a/edvart/report_sections/timeseries_analysis/fourier_transform.py +++ b/edvart/report_sections/timeseries_analysis/fourier_transform.py @@ -145,7 +145,7 @@ def show_fourier_transform( for col in columns: if not is_numeric(df[col]): raise ValueError(f"Cannot perform Fourier transform for non-numeric column `{col}`") - index_freq = pd.infer_freq(df.index) or "" + index_freq = pd.infer_freq(df.index.to_series()) or "" for col in columns: # FFT requires samples at regular intervals df_col = df[col].interpolate(method="time") diff --git a/edvart/report_sections/timeseries_analysis/rolling_statistics.py b/edvart/report_sections/timeseries_analysis/rolling_statistics.py index 32cb65a..752b5ab 100644 --- a/edvart/report_sections/timeseries_analysis/rolling_statistics.py +++ b/edvart/report_sections/timeseries_analysis/rolling_statistics.py @@ -3,6 +3,7 @@ import nbformat.v4 as nbfv4 import pandas as pd +import plotly import plotly.graph_objects as go from IPython.display import Markdown, display @@ -161,8 +162,7 @@ 
def show_rolling_statistics( index = df.index[window_size - 1 :] layout = dict(xaxis_rangeslider_visible=True) - - data = [] + data: List[List[plotly.basedatatypes.BaseTraceType]] = [] for col in columns: data.append([]) if show_std_dev: diff --git a/edvart/report_sections/timeseries_analysis/seasonal_decomposition.py b/edvart/report_sections/timeseries_analysis/seasonal_decomposition.py index 40a25e8..991939f 100644 --- a/edvart/report_sections/timeseries_analysis/seasonal_decomposition.py +++ b/edvart/report_sections/timeseries_analysis/seasonal_decomposition.py @@ -149,7 +149,7 @@ def show_seasonal_decomposition( If the input data is not indexed by time in ascending order. """ df = df.interpolate(method="time") - if pd.infer_freq(df.index) is None and period is None: + if pd.infer_freq(df.index.to_series()) is None and period is None: display( Markdown( "