Skip to content

Commit

Permalink
unit test for MicrobatchModelRunner: remove result builder test as it…
Browse files Browse the repository at this point in the history
… will need to be re-written with new structure
  • Loading branch information
MichelleArk committed Nov 8, 2024
1 parent 330ca88 commit 8cbd2c6
Showing 1 changed file with 21 additions and 52 deletions.
73 changes: 21 additions & 52 deletions tests/unit/task/test_run.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import threading
from argparse import Namespace
from dataclasses import dataclass
from datetime import datetime, timedelta
from importlib import import_module
from typing import Optional, Type, Union
from unittest import mock
Expand All @@ -18,7 +16,6 @@
from dbt.artifacts.resources.v1.components import DependsOn
from dbt.artifacts.resources.v1.config import NodeConfig
from dbt.artifacts.resources.v1.model import ModelConfig
from dbt.artifacts.schemas.batch_results import BatchResults
from dbt.artifacts.schemas.results import RunStatus
from dbt.artifacts.schemas.run import RunResult
from dbt.config.runtime import RuntimeConfig
Expand All @@ -27,8 +24,7 @@
from dbt.events.types import LogModelResult
from dbt.exceptions import DbtRuntimeError
from dbt.flags import get_flags, set_from_args
from dbt.materializations.incremental.microbatch import MicrobatchBuilder
from dbt.task.run import ModelRunner, RunTask, _get_adapter_info
from dbt.task.run import MicrobatchModelRunner, ModelRunner, RunTask, _get_adapter_info
from dbt.tests.util import safe_set_invocation_context
from dbt_common.events.base_types import EventLevel
from dbt_common.events.event_manager_client import add_callback_to_manager
Expand Down Expand Up @@ -161,50 +157,23 @@ def test_execute(
model_runner.execute(model=table_model, manifest=manifest)
# TODO: Assert that the model was executed

def test__build_run_microbatch_model_result(
self, table_model: ModelNode, model_runner: ModelRunner
) -> None:
batch = (datetime.now() - timedelta(days=1), datetime.now())
only_successes = [
RunResult(
node=table_model,
status=RunStatus.Success,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="SUCCESS",
adapter_response={},
failures=0,
batch_results=BatchResults(successful=[batch]),
)
]
only_failures = [
RunResult(
node=table_model,
status=RunStatus.Error,
timing=[],
thread_id=threading.current_thread().name,
execution_time=0,
message="ERROR",
adapter_response={},
failures=1,
batch_results=BatchResults(failed=[batch]),
)
]
mixed_results = only_failures + only_successes

expect_success = model_runner._build_run_microbatch_model_result(
table_model, only_successes
)
expect_error = model_runner._build_run_microbatch_model_result(table_model, only_failures)
expect_partial_success = model_runner._build_run_microbatch_model_result(
table_model, mixed_results
class TestMicrobatchModelRunner:
@pytest.fixture
def model_runner(
self,
postgres_adapter: PostgresAdapter,
table_model: ModelNode,
runtime_config: RuntimeConfig,
) -> MicrobatchModelRunner:
return MicrobatchModelRunner(
config=runtime_config,
adapter=postgres_adapter,
node=table_model,
node_index=1,
num_nodes=1,
)

assert expect_success.status == RunStatus.Success
assert expect_error.status == RunStatus.Error
assert expect_partial_success.status == RunStatus.PartialSuccess

@pytest.mark.parametrize(
"has_relation,relation_type,materialized,full_refresh_config,full_refresh_flag,expectation",
[
Expand All @@ -228,7 +197,7 @@ def test__build_run_microbatch_model_result(
def test__is_incremental(
self,
mocker: MockerFixture,
model_runner: ModelRunner,
model_runner: MicrobatchModelRunner,
has_relation: bool,
relation_type: str,
materialized: str,
Expand Down Expand Up @@ -270,7 +239,7 @@ def test_keyboard_breaks__execute_microbatch_materialization(
self,
table_model: ModelNode,
manifest: Manifest,
model_runner: ModelRunner,
model_runner: MicrobatchModelRunner,
) -> None:
def mock_build_batch_context(*args, **kwargs):
raise KeyboardInterrupt("Test exception")
Expand All @@ -282,14 +251,14 @@ def mock_is_incremental(*args, **kwargs):
table_model.config.incremental_strategy = "microbatch"
table_model.config.batch_size = BatchSize.day

with patch.object(
MicrobatchBuilder, "build_batch_context", mock_build_batch_context
), patch.object(ModelRunner, "_is_incremental", mock_is_incremental):
model_runner.compiler = mock.Mock()
model_runner.compiler.compile_node.side_effect = KeyboardInterrupt
with patch.object(MicrobatchModelRunner, "_is_incremental", mock_is_incremental):
try:
model_runner._execute_microbatch_materialization(
table_model, manifest, {}, MagicMock()
)
assert False, "KeybaordInterrupt failed to escape"
assert False, "KeyboardInterrupt failed to escape"
except KeyboardInterrupt:
assert True

Expand Down

0 comments on commit 8cbd2c6

Please sign in to comment.