Skip to content

Commit

Permalink
Merge pull request #41 from blab/temporal_aggregation
Browse files Browse the repository at this point in the history
Adding temporal aggregation
  • Loading branch information
marlinfiggins authored Nov 8, 2024
2 parents 711b2cc + 1f10034 commit ed301ff
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 0 deletions.
17 changes: 17 additions & 0 deletions evofr/data/hier_frequencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from .data_helpers import format_var_names, prep_dates
from .data_spec import DataSpec
from .temporal_aggregation import aggregate_temporally_hierarchical
from .variant_frequencies import VariantFrequencies


Expand All @@ -15,6 +16,7 @@ def __init__(
group: str,
date_to_index: Optional[dict] = None,
pivot: Optional[str] = None,
aggregation_frequency: Optional[str] = None,
):
"""Construct a data specification for handling variant frequencies
in hierarchical models.
Expand All @@ -36,6 +38,10 @@ def __init__(
Defaults to "other" if present otherwise.
This will usually be used as a reference or pivot strain.
aggregation_frequency:
optional temporal frequency used to aggregate daily counts to
larger time periods such as "W" (week) or "M" (month).
Returns
-------
HierFrequencies
Expand All @@ -51,6 +57,17 @@ def __init__(
self.var_names = format_var_names(raw_var_names, pivot=pivot)
self.pivot = self.var_names[-1]

# Aggregate counts into larger windows
self.aggregation_frequency = aggregation_frequency
if self.aggregation_frequency is not None:
(
self.groups,
self.dates,
self.date_to_index,
) = aggregate_temporally_hierarchical(
self.groups, self.dates, self.aggregation_frequency
)

# Loop each group
grouped = raw_seq.groupby(group)
self.names = [name for name, _ in grouped]
Expand Down
46 changes: 46 additions & 0 deletions evofr/data/temporal_aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import pandas as pd


def aggregate_temporally(seq_counts, dates, frequency):
    """
    Aggregate time-series counts into larger periods (e.g., weekly, monthly).

    Rows of 'seq_counts' are summed within each period defined by 'frequency'.
    Full set of frequency specifications:
    https://pandas.pydata.org/docs/dev/user_guide/timeseries.html#timeseries-offset-aliases

    Parameters:
    - 'seq_counts' (numpy.ndarray): A 2D array where each row corresponds to a time point
        and each column to one count series (presumably one per variant).
    - 'dates' (list of pandas.Timestamp): Timestamps corresponding to each row in 'seq_counts'.
        Anything accepted by `pandas.to_datetime` (e.g. ISO date strings) also works.
    - frequency (str): A string representing the frequency of aggregation, according to pandas offset aliases.
        Examples include 'W-SUN' for weekly aggregation ending on Sunday, 'M' for monthly.

    Returns:
    - 'seq_counts_agg' (numpy.ndarray): A 2D array where each row holds the summed counts of one period.
    - 'dates_agg' (list of pandas.Timestamp): The period labels, one per row of 'seq_counts_agg'.
    - 'date_to_index' (dict): A dictionary mapping each period label to its row in 'seq_counts_agg'.
    """
    columns_seq_counts = [f"seq_{i}" for i in range(seq_counts.shape[1])]

    # pd.Grouper(freq=...) requires a datetime-like index; coercing here lets
    # callers pass strings or date objects instead of failing with a TypeError.
    index = pd.to_datetime(dates)
    df = pd.DataFrame(seq_counts, index=index, columns=columns_seq_counts)

    # Sum the rows that fall in the same period of the requested frequency.
    grouped = df.groupby(pd.Grouper(freq=frequency)).sum()

    seq_counts_agg = grouped[columns_seq_counts].values
    dates_agg = list(grouped.index)
    date_to_index = {d: i for i, d in enumerate(dates_agg)}
    return seq_counts_agg, dates_agg, date_to_index


def aggregate_temporally_hierarchical(groups, dates, frequency):
    """
    Applies `aggregate_temporally` to each group within a hierarchical model.

    Every group is aggregated against the same shared 'dates', and each group
    is updated in place: its `seq_counts`, `dates` and `date_to_index`
    attributes are overwritten with the aggregated values.

    Parameters:
    - 'groups' (iterable): Objects exposing a `seq_counts` attribute whose rows align with 'dates'.
    - 'dates' (list of pandas.Timestamp): Timestamps shared by all groups.
    - frequency (str): Aggregation frequency, according to pandas offset aliases.

    Returns:
    - 'groups': The same object that was passed in, with its members updated.
    - 'dates_agg' (list of pandas.Timestamp): The aggregated period labels.
    - 'date_to_index' (dict): A dictionary mapping each period label to its position in 'dates_agg'.
    """
    dates_agg = None
    date_to_index = None
    for group in groups:
        seq_counts, dates_agg, date_to_index = aggregate_temporally(
            group.seq_counts, dates, frequency
        )
        group.seq_counts = seq_counts
        group.dates = dates_agg
        group.date_to_index = date_to_index

    if dates_agg is None:
        # No group was processed, so the loop never bound the return values.
        # Derive the period labels from the dates alone instead of raising
        # UnboundLocalError at the return statement.
        bins = (
            pd.Series(0, index=pd.to_datetime(dates))
            .groupby(pd.Grouper(freq=frequency))
            .sum()
        )
        dates_agg = list(bins.index)
        date_to_index = {d: i for i, d in enumerate(dates_agg)}

    return groups, dates_agg, date_to_index
17 changes: 17 additions & 0 deletions evofr/data/variant_frequencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from .data_helpers import prep_dates, prep_sequence_counts
from .data_spec import DataSpec
from .temporal_aggregation import aggregate_temporally

VARIANT_NAMES = ["Variant", "other"]
START_DATE = pd.to_datetime("2022-01-01")
Expand Down Expand Up @@ -66,6 +67,7 @@ def __init__(
date_to_index: Optional[dict] = None,
var_names: Optional[List] = None,
pivot: Optional[str] = None,
aggregation_frequency: Optional[str] = None,
):
"""Construct a data specification for handling variant frequencies.
Expand All @@ -88,6 +90,10 @@ def __init__(
This will usually be used as a reference or pivot strain.
Can only be used if you do not set `var_names`.
aggregation_frequency:
optional temporal frequency used to aggregate daily counts to
larger time periods such as "W" (week) or "M" (month).
Returns
-------
VariantFrequencies
Expand All @@ -106,6 +112,17 @@ def __init__(
)
self.pivot = self.var_names[-1]

# Aggregate counts into larger windows
self.aggregation_frequency = aggregation_frequency
if self.aggregation_frequency is not None:
(
self.seq_counts,
self.dates,
self.date_to_index,
) = aggregate_temporally(
self.seq_counts, self.dates, self.aggregation_frequency
)

def make_data_dict(self, data: Optional[dict] = None) -> dict:
if data is None:
data = dict()
Expand Down

0 comments on commit ed301ff

Please sign in to comment.