Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added aggregate_type argument in aggregate class method for SingleTime… #52

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 44 additions & 13 deletions src/infrasys/time_series_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def check_data(
return data

@classmethod
def aggregate(cls, ts_data: list[Self]) -> Self:
def aggregate(cls, ts_data: list[Self], agg_type: Literal["sum", "avg"] = "sum") -> Self:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should explore having this as a standalone function instead of a method. It feels unnatural to me that we define a method that takes multiple instances of its own class. I am open to discussion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the convention we have is to return "SingleTimeSeries" instead of self

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I thought at first also. However, https://peps.python.org/pep-0673/#use-in-classmethod-signatures

"""Method to aggregate list of SingleTimeSeries data.

Parameters
Expand All @@ -119,7 +119,7 @@ def aggregate(cls, ts_data: list[Self]) -> Self:
InconsistentTimeseriesAggregation
Raised if incompatible timeseries data are passed.
"""

agg_func = {"sum": sum, "avg": lambda x: sum(x) / len(x)}[agg_type]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Picky detail, but I am not a fan of defining functions inside functions. Could we define these methods outside instead?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, wouldn't it be more consistent if we use numpy functions?

# Extract unique properties from ts_data
unique_props = {
"length": {data.length for data in ts_data},
Expand All @@ -136,25 +136,24 @@ def aggregate(cls, ts_data: list[Self]) -> Self:
raise InconsistentTimeseriesAggregation(msg)

# Aggregate data
is_quantity = issubclass(next(iter(unique_props["data_type"])), BaseQuantity)
magnitude_type = (
type(ts_data[0].data.magnitude)
if is_quantity
else next(iter(unique_props["data_type"]))
)
data_type = next(iter(unique_props["data_type"]))
is_quantity = issubclass(data_type, BaseQuantity)
magnitude_type = type(ts_data[0].data.magnitude) if is_quantity else data_type

# Aggregate data based on magnitude type
if issubclass(magnitude_type, pa.Array):
new_data = sum(
if issubclass(magnitude_type, pa.Array) and is_quantity:
new_data = agg_func(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could simplify these cases. In all of the instances you have an array, and if it is a pa.Array we just need to convert it to numpy.

[
data.data.to_numpy() * (data.data.units if is_quantity else 1)
data.data.__class__(np.array(data.data.magnitude)) * data.data.units
for data in ts_data
]
)
elif issubclass(magnitude_type, pa.Array) and not is_quantity:
new_data = agg_func([data.data.to_numpy() for data in ts_data])
elif issubclass(magnitude_type, np.ndarray):
new_data = sum([data.data for data in ts_data])
new_data = agg_func([data.data for data in ts_data])
elif issubclass(magnitude_type, list) and not is_quantity:
new_data = sum([np.array(data) for data in ts_data])
new_data = agg_func([np.array(data) for data in ts_data])
else:
msg = f"Unsupported data type for aggregation: {magnitude_type}"
raise TypeError(msg)
Expand Down Expand Up @@ -264,6 +263,38 @@ def from_time_array(
def get_time_series_metadata_type() -> Type:
return SingleTimeSeriesMetadata

# Support for SingleTimeSeries * scalar. NOTE(review): scalar * SingleTimeSeries
# additionally requires __rmul__, which is not defined here — only the
# left-hand form works; confirm whether the reflected form is needed.
def __mul__(self, scalar: int | float | BaseQuantity) -> Self:
    """Return a new time series with the data scaled element-wise by ``scalar``.

    Parameters
    ----------
    scalar
        Multiplier applied to every data point. May be a plain number or a
        ``BaseQuantity`` carrying units.

    Returns
    -------
    Self
        A new instance of the same class with scaled data; ``variable_name``,
        ``normalization``, ``resolution``, and ``initial_time`` are copied
        unchanged.

    Raises
    ------
    TypeError
        If ``scalar`` is not an int, float, or BaseQuantity, or if the
        underlying data container type is unsupported.
    """
    # Guard clause: reject unsupported multiplier types up front instead of
    # nesting the whole body under an isinstance check.
    if not isinstance(scalar, (int, float, BaseQuantity)):
        msg = "Multiplication only supported with int, float or BaseQuantity"
        raise TypeError(msg)

    is_quantity = issubclass(type(self.data), BaseQuantity)
    data_type = (
        type(self.data.magnitude) if hasattr(self.data, "magnitude") else type(self.data)
    )

    if is_quantity and issubclass(data_type, pa.Array):
        # Quantity wrapping a pyarrow array: rebuild it on a numpy magnitude
        # (pyarrow arrays do not support arithmetic directly), keeping units.
        data_new = (
            self.data.__class__(np.array(self.data.magnitude), self.data.units) * scalar
        )
    elif not is_quantity and issubclass(data_type, pa.Array):
        data_new = self.data.to_numpy() * scalar
    elif not is_quantity and issubclass(data_type, list):
        data_new = np.array(self.data) * scalar
    elif issubclass(data_type, np.ndarray):
        data_new = self.data * scalar
    else:
        msg = f"Unsupported {data_type=}, {is_quantity=}"
        raise TypeError(msg)

    # Construct via type(self) — not SingleTimeSeries — so subclasses such as
    # SingleTimeSeriesScalingFactor round-trip to their own type, matching the
    # ``Self`` return annotation (the original always returned the base class).
    return type(self)(
        data=data_new,
        variable_name=self.variable_name,
        normalization=self.normalization,
        resolution=self.resolution,
        initial_time=self.initial_time,
    )


class SingleTimeSeriesScalingFactor(SingleTimeSeries):
"""Defines a time array with a single dimension of floats that are 0-1 scaling factors."""
Expand Down
18 changes: 18 additions & 0 deletions tests/test_single_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,21 @@ def test_pint_array_aggregate():
ts_agg = SingleTimeSeries.aggregate([ts1, ts2])
assert isinstance(ts_agg, SingleTimeSeries)
assert list(ts_agg.data.magnitude) == [2.2, 4.4, 6.6, 9, 11]
ts_agg = SingleTimeSeries.aggregate([ts1, ts2], "avg")
assert isinstance(ts_agg, SingleTimeSeries)
assert list(ts_agg.data.magnitude) == [1.1, 2.2, 3.3, 4.5, 5.5]


def test_scalar_single_timeseries_multiplier():
    """Multiplying a SingleTimeSeries by a scalar — plain or unit-bearing — scales the data."""
    length = 10
    # NOTE(review): length=10 hourly timestamps but the data array has 5 points —
    # confirm from_time_array tolerates this mismatch.
    start = datetime(year=2020, month=1, day=1)
    timestamps = [start + timedelta(hours=hour) for hour in range(length)]
    series = SingleTimeSeries.from_time_array(
        ActivePower([1.1, 2.2, 3.3, 4.5, 5.5], "kilowatts"),
        "active_power",
        timestamps,
        normalization=None,
    )
    expected = [2.2, 4.4, 6.6, 9, 11]

    doubled = series * 2
    assert isinstance(doubled, SingleTimeSeries)
    assert list(doubled.data.magnitude) == expected

    doubled_with_units = series * ActivePower(2, "kilowatts")
    assert isinstance(doubled_with_units, SingleTimeSeries)
    assert list(doubled_with_units.data.magnitude) == expected
Loading