Skip to content

Commit

Permalink
DRY get_timeseries and get_timeseries_with_resampling.
Browse files Browse the repository at this point in the history
Move parsing of variables into module-level function.
  • Loading branch information
cipherself committed Feb 14, 2024
1 parent fe1d6b0 commit 94ab135
Showing 1 changed file with 112 additions and 95 deletions.
207 changes: 112 additions & 95 deletions src/enlyze/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import abc
from datetime import datetime
from functools import cache, reduce
from typing import Iterator, Mapping, Optional, Sequence
from typing import Iterator, Mapping, Optional, Sequence, Tuple, Union
from uuid import UUID

import enlyze.api_clients.timeseries.models as timeseries_api_models
Expand Down Expand Up @@ -44,6 +45,36 @@ def _get_timeseries_data_from_pages(
return timeseries_data


def _get_variables_sequence_and_query_parameter_list(
variables: Union[
Sequence[user_models.Variable],
Mapping[user_models.Variable, user_models.ResamplingMethod],
],
resampling_interval: Optional[int],
) -> Tuple[Sequence[user_models.Variable], Sequence[str]]:
if isinstance(variables, abc.Sequence) and resampling_interval is not None:
raise ValueError("`variables` must be a mapping {variable: ResamplingMethod}")

if resampling_interval:
validate_resampling_interval(resampling_interval)
variables_sequence = []
variables_query_parameter_list = []
for variable, resampling_method in variables.items():
variables_sequence.append(variable)
variables_query_parameter_list.append(
f"{variable.uuid}"
f"{VARIABLE_UUID_AND_RESAMPLING_METHOD_SEPARATOR}"
f"{resampling_method.value}"
)

validate_resampling_method_for_data_type(
resampling_method, variable.data_type
)
return variables_sequence, variables_query_parameter_list

return variables, [str(v.uuid) for v in variables]


class EnlyzeClient:
"""Main entrypoint for interacting with the ENLYZE platform.
Expand Down Expand Up @@ -152,57 +183,67 @@ def get_variables(
for variable in self._get_variables(machine.uuid)
]

def get_timeseries(
def _get_paginated_timeseries(
self,
*,
machine_uuid: str,
start: datetime,
end: datetime,
variables: Sequence[user_models.Variable],
) -> Optional[user_models.TimeseriesData]:
"""Get timeseries data of :ref:`variables <variable>` for a given time frame.
Timeseries data for multiple variables can be requested at once. However, all
variables must belong to the same machine.
You should always pass :ref:`timezone-aware datetime
<python:datetime-naive-aware>` objects to this method! If you don't, naive
datetime objects will be assumed to be expressed in the local timezone of the
system where the code is run.
:param start: Start of the time frame for which to fetch timeseries data. Must
not be before ``end``.
:param end: End of the time frame for which to fetch timeseries data.
:param variables: The variables for which to fetch timeseries data.
:raises: |token-error|
:raises: |generic-error|
variables: Sequence[str],
resampling_interval: Optional[int],
) -> Iterator[timeseries_api_models.TimeseriesData]:
params = {
"appliance": machine_uuid,
"start_datetime": start.isoformat(),
"end_datetime": end.isoformat(),
"variables": ",".join(variables),
}

:returns: Timeseries data or ``None`` if the API returned no data for the
request
if resampling_interval:
params["resampling_interval"] = resampling_interval

"""
return self._timeseries_api_client.get_paginated(
"timeseries", timeseries_api_models.TimeseriesData, params=params
)

start, end, machine_uuid = validate_timeseries_arguments(start, end, variables)
def _get_timeseries(
self,
start: datetime,
end: datetime,
variables: Union[
Sequence[user_models.Variable],
Mapping[user_models.Variable, user_models.ResamplingMethod],
],
resampling_interval: Optional[int] = None,
) -> Optional[user_models.TimeseriesData]:
try:
variables_sequence, variables_query_parameter_list = (
_get_variables_sequence_and_query_parameter_list(
variables, resampling_interval
)
)
except ValueError as e:
raise EnlyzeError from e

variables_uuids = [str(v.uuid) for v in variables]
start, end, machine_uuid = validate_timeseries_arguments(
start, end, variables_sequence
)

try:
chunks = chunk(
variables_uuids, MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST
variables_query_parameter_list,
MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST,
)
except ValueError as e:
raise EnlyzeError from e

chunks_pages = (
self._timeseries_api_client.get_paginated(
"timeseries",
timeseries_api_models.TimeseriesData,
params={
"appliance": machine_uuid,
"start_datetime": start.isoformat(),
"end_datetime": end.isoformat(),
"variables": ",".join(chunk),
},
self._get_paginated_timeseries(
machine_uuid=machine_uuid,
start=start,
end=end,
variables=chunk,
resampling_interval=resampling_interval,
)
for chunk in chunks
)
Expand All @@ -218,9 +259,41 @@ def get_timeseries(
return timeseries_data.to_user_model(
start=start,
end=end,
variables=variables,
variables=variables_sequence,
)

def get_timeseries(
    self,
    start: datetime,
    end: datetime,
    variables: Sequence[user_models.Variable],
) -> Optional[user_models.TimeseriesData]:
    """Fetch timeseries data of :ref:`variables <variable>` within a time frame.

    Several variables can be requested in a single call, provided that all of
    them belong to the same machine.

    Always pass :ref:`timezone-aware datetime <python:datetime-naive-aware>`
    objects to this method. Naive datetime objects are assumed to be expressed
    in the local timezone of the system on which the code runs.

    :param start: Start of the time frame for which to fetch timeseries data.
        Must not be after ``end``.
    :param end: End of the time frame for which to fetch timeseries data.
    :param variables: The variables for which to fetch timeseries data.

    :raises: |token-error|

    :raises: |generic-error|

    :returns: Timeseries data or ``None`` if the API returned no data for the
        request

    """
    # No resampling interval is passed, so the shared implementation falls back
    # to its default of a plain (non-resampled) request.
    return self._get_timeseries(start=start, end=end, variables=variables)

def get_timeseries_with_resampling(
self,
start: datetime,
Expand Down Expand Up @@ -259,63 +332,7 @@ def get_timeseries_with_resampling(
request
""" # noqa: E501
variables_sequence = []
variables_query_parameter_list = []
for variable, resampling_method in variables.items():
variables_sequence.append(variable)
variables_query_parameter_list.append(
f"{variable.uuid}"
f"{VARIABLE_UUID_AND_RESAMPLING_METHOD_SEPARATOR}"
f"{resampling_method.value}"
)

validate_resampling_method_for_data_type(
resampling_method, variable.data_type
)

start, end, machine_uuid = validate_timeseries_arguments(
start,
end,
variables_sequence,
)
validate_resampling_interval(resampling_interval)

try:
chunks = chunk(
variables_query_parameter_list,
MAXIMUM_NUMBER_OF_VARIABLES_PER_TIMESERIES_REQUEST,
)
except ValueError as e:
raise EnlyzeError from e

chunks_pages = (
self._timeseries_api_client.get_paginated(
"timeseries",
timeseries_api_models.TimeseriesData,
params={
"appliance": machine_uuid,
"start_datetime": start.isoformat(),
"end_datetime": end.isoformat(),
"variables": ",".join(chunk),
"resampling_interval": resampling_interval,
},
)
for chunk in chunks
)

timeseries_data_chunked = [
_get_timeseries_data_from_pages(pages) for pages in chunks_pages
]
if not timeseries_data_chunked or None in timeseries_data_chunked:
return None

timeseries_data = reduce(lambda x, y: x.merge(y), timeseries_data_chunked)

return timeseries_data.to_user_model(
start=start,
end=end,
variables=variables_sequence,
)
return self._get_timeseries(start, end, variables, resampling_interval)

def _get_production_runs(
self,
Expand Down

0 comments on commit 94ab135

Please sign in to comment.