Fix graph error after Kubernetes watch restart #298

Draft · wants to merge 7 commits into main
71 changes: 33 additions & 38 deletions backend/streams_explorer/core/services/dataflow_graph.py
@@ -78,20 +78,16 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp) -> None:
)

for input_topic in app.input_topics:
self._add_topic(graph, input_topic)
self._add_input_topic(graph, app.id, input_topic)
if app.output_topic:
self._add_topic(graph, app.output_topic)
self._add_output_topic(graph, app.id, app.output_topic)
if app.error_topic:
self._add_error_topic(graph, app.id, app.error_topic)
if app.input_pattern:
self._enqueue_input_pattern(app.input_pattern, app.id)
for extra_input in app.extra_input_topics:
self._add_topic(graph, extra_input)
self._add_input_topic(graph, app.id, extra_input)
for extra_output in app.extra_output_topics:
self._add_topic(graph, extra_output)
self._add_output_topic(graph, app.id, extra_output)
for extra_pattern in app.extra_input_patterns:
self._enqueue_input_pattern(extra_pattern, app.id)
@@ -178,9 +174,9 @@ async def get_positioned_graph(self) -> dict:
return await self.__get_positioned_json_graph(self.graph)

async def get_metrics(self) -> list[Metric]:
if self.metric_provider is not None:
return await self.metric_provider.get()
return []
if not self.metric_provider:
return []
return await self.metric_provider.get()

def get_node_type(self, id: str) -> NodeTypesEnum:
try:
@@ -208,8 +204,35 @@ def find_associated_pipelines(
return pipelines

@staticmethod
def _add_topic(graph: nx.DiGraph, name: str) -> None:
graph.add_node(name, label=name, node_type=NodeTypesEnum.TOPIC)
def _add_topic(graph: nx.DiGraph, topic_name: str) -> None:
graph.add_node(
topic_name,
**{
NodeDataFields.LABEL: topic_name,
NodeDataFields.NODE_TYPE: NodeTypesEnum.TOPIC,
},
)

@staticmethod
def _add_input_topic(graph: nx.DiGraph, app_id: str, topic_name: str) -> None:
DataFlowGraph._add_topic(graph, topic_name)
graph.add_edge(topic_name, app_id)

@staticmethod
def _add_output_topic(graph: nx.DiGraph, app_id: str, topic_name: str) -> None:
DataFlowGraph._add_topic(graph, topic_name)
graph.add_edge(app_id, topic_name)

@staticmethod
def _add_error_topic(graph: nx.DiGraph, app_id: str, topic_name: str) -> None:
graph.add_node(
topic_name,
**{
NodeDataFields.LABEL: topic_name,
NodeDataFields.NODE_TYPE: NodeTypesEnum.ERROR_TOPIC,
},
)
graph.add_edge(app_id, topic_name)

@staticmethod
def _filter_topic_node_ids(graph: nx.DiGraph) -> set[str]:
@@ -222,19 +245,6 @@ def _filter_topic_node_ids(graph: nx.DiGraph) -> set[str]:
in (NodeTypesEnum.TOPIC, NodeTypesEnum.ERROR_TOPIC)
}

@staticmethod
def _add_input_topic(graph: nx.DiGraph, app_id: str, topic_name: str) -> None:
graph.add_edge(topic_name, app_id)

def _add_output_topic(
self,
graph: nx.DiGraph,
app_id: str,
topic_name: str,
) -> None:
self._add_topic(graph, topic_name)
graph.add_edge(app_id, topic_name)

def _enqueue_input_pattern(self, pattern: str, node_id: str) -> None:
"""
Enqueue an input topic pattern for an app or Kafka Connector
@@ -317,21 +327,6 @@ def resolve_topic_pattern_in_pipeline(
else:
self.add_pattern_as_topic(self.pipelines[pipeline], node_id, pattern)

@staticmethod
def _add_error_topic(
graph: nx.DiGraph,
app_id: str,
topic_name: str,
) -> None:
graph.add_node(
topic_name,
**{
NodeDataFields.LABEL: topic_name,
NodeDataFields.NODE_TYPE: NodeTypesEnum.ERROR_TOPIC,
},
)
graph.add_edge(app_id, topic_name)

def reset(self) -> None:
self.graph.clear()
self.json_graph.clear()
@@ -340,7 +335,7 @@ def reset(self) -> None:

@staticmethod
def __get_json_graph(graph: nx.Graph) -> dict:
json_graph: dict = nx.node_link_data(graph)
json_graph = dict(nx.node_link_data(graph))
json_graph["edges"] = json_graph.pop("links")
return json_graph

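Note on the `dataflow_graph.py` refactor: `_add_input_topic` and `_add_output_topic` now create the topic node themselves before wiring the edge, which is why the explicit `_add_topic` calls in `_add_streaming_app` could be dropped. Below is a minimal, self-contained sketch of how the new helpers compose, assuming simplified stand-ins for `NodeDataFields` and `NodeTypesEnum` (the real definitions live elsewhere in streams-explorer and may differ):

```python
# Sketch only: the NodeDataFields / NodeTypesEnum values below are assumed placeholders.
import networkx as nx


class NodeTypesEnum:
    STREAMING_APP = "streaming-app"
    TOPIC = "topic"
    ERROR_TOPIC = "error-topic"


class NodeDataFields:
    LABEL = "label"
    NODE_TYPE = "node_type"


def add_topic(graph: nx.DiGraph, topic_name: str) -> None:
    # Idempotent: adding the same node twice only updates its attributes.
    graph.add_node(
        topic_name,
        **{
            NodeDataFields.LABEL: topic_name,
            NodeDataFields.NODE_TYPE: NodeTypesEnum.TOPIC,
        },
    )


def add_input_topic(graph: nx.DiGraph, app_id: str, topic_name: str) -> None:
    add_topic(graph, topic_name)
    graph.add_edge(topic_name, app_id)  # data flows topic -> app


def add_output_topic(graph: nx.DiGraph, app_id: str, topic_name: str) -> None:
    add_topic(graph, topic_name)
    graph.add_edge(app_id, topic_name)  # data flows app -> topic


graph = nx.DiGraph()
graph.add_node("my-app", label="my-app", node_type=NodeTypesEnum.STREAMING_APP)
add_input_topic(graph, "my-app", "input-topic")
add_output_topic(graph, "my-app", "output-topic")

# Mirrors __get_json_graph: serialize and rename networkx's "links" key to "edges".
json_graph = dict(nx.node_link_data(graph))
json_graph["edges"] = json_graph.pop("links")
print(json_graph["edges"])
```

Edge direction encodes data flow: input topics point at the app, while the app points at its output and error topics.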
36 changes: 25 additions & 11 deletions backend/streams_explorer/core/services/kubernetes.py
@@ -42,7 +42,7 @@ class K8sResource(NamedTuple):
| V1beta1CronJobList
| EventsV1EventList,
]
return_type: type | None
return_type: type
callback: Callable[..., Awaitable[None]]
delay: int = 0

@@ -185,7 +185,7 @@ def list_events(namespace: str, *args, **kwargs) -> EventsV1EventList:
async def __watch_namespace(
self, namespace: str, resource: K8sResource, resource_version: int | None = None
) -> None:
return_type = resource.return_type.__name__ if resource.return_type else None
return_type = resource.return_type.__name__
try:
async with kubernetes_asyncio.watch.Watch(return_type) as w:
async with w.stream(
@@ … @@
async for event in stream:
await resource.callback(event)
except ApiException as e:
logger.error("Kubernetes watch error {}", e)
formatted_error = " ".join(str(e).splitlines())
logger.error(
"Kubernetes {} watch error {}",
return_type,
formatted_error,
)
match e.status:
case 410: # Expired
# parse resource version from error
resource_version = None
if e.reason:
match = re.match(
r"Expired: too old resource version: \d+ \((\d+)\)",
e.reason,
)

if match:
resource_version = int(match.group(1))
# FIXME: leads to graph errors, e.g. in data_flow.apply_input_pattern_edges()
# probably due to missed events for added or removed deployments
# if e.reason:
# match = re.match(
# r"Expired: too old resource version: \d+ \((\d+)\)",
# e.reason,
# )

# if match:
# resource_version = int(match.group(1))
logger.debug(
"Restarting Kubernetes {} watch {}",
return_type,
f"at resource version {resource_version}"
if resource_version
else "from the start",
)
return await self.__watch_namespace(
namespace, resource, resource_version
)
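For context on the `__watch_namespace` change: a 410 ("Expired") `ApiException` now restarts the watch from the beginning (`resource_version=None`) instead of resuming at the version parsed from the error reason, because resuming can skip events for added or removed deployments and corrupt the dataflow graph (see the FIXME in the hunk above). A trimmed-down sketch of that retry strategy follows; `watch_resource` and its parameters are illustrative names, not the actual method from `kubernetes.py`:

```python
# Sketch of the restart-from-scratch strategy; not the streams-explorer implementation.
from typing import Any, Awaitable, Callable

from kubernetes_asyncio import watch
from kubernetes_asyncio.client import ApiException


async def watch_resource(
    list_func: Callable[..., Any],          # e.g. AppsV1Api(...).list_namespaced_deployment
    callback: Callable[[dict], Awaitable[None]],
    namespace: str,
    return_type: type,                      # e.g. V1Deployment; no longer optional
    resource_version: int | None = None,
) -> None:
    try:
        async with watch.Watch(return_type.__name__) as w:
            async with w.stream(
                list_func,
                namespace=namespace,
                resource_version=resource_version,
            ) as stream:
                async for event in stream:
                    await callback(event)
    except ApiException as e:
        if e.status == 410:  # Expired: our resource version is too old
            # Do not trust the version embedded in e.reason; events may have been
            # missed in the meantime, so restart the watch from the start.
            return await watch_resource(
                list_func, callback, namespace, return_type, resource_version=None
            )
        raise
```

The recursive restart mirrors the structure in the PR; the tests below exercise it by asserting on the `RecursionError` raised under repeated failures.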
23 changes: 15 additions & 8 deletions backend/tests/test_kubernetes.py
@@ -96,7 +96,7 @@ async def mock_callback() -> None:
await mock_watch_namespace(
"test-namespace",
K8sResource(
mock_list_deployments, return_type=None, callback=mock_callback
mock_list_deployments, return_type=V1Deployment, callback=mock_callback
),
)
assert e.value.status == 500
@@ -110,7 +110,7 @@ async def mock_callback() -> None:
await mock_watch_namespace(
"test-namespace",
K8sResource(
mock_list_deployments, return_type=None, callback=mock_callback
mock_list_deployments, return_type=V1Deployment, callback=mock_callback
),
)
assert e.value.status == 409
@@ -141,17 +141,20 @@ async def mock_callback() -> None:
await mock_watch_namespace(
"test-namespace",
K8sResource(
mock_list_deployments, return_type=None, callback=mock_callback
mock_list_deployments, return_type=V1Deployment, callback=mock_callback
),
)
assert isinstance(e, RecursionError)
mock_watch_namespace.assert_called_with(
"test-namespace",
K8sResource(mock_list_deployments, return_type=None, callback=mock_callback),
K8sResource(
mock_list_deployments, return_type=V1Deployment, callback=mock_callback
),
resource_version=None,
)


@pytest.mark.skip("Disabled parsing resource version")
@pytest.mark.asyncio
async def test_watch_namespace_restart_expired_with_resource_version(
kubernetes: Kubernetes, mocker: MockFixture
@@ -176,12 +179,14 @@ async def mock_callback() -> None:
await mock_watch_namespace(
"test-namespace",
K8sResource(
mock_list_deployments, return_type=None, callback=mock_callback
mock_list_deployments, return_type=V1Deployment, callback=mock_callback
),
)
mock_watch_namespace.assert_called_with(
"test-namespace",
K8sResource(mock_list_deployments, return_type=None, callback=mock_callback),
K8sResource(
mock_list_deployments, return_type=V1Deployment, callback=mock_callback
),
resource_version=987654321,
)

@@ -210,11 +215,13 @@ async def mock_callback() -> None:
await mock_watch_namespace(
"test-namespace",
K8sResource(
mock_list_deployments, return_type=None, callback=mock_callback
mock_list_deployments, return_type=V1Deployment, callback=mock_callback
),
)
assert isinstance(e, RecursionError)
mock_watch_namespace.assert_called_with(
"test-namespace",
K8sResource(mock_list_deployments, return_type=None, callback=mock_callback),
K8sResource(
mock_list_deployments, return_type=V1Deployment, callback=mock_callback
),
)
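The test changes above all follow from `K8sResource.return_type` becoming a required `type` instead of `type | None`: every constructed resource now passes a concrete model such as `V1Deployment`. A small isolated sketch of that shape, using a simplified stand-in for `K8sResource` (the field name `func`, the stub list function, and the callback are illustrative, not copied from the repo):

```python
# Simplified stand-in; the real K8sResource in kubernetes.py accepts several list
# types (deployments, stateful sets, cron jobs, events) and is wired to the watch.
from typing import Awaitable, Callable, NamedTuple

from kubernetes_asyncio.client import V1Deployment, V1DeploymentList


class K8sResource(NamedTuple):
    func: Callable[..., V1DeploymentList]
    return_type: type                       # required now, previously `type | None`
    callback: Callable[..., Awaitable[None]]
    delay: int = 0


def list_deployments(namespace: str, *args, **kwargs) -> V1DeploymentList:
    return V1DeploymentList(items=[])       # stub standing in for the API call


async def on_event(event: dict) -> None:    # stub standing in for the graph update callback
    pass


resource = K8sResource(list_deployments, return_type=V1Deployment, callback=on_event)
assert resource.return_type.__name__ == "V1Deployment"  # what __watch_namespace passes to Watch
```

Dropping the `None` case also lets `__watch_namespace` pass `return_type.__name__` to the watch unconditionally, which is exactly what the `kubernetes.py` hunk above does.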