diff --git a/ingestion/src/metadata/ingestion/connections/test_connections.py b/ingestion/src/metadata/ingestion/connections/test_connections.py index 3b3f98756a69..c71b04a89d4e 100644 --- a/ingestion/src/metadata/ingestion/connections/test_connections.py +++ b/ingestion/src/metadata/ingestion/connections/test_connections.py @@ -103,7 +103,7 @@ def _test_connection_steps( metadata=metadata, steps=steps, automation_workflow=automation_workflow ) - return _test_connection_steps_and_raise(steps=steps) + return _test_connection_steps_during_ingestion(steps=steps) def _test_connection_steps_automation_workflow( @@ -231,16 +231,9 @@ def _test_connection_steps_during_ingestion( return test_connection_result -def _test_connection_steps_and_raise( - steps: List[TestConnectionStep], -) -> TestConnectionResult: - """ - Run the test connection as part of the ingestion workflow - Raise an exception if something fails - """ - test_connection_result = _test_connection_steps_during_ingestion(steps) - - for step in test_connection_result.steps: +def raise_test_connection_exception(result: TestConnectionResult) -> None: + """Raise if needed an exception for the test connection""" + for step in result.steps: if not step.passed and step.mandatory: raise SourceConnectionException( f"Failed to run the test connection step: {step.name}" @@ -250,8 +243,6 @@ def _test_connection_steps_and_raise( f"You might be missing metadata in: {step.name} due to {step.message}" ) - return test_connection_result - def test_connection_steps( metadata: OpenMetadata, @@ -319,7 +310,7 @@ def test_connection_db_common( automation_workflow: Optional[AutomationWorkflow] = None, queries: dict = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -351,7 +342,7 @@ def test_connection_db_common( for key, query in queries.items(): test_fn[key] = partial(test_query, statement=query, engine=engine) - test_connection_steps( + result = test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, @@ -361,6 +352,8 @@ def test_connection_db_common( kill_active_connections(engine) + return result + def test_connection_db_schema_sources( metadata: OpenMetadata, @@ -369,7 +362,7 @@ def test_connection_db_schema_sources( automation_workflow: Optional[AutomationWorkflow] = None, queries: dict = None, timeout_seconds: Optional[int] = THREE_MIN, -) -> None: +) -> TestConnectionResult: """ Test connection. This can be executed either as part of a metadata workflow or during an Automation Workflow @@ -418,7 +411,7 @@ def custom_executor(engine_: Engine, inspector_fn_str: str): for key, query in queries.items(): test_fn[key] = partial(test_query, statement=query, engine=engine) - test_connection_steps( + result = test_connection_steps( metadata=metadata, test_fn=test_fn, service_type=service_connection.type.value, @@ -428,6 +421,8 @@ def custom_executor(engine_: Engine, inspector_fn_str: str): kill_active_connections(engine) + return result + def test_query(engine: Engine, statement: str): """