#104 Parallel Processing
mdancho84 committed Oct 11, 2023
1 parent 9354a23 commit 52ef1b8
Showing 3 changed files with 66 additions and 91 deletions.
54 changes: 18 additions & 36 deletions src/pytimetk/core/anomaly.py
@@ -92,7 +92,7 @@ def anomalize(
df = tk.load_dataset("walmart_sales_weekly", parse_dates=["Date"])[["id", "Date", "Weekly_Sales"]]
-anomalize_df = df.groupby('id').anomalize("Date", "Weekly_Sales", period = 52, trend = 52)
+anomalize_df = df.groupby('id').anomalize("Date", "Weekly_Sales", period = 52, trend = 52, threads = 1)
# Visualize the results
anomalize_df[["id", "Date", "observed", "seasonal", "trend", "remainder"]] \
@@ -172,41 +172,23 @@ def anomalize(

group_names = data.grouper.names

-if threads == 1:
-
-    result = data \
-        .apply(
-            _anomalize,
-            date_column=date_column,
-            value_column=value_column,
-            period=period,
-            trend=trend,
-            method=method,
-            decomp=decomp,
-            clean=clean,
-            iqr_alpha=iqr_alpha,
-            max_anomalies=max_anomalies,
-            bind_data=bind_data,
-            verbose=verbose,
-        ).reset_index(level=group_names)
-else:
-    result = parallel_apply(
-        data,
-        _anomalize,
-        date_column=date_column,
-        value_column=value_column,
-        period=period,
-        trend=trend,
-        method=method,
-        decomp=decomp,
-        clean=clean,
-        iqr_alpha=iqr_alpha,
-        max_anomalies=max_anomalies,
-        bind_data=bind_data,
-        threads=threads,
-        show_progress=show_progress,
-        verbose=verbose,
-    ).reset_index(level=group_names)
+result = parallel_apply(
+    data,
+    _anomalize,
+    date_column=date_column,
+    value_column=value_column,
+    period=period,
+    trend=trend,
+    method=method,
+    decomp=decomp,
+    clean=clean,
+    iqr_alpha=iqr_alpha,
+    max_anomalies=max_anomalies,
+    bind_data=bind_data,
+    threads=threads,
+    show_progress=show_progress,
+    verbose=verbose,
+).reset_index(level=group_names)

return result

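The serial/parallel branch deleted above becomes redundant once `parallel_apply` itself falls back to a plain apply when `threads == 1`. The helper's implementation is not part of this diff; the following is a minimal sketch of that assumed dispatch pattern (the name `parallel_apply_sketch` and all internals are hypothetical, and progress-bar handling is omitted):

```python
# Hypothetical sketch of a parallel_apply-style helper. The real
# pytimetk.utils.parallel_helpers.parallel_apply is not shown in this
# diff; everything here is an assumption for illustration only.
from concurrent.futures import ProcessPoolExecutor

import pandas as pd


def parallel_apply_sketch(grouped, func, threads=1, **kwargs):
    """Apply `func` to each group of a DataFrameGroupBy: serially when
    threads == 1, otherwise across a pool of worker processes."""
    if threads == 1:
        # Serial path, roughly equivalent to grouped.apply(func, **kwargs)
        results = [func(group, **kwargs) for _, group in grouped]
    else:
        # Fan the groups out to worker processes; func must be picklable,
        # which module-level functions such as _anomalize are.
        with ProcessPoolExecutor(max_workers=threads) as executor:
            futures = [executor.submit(func, group, **kwargs)
                       for _, group in grouped]
            results = [future.result() for future in futures]
    return pd.concat(results)
```

Folding the single-thread case into the helper leaves one unconditional call site in `anomalize`, as the replacement block above shows, so the fallback behavior is decided in one place rather than duplicated at every caller.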
6 changes: 2 additions & 4 deletions src/pytimetk/core/ts_features.py
@@ -125,7 +125,7 @@ def ts_features(
value_column = 'value',
features = [acf_features, hurst],
freq = 7,
-threads = 1,
+threads = 2,
show_progress = True
)
)
@@ -243,9 +243,7 @@ def ts_features(
ts_features = []
for name, group in conditional_tqdm(construct_df.groupby('unique_id'), total=len(construct_df.unique_id.unique()), desc="Processing", display=show_progress):
result = partial_get_feats(name, group, features = features)
-ts_features.append(result)
-
-
+ts_features.append(result)

# Combine the list to a DataFrame
ts_features = pd.concat(ts_features).rename_axis('unique_id')
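The loop above wraps the groupby iteration in `conditional_tqdm`, which is imported elsewhere in the module and not shown in this diff. A plausible minimal sketch of such a wrapper (hypothetical, for illustration only):

```python
# Hypothetical sketch of a conditional progress-bar wrapper; the real
# pytimetk conditional_tqdm helper is not part of this diff.
from tqdm import tqdm


def conditional_tqdm(iterable, total=None, desc="", display=True):
    # Wrap the iterable in a tqdm progress bar only when display=True;
    # otherwise return it unchanged so no bar is drawn.
    if display:
        return tqdm(iterable, total=total, desc=desc)
    return iterable
```

This keeps `show_progress = False` cheap: the iterable passes through untouched and no progress bar is rendered.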
97 changes: 46 additions & 51 deletions src/pytimetk/core/ts_summary.py
@@ -8,13 +8,16 @@

from pytimetk.utils.checks import check_dataframe_or_groupby, check_date_column, check_series_or_datetime

+from pytimetk.utils.parallel_helpers import parallel_apply



@pf.register_dataframe_method
def ts_summary(
data: Union[pd.DataFrame, pd.core.groupby.generic.DataFrameGroupBy],
date_column: str,
+threads = 1,
+show_progress = True,
) -> pd.DataFrame:
'''Computes summary statistics for time series data, either for the entire dataset or grouped by a specific column.
@@ -64,11 +67,23 @@ def ts_summary(
```
```{python}
# Grouped ts_summary
df = tk.load_dataset('stocks_daily', parse_dates = ['date'])
df.groupby('symbol').ts_summary(date_column = 'date')
```
+```{python}
+# Parallelized grouped ts_summary
+df \
+    .groupby('symbol') \
+    .ts_summary(
+        date_column = 'date',
+        threads = 2,
+        show_progress = True
+    )
+```
```{python}
# See also:
tk.get_date_summary(df['date'])
@@ -88,64 +103,44 @@ def ts_summary(

if isinstance(data, pd.DataFrame):

-df = data.copy()
-df.sort_values(by=[date_column], inplace=True)
-
-date = df[date_column]
-
-# Compute summary statistics
-date_summary = get_date_summary(date)
-frequency_summary = get_frequency_summary(date)
-diff_summary = get_diff_summary(date)
-diff_summary_num = get_diff_summary(date, numeric = True)
-
-# Combine summary statistics into a single DataFrame
-return pd.concat([date_summary, frequency_summary, diff_summary, diff_summary_num], axis = 1)
+return _ts_summary(data, date_column)

if isinstance(data, pd.core.groupby.generic.DataFrameGroupBy):

group_names = data.grouper.names
data = data.obj

-df = data.copy()
-
-df.sort_values(by=[*group_names, date_column], inplace=True)
-
-# Get unique group combinations
-groups = df[group_names].drop_duplicates()
-
-summary_dfs = []
-for idx, group in groups.iterrows():
-    mask = (df[group_names] == group).all(axis=1)
-    group_df = df[mask]
-
-    # Get group columns
-    id_df = pd.DataFrame(group).T.reset_index(drop=True)
-    # print(id_df)
-
-    date = group_df[date_column]
-
-    # Compute summary statistics
-    date_summary = get_date_summary(date)
-    frequency_summary = get_frequency_summary(date)
-    diff_summary = get_diff_summary(date)
-    diff_summary_num = get_diff_summary(date, numeric = True)
-
-    unique_id = ' | '.join(group.values)
-
-    # Combine summary statistics into a single DataFrame
-    summary_df = pd.concat([id_df, date_summary, frequency_summary, diff_summary, diff_summary_num], axis = 1)
-
-    # Append to list of summary DataFrames
-    summary_dfs.append(summary_df)
-
-return pd.concat(summary_dfs, axis = 0).reset_index(drop=True)
+result = data.parallel_apply(
+    func = lambda group: _ts_summary(group, date_column),
+    threads = threads,
+    show_progress = show_progress,
+)
+result = result.reset_index(drop=True)
+return result

# Monkey patch the method to pandas groupby objects
pd.core.groupby.generic.DataFrameGroupBy.ts_summary = ts_summary



+def _ts_summary(group: pd.DataFrame, date_column: str) -> pd.DataFrame:
+    """Compute time series summary for a single group."""
+
+    # Make sure date is sorted
+    group = group.sort_values(by=date_column)
+
+    date = group[date_column]
+
+    # Compute summary statistics
+    date_summary = get_date_summary(date)
+    frequency_summary = get_frequency_summary(date)
+    diff_summary = get_diff_summary(date)
+    diff_summary_num = get_diff_summary(date, numeric=True)
+
+    # Combine summary statistics into a single DataFrame
+    result = pd.concat([date_summary, frequency_summary, diff_summary, diff_summary_num], axis=1)
+
+    # Add group columns back
+    for col in group.columns.difference([date_column]):
+        result[col] = group[col].iloc[0]
+
+    return result

def get_diff_summary(idx: Union[pd.Series, pd.DatetimeIndex], numeric: bool = False):
'''Calculates summary statistics of the time differences between consecutive values in a datetime index.
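Extracting `_ts_summary` gives both branches of `ts_summary` a single per-group code path: the plain-DataFrame branch calls it directly, while the grouped branch hands it to the parallel apply. A condensed usage sketch of the new grouped path, mirroring the docstring example above (it assumes the `stocks_daily` dataset that ships with pytimetk, as the docstring suggests):

```python
import pytimetk as tk

# Load the example dataset used in the docstring
df = tk.load_dataset('stocks_daily', parse_dates=['date'])

# One summary row per symbol; groups are processed on two threads
# with a progress bar, exercising the new threads/show_progress arguments.
summary = df.groupby('symbol').ts_summary(
    date_column='date',
    threads=2,
    show_progress=True,
)
print(summary.head())
```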
