From 4156025d15472a6db84afadfee2a46c70578289b Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 12 Dec 2024 16:40:38 -0500 Subject: [PATCH] Update post review --- .../dagster_fivetran/fivetran_event_iterator.py | 8 +++++++- .../experimental/test_columns_metadata.py | 3 +++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py index 71ea528782fbe..f1ea98a332760 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/fivetran_event_iterator.py @@ -1,3 +1,4 @@ +import os from concurrent.futures import ThreadPoolExecutor from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, Union @@ -133,7 +134,12 @@ def _threadpool_wrap_map_fn() -> Iterator[FivetranEventType]: ) with ThreadPoolExecutor( - max_workers=DEFAULT_MAX_THREADPOOL_WORKERS, + max_workers=int( + os.getenv( + "FIVETRAN_POSTPROCESSING_THREADPOOL_WORKERS", + default=DEFAULT_MAX_THREADPOOL_WORKERS, + ) + ), thread_name_prefix=f"fivetran_{connector_id}", ) as executor: yield from imap( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_columns_metadata.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_columns_metadata.py index a929e7e3f6e21..5550a82c9bce3 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_columns_metadata.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_columns_metadata.py @@ -135,6 +135,9 @@ def my_fivetran_assets(context: AssetExecutionContext, fivetran: FivetranWorkspa assert table_schema_by_asset_key == expected_table_schema_by_asset_key captured = capsys.readouterr() + # If an exception occurs in fetch_column_metadata, + # a message is logged as a warning and the exception is not raised. + # We test that this message is not in the logs. assert not re.search( r"dagster - WARNING - (?s:.)+ - An error occurred while fetching column metadata for table", captured.err,