diff --git a/tests/unit/task/test_run.py b/tests/unit/task/test_run.py
index 8f239ccfc3a..e9df0197ce7 100644
--- a/tests/unit/task/test_run.py
+++ b/tests/unit/task/test_run.py
@@ -1,7 +1,5 @@
-import threading
 from argparse import Namespace
 from dataclasses import dataclass
-from datetime import datetime, timedelta
 from importlib import import_module
 from typing import Optional, Type, Union
 from unittest import mock
@@ -18,7 +16,6 @@
 from dbt.artifacts.resources.v1.components import DependsOn
 from dbt.artifacts.resources.v1.config import NodeConfig
 from dbt.artifacts.resources.v1.model import ModelConfig
-from dbt.artifacts.schemas.batch_results import BatchResults
 from dbt.artifacts.schemas.results import RunStatus
 from dbt.artifacts.schemas.run import RunResult
 from dbt.config.runtime import RuntimeConfig
@@ -27,8 +24,7 @@
 from dbt.events.types import LogModelResult
 from dbt.exceptions import DbtRuntimeError
 from dbt.flags import get_flags, set_from_args
-from dbt.materializations.incremental.microbatch import MicrobatchBuilder
-from dbt.task.run import ModelRunner, RunTask, _get_adapter_info
+from dbt.task.run import MicrobatchModelRunner, ModelRunner, RunTask, _get_adapter_info
 from dbt.tests.util import safe_set_invocation_context
 from dbt_common.events.base_types import EventLevel
 from dbt_common.events.event_manager_client import add_callback_to_manager
@@ -161,50 +157,23 @@ def test_execute(
         model_runner.execute(model=table_model, manifest=manifest)
         # TODO: Assert that the model was executed
 
-    def test__build_run_microbatch_model_result(
-        self, table_model: ModelNode, model_runner: ModelRunner
-    ) -> None:
-        batch = (datetime.now() - timedelta(days=1), datetime.now())
-        only_successes = [
-            RunResult(
-                node=table_model,
-                status=RunStatus.Success,
-                timing=[],
-                thread_id=threading.current_thread().name,
-                execution_time=0,
-                message="SUCCESS",
-                adapter_response={},
-                failures=0,
-                batch_results=BatchResults(successful=[batch]),
-            )
-        ]
-        only_failures = [
-            RunResult(
-                node=table_model,
-                status=RunStatus.Error,
-                timing=[],
-                thread_id=threading.current_thread().name,
-                execution_time=0,
-                message="ERROR",
-                adapter_response={},
-                failures=1,
-                batch_results=BatchResults(failed=[batch]),
-            )
-        ]
-        mixed_results = only_failures + only_successes
-
-        expect_success = model_runner._build_run_microbatch_model_result(
-            table_model, only_successes
-        )
-        expect_error = model_runner._build_run_microbatch_model_result(table_model, only_failures)
-        expect_partial_success = model_runner._build_run_microbatch_model_result(
-            table_model, mixed_results
+
+class TestMicrobatchModelRunner:
+    @pytest.fixture
+    def model_runner(
+        self,
+        postgres_adapter: PostgresAdapter,
+        table_model: ModelNode,
+        runtime_config: RuntimeConfig,
+    ) -> MicrobatchModelRunner:
+        return MicrobatchModelRunner(
+            config=runtime_config,
+            adapter=postgres_adapter,
+            node=table_model,
+            node_index=1,
+            num_nodes=1,
         )
 
-        assert expect_success.status == RunStatus.Success
-        assert expect_error.status == RunStatus.Error
-        assert expect_partial_success.status == RunStatus.PartialSuccess
-
     @pytest.mark.parametrize(
         "has_relation,relation_type,materialized,full_refresh_config,full_refresh_flag,expectation",
         [
@@ -228,7 +197,7 @@
     def test__is_incremental(
         self,
         mocker: MockerFixture,
-        model_runner: ModelRunner,
+        model_runner: MicrobatchModelRunner,
         has_relation: bool,
         relation_type: str,
         materialized: str,
@@ -270,7 +239,7 @@ def test_keyboard_breaks__execute_microbatch_materialization(
         self,
         table_model: ModelNode,
         manifest: Manifest,
-        model_runner: ModelRunner,
+        model_runner: MicrobatchModelRunner,
     ) -> None:
         def mock_build_batch_context(*args, **kwargs):
             raise KeyboardInterrupt("Test exception")
@@ -282,14 +251,14 @@ def mock_is_incremental(*args, **kwargs):
         table_model.config.incremental_strategy = "microbatch"
         table_model.config.batch_size = BatchSize.day
 
-        with patch.object(
-            MicrobatchBuilder, "build_batch_context", mock_build_batch_context
-        ), patch.object(ModelRunner, "_is_incremental", mock_is_incremental):
+        model_runner.compiler = mock.Mock()
+        model_runner.compiler.compile_node.side_effect = KeyboardInterrupt
+        with patch.object(MicrobatchModelRunner, "_is_incremental", mock_is_incremental):
             try:
                 model_runner._execute_microbatch_materialization(
                     table_model, manifest, {}, MagicMock()
                 )
-                assert False, "KeybaordInterrupt failed to escape"
+                assert False, "KeyboardInterrupt failed to escape"
             except KeyboardInterrupt:
                 assert True