Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Clean up notes and some query_operator names
Browse files Browse the repository at this point in the history
  • Loading branch information
Code0x58 committed Dec 7, 2020
1 parent 9a85467 commit c1fccd7
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 35 deletions.
3 changes: 1 addition & 2 deletions heron/tools/tracker/src/python/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
{"name": "topologies", "description": topologies.__doc__},
]

# TODO: implement a 120s timeout to be consistent with previous implementation
app = FastAPI(
title="Heron Tracker",
redoc_url="/",
Expand Down Expand Up @@ -85,7 +84,7 @@ async def shutdown_event():
@app.exception_handler(Exception)
async def handle_exception(_, exc: Exception):
payload = ResponseEnvelope[str](
result="",
result=None,
execution_time=0.0,
message=f"request failed: {exc}",
status=constants.RESPONSE_STATUS_FAILURE
Expand Down
25 changes: 12 additions & 13 deletions heron/tools/tracker/src/python/query_operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,24 +212,23 @@ class Sum(Operator):
"""
# pylint: disable=super-init-not-called
def __init__(self, children) -> None:
self.timeSeriesList = children
self.time_series_list = children

async def execute(self, tracker, tmanager: TManagerLocation, start: int, end: int) -> Any:
# Initialize the metric to be returned with sum of all the constants.
result = Metrics(None, None, None, start, end, {})
constant_sum = sum(ts for ts in self.timeSeriesList if isinstance(ts, float))
constant_sum = sum(ts for ts in self.time_series_list if isinstance(ts, float))
result.setDefault(constant_sum, start, end)

futureMetrics = [
ts.execute(tracker, tmanager, start, end)
for ts in self.timeSeriesList if isinstance(ts, Operator)
for ts in self.time_series_list if isinstance(ts, Operator)
]

# Get all the timeseries metrics
all_metrics = []
for met_f in asyncio.as_completed(futureMetrics):
met = await met_f
# TODO: change this interface from str to plain raise (raised on await)
if isinstance(met, str):
raise Exception(met)
all_metrics.extend(met)
Expand All @@ -256,18 +255,18 @@ class Max(Operator):
def __init__(self, children):
if len(children) < 1:
raise Exception("MAX expects at least one operand.")
self.timeSeriesList = children
self.time_series_list = children

async def execute(self, tracker, tmanager: TManagerLocation, start: int, end: int) -> Any:
# Initialize the metric to be returned with max of all the constants.
result = Metrics(None, None, None, start, end, {})
constants = [ts for ts in self.timeSeriesList if isinstance(ts, float)]
constants = [ts for ts in self.time_series_list if isinstance(ts, float)]
if constants:
result.setDefault(max(constants), start, end)

futureMetrics = [
ts.execute(tracker, tmanager, start, end)
for ts in self.timeSeriesList if isinstance(ts, Operator)
for ts in self.time_series_list if isinstance(ts, Operator)
]

# Get all the timeseries metrics
Expand Down Expand Up @@ -313,12 +312,12 @@ def __init__(self, children):
if not 0 <= quantile <= 100:
raise Exception("Quantile must be between 0 and 100 inclusive.")
self.quantile = quantile
self.timeSeriesList = timeseries_list
self.time_series_list = timeseries_list

async def execute(self, tracker, tmanager, start, end):
futureMetrics = [
ts.execute(tracker, tmanager, start, end)
for ts in self.timeSeriesList if isinstance(ts, Operator)
for ts in self.time_series_list if isinstance(ts, Operator)
]

# Get all the timeseries metrics
Expand Down Expand Up @@ -522,14 +521,14 @@ class Rate(Operator):
def __init__(self, children) -> None:
if len(children) != 1:
raise Exception("RATE requires exactly one argument.")
timeSeries, = children
if not isinstance(timeSeries, Operator):
time_series, = children
if not isinstance(time_series, Operator):
raise Exception("RATE requires a timeseries, not constant.")
self.timeSeries = timeSeries
self.time_series = time_series

async def execute(self, tracker, tmanager: TManagerLocation, start: int, end: int) -> Any:
# Get 1 previous data point to be able to apply rate on the first data
metrics = await self.timeSeries.execute(tracker, tmanager, start-60, end)
metrics = await self.time_series.execute(tracker, tmanager, start-60, end)

# Apply rate on all of them
for metric in metrics:
Expand Down
10 changes: 0 additions & 10 deletions heron/tools/tracker/src/python/routers/topologies.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
Some information may not be available for a topology due until the state
manager has recieved more information from the state manager.
> **TODO:** link to topology lifecycle documentation.
"""
from typing import List, Optional, Dict, Union

Expand Down Expand Up @@ -102,20 +100,16 @@ async def get_topology_info(
role: Optional[str] = Query(None, deprecated=True),
):
topology = state.tracker.get_topology(cluster, role, environ, topology)
# TODO: 404 if no topology found
# TODO: 412 if no topology info
return topology.info


# XXX: this all smells like graphql
@router.get("/config", response_model=Dict[str, Union[int, str]])
async def get_topology_config(
cluster: str,
environ: str,
topology: str,
role: Optional[str] = Query(None, deprecated=True),
):
# TODO: deprecate in favour of /info
topology = state.tracker.get_topology(cluster, role, environ, topology)
topology_info = topology.info
return topology_info.physical_plan.config
Expand All @@ -128,7 +122,6 @@ async def get_topology_physical_plan(
topology: str,
role: Optional[str] = Query(None, deprecated=True),
):
# TODO: deprecate in favour of /info
topology = state.tracker.get_topology(cluster, role, environ, topology)
return topology.info.physical_plan

Expand All @@ -141,7 +134,6 @@ async def get_topology_execution_state(
topology: str,
role: Optional[str] = Query(None, deprecated=True),
):
# TODO: deprecate in favour of /info
topology = state.tracker.get_topology(cluster, role, environ, topology)
return topology.info.execution_state

Expand All @@ -153,7 +145,6 @@ async def get_topology_scheduler_location(
topology: str,
role: Optional[str] = Query(None, deprecated=True),
):
# TODO: deprecate in favour of /info
topology = state.tracker.get_topology(cluster, role, environ, topology)
return topology.info.scheduler_location

Expand All @@ -165,7 +156,6 @@ async def get_topology_metadata(
topology: str,
role: Optional[str] = Query(None, deprecated=True),
):
# TODO: deprecate in favour of /info
topology = state.tracker.get_topology(cluster, role, environ, topology)
return topology.info.metadata

Expand Down
2 changes: 0 additions & 2 deletions heron/tools/tracker/src/python/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@
ResultType = TypeVar("ResultType")


# XXX: requires python 3.7+
class ResponseEnvelope(GenericModel, Generic[ResultType]):
# XXX: looking to deprecate exception time - leve to logging or calling app
execution_time: float = Field(0.0, alias="executiontime")
message: str
result: Optional[ResultType] = None
Expand Down
14 changes: 7 additions & 7 deletions heron/tools/tracker/tests/python/query_unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,21 @@ def test_parse_query_string(mock_query):
assert isinstance(root, Default)
assert isinstance(root.constant, float)
assert isinstance(root.timeseries, Sum)
assert 2 == len(root.timeseries.timeSeriesList)
assert isinstance(root.timeseries.timeSeriesList[0], TS)
assert isinstance(root.timeseries.timeSeriesList[1], TS)
assert 2 == len(root.timeseries.time_series_list)
assert isinstance(root.timeseries.time_series_list[0], TS)
assert isinstance(root.timeseries.time_series_list[1], TS)

query = "MAX(1, TS(a, a, a))"
root = mock_query.parse_query_string(query)
assert isinstance(root, Max)
assert isinstance(root.timeSeriesList[0], float)
assert isinstance(root.timeSeriesList[1], TS)
assert isinstance(root.time_series_list[0], float)
assert isinstance(root.time_series_list[1], TS)

query = "PERCENTILE(90, TS(a, a, a))"
root = mock_query.parse_query_string(query)
assert isinstance(root, Percentile)
assert isinstance(root.quantile, float)
assert isinstance(root.timeSeriesList[0], TS)
assert isinstance(root.time_series_list[0], TS)

query = "PERCENTILE(TS(a, a, a), 90)"
with pytest.raises(Exception):
Expand Down Expand Up @@ -167,7 +167,7 @@ def test_parse_query_string(mock_query):
query = "RATE(TS(a, a, a))"
root = mock_query.parse_query_string(query)
assert isinstance(root, Rate)
assert isinstance(root.timeSeries, TS)
assert isinstance(root.time_series, TS)

# Must have one operand only
query = "RATE(TS(a, a, a), TS(b, b, b))"
Expand Down
1 change: 0 additions & 1 deletion tools/python/checkstyle.ini
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ confidence=
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
disable=
fixme,
import-star-module-level,
old-octal-literal,
oct-method,
Expand Down

0 comments on commit c1fccd7

Please sign in to comment.