From c1fccd739fb3c1332fa189420d300a307e264278 Mon Sep 17 00:00:00 2001 From: Oliver Bristow Date: Mon, 7 Dec 2020 12:27:07 +0000 Subject: [PATCH] Clean up notes and some query_operator names --- heron/tools/tracker/src/python/app.py | 3 +-- .../tracker/src/python/query_operators.py | 25 +++++++++---------- .../tracker/src/python/routers/topologies.py | 10 -------- heron/tools/tracker/src/python/utils.py | 2 -- .../tracker/tests/python/query_unittest.py | 14 +++++------ tools/python/checkstyle.ini | 1 - 6 files changed, 20 insertions(+), 35 deletions(-) diff --git a/heron/tools/tracker/src/python/app.py b/heron/tools/tracker/src/python/app.py index 4a769ede4c7..572854a5d8a 100644 --- a/heron/tools/tracker/src/python/app.py +++ b/heron/tools/tracker/src/python/app.py @@ -41,7 +41,6 @@ {"name": "topologies", "description": topologies.__doc__}, ] -# TODO: implement a 120s timeout to be consistent with previous implementation app = FastAPI( title="Heron Tracker", redoc_url="/", @@ -85,7 +84,7 @@ async def shutdown_event(): @app.exception_handler(Exception) async def handle_exception(_, exc: Exception): payload = ResponseEnvelope[str]( - result="", + result=None, execution_time=0.0, message=f"request failed: {exc}", status=constants.RESPONSE_STATUS_FAILURE diff --git a/heron/tools/tracker/src/python/query_operators.py b/heron/tools/tracker/src/python/query_operators.py index fd5280e9c67..8475e123f5d 100644 --- a/heron/tools/tracker/src/python/query_operators.py +++ b/heron/tools/tracker/src/python/query_operators.py @@ -212,24 +212,23 @@ class Sum(Operator): """ # pylint: disable=super-init-not-called def __init__(self, children) -> None: - self.timeSeriesList = children + self.time_series_list = children async def execute(self, tracker, tmanager: TManagerLocation, start: int, end: int) -> Any: # Initialize the metric to be returned with sum of all the constants. result = Metrics(None, None, None, start, end, {}) - constant_sum = sum(ts for ts in self.timeSeriesList if isinstance(ts, float)) + constant_sum = sum(ts for ts in self.time_series_list if isinstance(ts, float)) result.setDefault(constant_sum, start, end) futureMetrics = [ ts.execute(tracker, tmanager, start, end) - for ts in self.timeSeriesList if isinstance(ts, Operator) + for ts in self.time_series_list if isinstance(ts, Operator) ] # Get all the timeseries metrics all_metrics = [] for met_f in asyncio.as_completed(futureMetrics): met = await met_f - # TODO: change this interface from str to plain raise (raised on await) if isinstance(met, str): raise Exception(met) all_metrics.extend(met) @@ -256,18 +255,18 @@ class Max(Operator): def __init__(self, children): if len(children) < 1: raise Exception("MAX expects at least one operand.") - self.timeSeriesList = children + self.time_series_list = children async def execute(self, tracker, tmanager: TManagerLocation, start: int, end: int) -> Any: # Initialize the metric to be returned with max of all the constants. result = Metrics(None, None, None, start, end, {}) - constants = [ts for ts in self.timeSeriesList if isinstance(ts, float)] + constants = [ts for ts in self.time_series_list if isinstance(ts, float)] if constants: result.setDefault(max(constants), start, end) futureMetrics = [ ts.execute(tracker, tmanager, start, end) - for ts in self.timeSeriesList if isinstance(ts, Operator) + for ts in self.time_series_list if isinstance(ts, Operator) ] # Get all the timeseries metrics @@ -313,12 +312,12 @@ def __init__(self, children): if not 0 <= quantile <= 100: raise Exception("Quantile must be between 0 and 100 inclusive.") self.quantile = quantile - self.timeSeriesList = timeseries_list + self.time_series_list = timeseries_list async def execute(self, tracker, tmanager, start, end): futureMetrics = [ ts.execute(tracker, tmanager, start, end) - for ts in self.timeSeriesList if isinstance(ts, Operator) + for ts in self.time_series_list if isinstance(ts, Operator) ] # Get all the timeseries metrics @@ -522,14 +521,14 @@ class Rate(Operator): def __init__(self, children) -> None: if len(children) != 1: raise Exception("RATE requires exactly one argument.") - timeSeries, = children - if not isinstance(timeSeries, Operator): + time_series, = children + if not isinstance(time_series, Operator): raise Exception("RATE requires a timeseries, not constant.") - self.timeSeries = timeSeries + self.time_series = time_series async def execute(self, tracker, tmanager: TManagerLocation, start: int, end: int) -> Any: # Get 1 previous data point to be able to apply rate on the first data - metrics = await self.timeSeries.execute(tracker, tmanager, start-60, end) + metrics = await self.time_series.execute(tracker, tmanager, start-60, end) # Apply rate on all of them for metric in metrics: diff --git a/heron/tools/tracker/src/python/routers/topologies.py b/heron/tools/tracker/src/python/routers/topologies.py index bf0df9115fd..c96bbdb1e24 100644 --- a/heron/tools/tracker/src/python/routers/topologies.py +++ b/heron/tools/tracker/src/python/routers/topologies.py @@ -24,8 +24,6 @@ Some information may not be available for a topology due until the state manager has recieved more information from the state manager. -> **TODO:** link to topology lifecycle documentation. - """ from typing import List, Optional, Dict, Union @@ -102,12 +100,9 @@ async def get_topology_info( role: Optional[str] = Query(None, deprecated=True), ): topology = state.tracker.get_topology(cluster, role, environ, topology) - # TODO: 404 if no topology found - # TODO: 412 if no topology info return topology.info -# XXX: this all smells like graphql @router.get("/config", response_model=Dict[str, Union[int, str]]) async def get_topology_config( cluster: str, @@ -115,7 +110,6 @@ async def get_topology_config( topology: str, role: Optional[str] = Query(None, deprecated=True), ): - # TODO: deprecate in favour of /info topology = state.tracker.get_topology(cluster, role, environ, topology) topology_info = topology.info return topology_info.physical_plan.config @@ -128,7 +122,6 @@ async def get_topology_physical_plan( topology: str, role: Optional[str] = Query(None, deprecated=True), ): - # TODO: deprecate in favour of /info topology = state.tracker.get_topology(cluster, role, environ, topology) return topology.info.physical_plan @@ -141,7 +134,6 @@ async def get_topology_execution_state( topology: str, role: Optional[str] = Query(None, deprecated=True), ): - # TODO: deprecate in favour of /info topology = state.tracker.get_topology(cluster, role, environ, topology) return topology.info.execution_state @@ -153,7 +145,6 @@ async def get_topology_scheduler_location( topology: str, role: Optional[str] = Query(None, deprecated=True), ): - # TODO: deprecate in favour of /info topology = state.tracker.get_topology(cluster, role, environ, topology) return topology.info.scheduler_location @@ -165,7 +156,6 @@ async def get_topology_metadata( topology: str, role: Optional[str] = Query(None, deprecated=True), ): - # TODO: deprecate in favour of /info topology = state.tracker.get_topology(cluster, role, environ, topology) return topology.info.metadata diff --git a/heron/tools/tracker/src/python/utils.py b/heron/tools/tracker/src/python/utils.py index 36de8e6519d..b5f375d9efd 100644 --- a/heron/tools/tracker/src/python/utils.py +++ b/heron/tools/tracker/src/python/utils.py @@ -53,9 +53,7 @@ ResultType = TypeVar("ResultType") -# XXX: requires python 3.7+ class ResponseEnvelope(GenericModel, Generic[ResultType]): - # XXX: looking to deprecate exception time - leve to logging or calling app execution_time: float = Field(0.0, alias="executiontime") message: str result: Optional[ResultType] = None diff --git a/heron/tools/tracker/tests/python/query_unittest.py b/heron/tools/tracker/tests/python/query_unittest.py index 6706203729a..b482b47d674 100644 --- a/heron/tools/tracker/tests/python/query_unittest.py +++ b/heron/tools/tracker/tests/python/query_unittest.py @@ -89,21 +89,21 @@ def test_parse_query_string(mock_query): assert isinstance(root, Default) assert isinstance(root.constant, float) assert isinstance(root.timeseries, Sum) - assert 2 == len(root.timeseries.timeSeriesList) - assert isinstance(root.timeseries.timeSeriesList[0], TS) - assert isinstance(root.timeseries.timeSeriesList[1], TS) + assert 2 == len(root.timeseries.time_series_list) + assert isinstance(root.timeseries.time_series_list[0], TS) + assert isinstance(root.timeseries.time_series_list[1], TS) query = "MAX(1, TS(a, a, a))" root = mock_query.parse_query_string(query) assert isinstance(root, Max) - assert isinstance(root.timeSeriesList[0], float) - assert isinstance(root.timeSeriesList[1], TS) + assert isinstance(root.time_series_list[0], float) + assert isinstance(root.time_series_list[1], TS) query = "PERCENTILE(90, TS(a, a, a))" root = mock_query.parse_query_string(query) assert isinstance(root, Percentile) assert isinstance(root.quantile, float) - assert isinstance(root.timeSeriesList[0], TS) + assert isinstance(root.time_series_list[0], TS) query = "PERCENTILE(TS(a, a, a), 90)" with pytest.raises(Exception): @@ -167,7 +167,7 @@ def test_parse_query_string(mock_query): query = "RATE(TS(a, a, a))" root = mock_query.parse_query_string(query) assert isinstance(root, Rate) - assert isinstance(root.timeSeries, TS) + assert isinstance(root.time_series, TS) # Must have one operand only query = "RATE(TS(a, a, a), TS(b, b, b))" diff --git a/tools/python/checkstyle.ini b/tools/python/checkstyle.ini index df5faafcb59..016f5d6fe42 100644 --- a/tools/python/checkstyle.ini +++ b/tools/python/checkstyle.ini @@ -78,7 +78,6 @@ confidence= # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" disable= - fixme, import-star-module-level, old-octal-literal, oct-method,