diff --git a/dbt_dry_run/node_runner/incremental_runner.py b/dbt_dry_run/node_runner/incremental_runner.py index 60f793a..b012ec6 100644 --- a/dbt_dry_run/node_runner/incremental_runner.py +++ b/dbt_dry_run/node_runner/incremental_runner.py @@ -1,4 +1,5 @@ -from typing import Callable, Dict, Optional +from textwrap import dedent +from typing import Callable, Dict, Iterable, Optional, Set from dbt_dry_run import flags from dbt_dry_run.exception import SchemaChangeException, UpstreamFailedException @@ -78,9 +79,49 @@ def fail_handler(dry_run_result: DryRunResult, target_table: Table) -> DryRunRes } +def get_common_field_names(left: Table, right: Table) -> Set[str]: + return left.field_names.intersection(right.field_names) + + +def get_merge_sql( + node: Node, common_field_names: Iterable[str], select_statement: str +) -> str: + values_csv = ",".join(sorted(common_field_names)) + return dedent( + f"""MERGE {node.to_table_ref_literal()} + USING ( + {select_statement} + ) + ON True + WHEN NOT MATCHED THEN + INSERT ({values_csv}) + VALUES ({values_csv}) + """ + ) + + class IncrementalRunner(NodeRunner): resource_type = ("model",) + def _verify_merge_type_compatibility( + self, + node: Node, + sql_statement: str, + initial_result: DryRunResult, + target_table: Table, + ) -> DryRunResult: + if not initial_result.table: + return initial_result + common_field_names = get_common_field_names(initial_result.table, target_table) + if not common_field_names: + return initial_result + sql_statement = get_merge_sql(node, common_field_names, sql_statement) + status, model_schema, exception = self._sql_runner.query(sql_statement) + if status == DryRunStatus.SUCCESS: + return initial_result + + return DryRunResult(node, model_schema, status, exception) + def _modify_sql(self, node: Node, sql_statement: str) -> str: if node.config.sql_header: sql_statement = f"{node.config.sql_header}\n{sql_statement}" @@ -118,6 +159,9 @@ def run(self, node: Node) -> DryRunResult: if target_table: on_schema_change = node.config.on_schema_change or OnSchemaChange.IGNORE handler = ON_SCHEMA_CHANGE_TABLE_HANDLER[on_schema_change] + result = self._verify_merge_type_compatibility( + node, run_sql, result, target_table + ) result = handler(result, target_table) return result diff --git a/dbt_dry_run/test/node_runner/test_incremental_runner.py b/dbt_dry_run/test/node_runner/test_incremental_runner.py index 494d96a..064f761 100644 --- a/dbt_dry_run/test/node_runner/test_incremental_runner.py +++ b/dbt_dry_run/test/node_runner/test_incremental_runner.py @@ -1,11 +1,12 @@ -from unittest.mock import MagicMock +from typing import List, Optional +from unittest.mock import MagicMock, call from dbt_dry_run import flags from dbt_dry_run.exception import SchemaChangeException from dbt_dry_run.literals import enable_test_example_values from dbt_dry_run.models import BigQueryFieldType, Table, TableField from dbt_dry_run.models.manifest import NodeConfig, PartitionBy -from dbt_dry_run.node_runner.incremental_runner import IncrementalRunner +from dbt_dry_run.node_runner.incremental_runner import IncrementalRunner, get_merge_sql from dbt_dry_run.results import DryRunStatus, Results from dbt_dry_run.scheduler import ManifestScheduler from dbt_dry_run.test.utils import SimpleNode, assert_result_has_table, get_executed_sql @@ -22,6 +23,34 @@ ) +def get_mock_sql_runner_with_all_string_columns( + model_names: List[str], target_names: Optional[List[str]] +) -> MagicMock: + model_schema = Table( + fields=[ + TableField(name=name, type=BigQueryFieldType.STRING) for name in model_names + ] + ) + target_schema: Optional[Table] = None + if target_names: + target_schema = Table( + fields=[ + TableField(name=name, type=BigQueryFieldType.STRING) + for name in target_names + ] + ) + return get_mock_sql_runner_with(model_schema, target_schema) + + +def get_mock_sql_runner_with( + model_schema: Table, target_schema: Optional[Table] +) -> MagicMock: + mock_sql_runner = MagicMock() + mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, model_schema, None) + mock_sql_runner.get_node_schema.return_value = target_schema + return mock_sql_runner + + def test_partitioned_incremental_model_declares_dbt_max_partition_variable() -> None: dbt_max_partition_declaration = ( "declare _dbt_max_partition timestamp default CURRENT_TIMESTAMP();" @@ -59,8 +88,8 @@ def test_partitioned_incremental_model_declares_dbt_max_partition_variable() -> def test_incremental_model_that_does_not_exist_returns_dry_run_schema() -> None: - mock_sql_runner = MagicMock() - predicted_table = Table( + mock_sql_runner = get_mock_sql_runner_with_all_string_columns(["a"], None) + expected_table = Table( fields=[ TableField( name="a", @@ -68,8 +97,6 @@ def test_incremental_model_that_does_not_exist_returns_dry_run_schema() -> None: ) ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, predicted_table, None) - mock_sql_runner.get_node_schema.return_value = None node = SimpleNode( unique_id="node1", @@ -85,28 +112,12 @@ def test_incremental_model_that_does_not_exist_returns_dry_run_schema() -> None: result = model_runner.run(node) mock_sql_runner.query.assert_called_with(node.compiled_code) - assert_result_has_table(predicted_table, result) + assert_result_has_table(expected_table, result) def test_incremental_model_that_exists_and_has_a_column_removed_and_readded_with_new_name() -> None: - mock_sql_runner = MagicMock() - target_table = Table( - fields=[ - TableField(name="a", type=BigQueryFieldType.STRING), - TableField(name="b", type=BigQueryFieldType.STRING), - ] - ) - predicted_table = Table( - fields=[ - TableField( - name="a", - type=BigQueryFieldType.STRING, - ), - TableField( - name="c", - type=BigQueryFieldType.STRING, - ), - ] + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "b"], ["a", "c"] ) expected_table = Table( fields=[ @@ -116,9 +127,6 @@ def test_incremental_model_that_exists_and_has_a_column_removed_and_readded_with ] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, predicted_table, None) - mock_sql_runner.get_node_schema.return_value = target_table - node = SimpleNode( unique_id="node1", depends_on=[], @@ -134,29 +142,43 @@ def test_incremental_model_that_exists_and_has_a_column_removed_and_readded_with model_runner = IncrementalRunner(mock_sql_runner, results) result = model_runner.run(node) - mock_sql_runner.query.assert_called_with(node.compiled_code) + merge_sql = get_merge_sql(node, ["a"], node.compiled_code) + mock_sql_runner.query.assert_has_calls([call(node.compiled_code), call(merge_sql)]) assert_result_has_table(expected_table, result) def test_incremental_model_that_exists_and_has_a_column_added_does_nothing() -> None: - mock_sql_runner = MagicMock() - target_table = Table(fields=[TableField(name="a", type=BigQueryFieldType.STRING)]) - predicted_table = Table( + mock_sql_runner = get_mock_sql_runner_with_all_string_columns(["a", "b"], ["a"]) + expected_table = Table(fields=[TableField(name="a", type=BigQueryFieldType.STRING)]) + + node = SimpleNode( + unique_id="node1", + depends_on=[], + resource_type=ManifestScheduler.SEED, + table_config=NodeConfig(materialized="incremental", on_schema_change="ignore"), + ).to_node() + node.depends_on.deep_nodes = [] + + results = Results() + + model_runner = IncrementalRunner(mock_sql_runner, results) + + result = model_runner.run(node) + merge_sql = get_merge_sql(node, ["a"], node.compiled_code) + mock_sql_runner.query.assert_has_calls([call(node.compiled_code), call(merge_sql)]) + assert_result_has_table(expected_table, result) + + +def test_incremental_model_that_exists_and_has_no_common_columns() -> None: + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "b"], ["c", "d"] + ) + expected_table = Table( fields=[ - TableField( - name="a", - type=BigQueryFieldType.STRING, - ), - TableField( - name="b", - type=BigQueryFieldType.STRING, - ), + TableField(name="c", type=BigQueryFieldType.STRING), + TableField(name="d", type=BigQueryFieldType.STRING), ] ) - expected_table = target_table - - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, predicted_table, None) - mock_sql_runner.get_node_schema.return_value = target_table node = SimpleNode( unique_id="node1", @@ -171,19 +193,16 @@ def test_incremental_model_that_exists_and_has_a_column_added_does_nothing() -> model_runner = IncrementalRunner(mock_sql_runner, results) result = model_runner.run(node) - mock_sql_runner.query.assert_called_with(node.compiled_code) + mock_sql_runner.query.assert_has_calls([call(node.compiled_code)]) assert_result_has_table(expected_table, result) def test_incremental_model_that_exists_and_syncs_all_columns() -> None: - mock_sql_runner = MagicMock() - target_table = Table( - fields=[ - TableField(name="a", type=BigQueryFieldType.STRING), - TableField(name="b", type=BigQueryFieldType.STRING), - ] + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "c"], ["a", "b"] ) - predicted_table = Table( + + expected_table = Table( fields=[ TableField( name="a", @@ -196,11 +215,6 @@ def test_incremental_model_that_exists_and_syncs_all_columns() -> None: ] ) - expected_table = predicted_table - - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, predicted_table, None) - mock_sql_runner.get_node_schema.return_value = target_table - node = SimpleNode( unique_id="node1", depends_on=[], @@ -216,36 +230,64 @@ def test_incremental_model_that_exists_and_syncs_all_columns() -> None: model_runner = IncrementalRunner(mock_sql_runner, results) result = model_runner.run(node) - mock_sql_runner.query.assert_called_with(node.compiled_code) + merge_sql = get_merge_sql(node, ["a"], node.compiled_code) + mock_sql_runner.query.assert_has_calls([call(node.compiled_code), call(merge_sql)]) assert_result_has_table(expected_table, result) -def test_usage_of_predicted_table_and_target_table_when_full_refresh_flag_is_false( +def test_incremental_model_that_exists_and_fails_when_schema_changed() -> None: + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "c"], ["a", "b"] + ) + + node = SimpleNode( + unique_id="node1", + depends_on=[], + resource_type=ManifestScheduler.SEED, + table_config=NodeConfig(materialized="incremental", on_schema_change="fail"), + ).to_node() + node.depends_on.deep_nodes = [] + + results = Results() + + model_runner = IncrementalRunner(mock_sql_runner, results) + + result = model_runner.run(node) + merge_sql = get_merge_sql(node, ["a"], node.compiled_code) + mock_sql_runner.query.assert_has_calls([call(node.compiled_code), call(merge_sql)]) + assert result.status == DryRunStatus.FAILURE + assert isinstance(result.exception, SchemaChangeException) + + +def test_incremental_model_that_exists_and_success_when_schema_not_changed() -> None: + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "b"], ["a", "b"] + ) + node = SimpleNode( + unique_id="node1", + depends_on=[], + resource_type=ManifestScheduler.SEED, + table_config=NodeConfig(materialized="incremental", on_schema_change="fail"), + ).to_node() + node.depends_on.deep_nodes = [] + + results = Results() + + model_runner = IncrementalRunner(mock_sql_runner, results) + + result = model_runner.run(node) + merge_sql = get_merge_sql(node, ["a", "b"], node.compiled_code) + mock_sql_runner.query.assert_has_calls([call(node.compiled_code), call(merge_sql)]) + assert result.status == DryRunStatus.SUCCESS + + +def test_node_with_no_full_refresh_does_not_full_refresh_when_flag_is_false( default_flags: flags.Flags, ) -> None: flags.set_flags(flags.Flags(full_refresh=False)) - mock_sql_runner = MagicMock() - target_table = Table( - fields=[ - TableField(name="a", type=BigQueryFieldType.STRING), - TableField(name="b", type=BigQueryFieldType.STRING), - ] + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "c"], ["a", "b"] ) - predicted_table = Table( - fields=[ - TableField( - name="a", - type=BigQueryFieldType.STRING, - ), - TableField( - name="c", - type=BigQueryFieldType.STRING, - ), - ] - ) - - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, predicted_table, None) - mock_sql_runner.get_node_schema.return_value = target_table node_with_no_full_refresh_config = SimpleNode( unique_id="node1", @@ -254,6 +296,27 @@ def test_usage_of_predicted_table_and_target_table_when_full_refresh_flag_is_fal table_config=NodeConfig(materialized="incremental"), ).to_node() node_with_no_full_refresh_config.depends_on.deep_nodes = [] + + IncrementalRunner(mock_sql_runner, Results()).run(node_with_no_full_refresh_config) + merge_sql = get_merge_sql( + node_with_no_full_refresh_config, + ["a"], + node_with_no_full_refresh_config.compiled_code, + ) + mock_sql_runner.query.assert_has_calls( + [call(node_with_no_full_refresh_config.compiled_code), call(merge_sql)] + ) + assert len(mock_sql_runner.get_node_schema.call_args_list) == 1 + mock_sql_runner.get_node_schema.assert_called_with(node_with_no_full_refresh_config) + + +def test_node_full_refresh_true_does_full_refresh_when_flag_is_false( + default_flags: flags.Flags, +) -> None: + flags.set_flags(flags.Flags(full_refresh=False)) + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "b"], ["a", "c"] + ) node_with_full_refresh_set_to_true = SimpleNode( unique_id="node2", depends_on=[], @@ -261,6 +324,26 @@ def test_usage_of_predicted_table_and_target_table_when_full_refresh_flag_is_fal table_config=NodeConfig(materialized="incremental", full_refresh=True), ).to_node() node_with_full_refresh_set_to_true.depends_on.deep_nodes = [] + + IncrementalRunner(mock_sql_runner, Results()).run( + node_with_full_refresh_set_to_true + ) + mock_sql_runner.query.assert_has_calls( + [call(node_with_full_refresh_set_to_true.compiled_code)] + ) + assert ( + not mock_sql_runner.get_node_schema.called + ), "If full refresh we do not look at the target node schema" + + +def test_node_full_refresh_false_does_full_refresh_when_flag_is_false( + default_flags: flags.Flags, +) -> None: + flags.set_flags(flags.Flags(full_refresh=False)) + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "c"], ["a", "b"] + ) + node_with_full_refresh_set_to_false = SimpleNode( unique_id="node3", depends_on=[], @@ -269,60 +352,30 @@ def test_usage_of_predicted_table_and_target_table_when_full_refresh_flag_is_fal ).to_node() node_with_full_refresh_set_to_false.depends_on.deep_nodes = [] - IncrementalRunner(mock_sql_runner, Results()).run(node_with_no_full_refresh_config) - mock_sql_runner.query.assert_called_with( - node_with_no_full_refresh_config.compiled_code - ) - assert len(mock_sql_runner.get_node_schema.call_args_list) == 1 - mock_sql_runner.get_node_schema.assert_called_with(node_with_no_full_refresh_config) - - IncrementalRunner(mock_sql_runner, Results()).run( - node_with_full_refresh_set_to_true - ) - mock_sql_runner.query.assert_called_with( - node_with_full_refresh_set_to_true.compiled_code - ) - assert len(mock_sql_runner.get_node_schema.call_args_list) == 1 - IncrementalRunner(mock_sql_runner, Results()).run( node_with_full_refresh_set_to_false ) - mock_sql_runner.query.assert_called_with( - node_with_full_refresh_set_to_false.compiled_code + merge_sql = get_merge_sql( + node_with_full_refresh_set_to_false, + ["a"], + node_with_full_refresh_set_to_false.compiled_code, ) - assert len(mock_sql_runner.get_node_schema.call_args_list) == 2 - mock_sql_runner.get_node_schema.assert_called_with( - node_with_full_refresh_set_to_false + mock_sql_runner.query.assert_has_calls( + [call(node_with_full_refresh_set_to_false.compiled_code), call(merge_sql)] + ) + mock_sql_runner.get_node_schema.assert_has_calls( + [call(node_with_full_refresh_set_to_false)] ) -def test_usage_of_predicted_table_and_target_table_when_full_refresh_flag_is_true( +def test_node_with_no_full_refresh_does_full_refresh_when_flag_is_true( default_flags: flags.Flags, ) -> None: flags.set_flags(flags.Flags(full_refresh=True)) - mock_sql_runner = MagicMock() - target_table = Table( - fields=[ - TableField(name="a", type=BigQueryFieldType.STRING), - TableField(name="b", type=BigQueryFieldType.STRING), - ] - ) - predicted_table = Table( - fields=[ - TableField( - name="a", - type=BigQueryFieldType.STRING, - ), - TableField( - name="c", - type=BigQueryFieldType.STRING, - ), - ] + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "c"], ["a", "b"] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, predicted_table, None) - mock_sql_runner.get_node_schema.return_value = target_table - node_with_no_full_refresh_config = SimpleNode( unique_id="node1", depends_on=[], @@ -330,6 +383,22 @@ def test_usage_of_predicted_table_and_target_table_when_full_refresh_flag_is_tru table_config=NodeConfig(materialized="incremental"), ).to_node() node_with_no_full_refresh_config.depends_on.deep_nodes = [] + + IncrementalRunner(mock_sql_runner, Results()).run(node_with_no_full_refresh_config) + mock_sql_runner.query.assert_called_with( + node_with_no_full_refresh_config.compiled_code + ) + assert not mock_sql_runner.get_node_schema.called + + +def test_node_with_full_refresh_does_full_refresh_when_flag_is_true( + default_flags: flags.Flags, +) -> None: + flags.set_flags(flags.Flags(full_refresh=True)) + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "c"], ["a", "b"] + ) + node_with_full_refresh_set_to_true = SimpleNode( unique_id="node2", depends_on=[], @@ -337,19 +406,6 @@ def test_usage_of_predicted_table_and_target_table_when_full_refresh_flag_is_tru table_config=NodeConfig(materialized="incremental", full_refresh=True), ).to_node() node_with_full_refresh_set_to_true.depends_on.deep_nodes = [] - node_with_full_refresh_set_to_false = SimpleNode( - unique_id="node3", - depends_on=[], - resource_type=ManifestScheduler.MODEL, - table_config=NodeConfig(materialized="incremental", full_refresh=False), - ).to_node() - node_with_full_refresh_set_to_false.depends_on.deep_nodes = [] - - IncrementalRunner(mock_sql_runner, Results()).run(node_with_no_full_refresh_config) - mock_sql_runner.query.assert_called_with( - node_with_no_full_refresh_config.compiled_code - ) - assert len(mock_sql_runner.get_node_schema.call_args_list) == 0 IncrementalRunner(mock_sql_runner, Results()).run( node_with_full_refresh_set_to_true @@ -357,101 +413,40 @@ def test_usage_of_predicted_table_and_target_table_when_full_refresh_flag_is_tru mock_sql_runner.query.assert_called_with( node_with_full_refresh_set_to_true.compiled_code ) - assert len(mock_sql_runner.get_node_schema.call_args_list) == 0 - - IncrementalRunner(mock_sql_runner, Results()).run( - node_with_full_refresh_set_to_false - ) - mock_sql_runner.query.assert_called_with( - node_with_full_refresh_set_to_false.compiled_code - ) - assert len(mock_sql_runner.get_node_schema.call_args_list) == 1 - mock_sql_runner.get_node_schema.assert_called_with( - node_with_full_refresh_set_to_false - ) + assert not mock_sql_runner.get_node_schema.called -def test_incremental_model_that_exists_and_fails_when_schema_changed() -> None: - mock_sql_runner = MagicMock() - target_table = Table( - fields=[ - TableField(name="a", type=BigQueryFieldType.STRING), - TableField(name="b", type=BigQueryFieldType.STRING), - ] - ) - predicted_table = Table( - fields=[ - TableField( - name="a", - type=BigQueryFieldType.STRING, - ), - TableField( - name="c", - type=BigQueryFieldType.STRING, - ), - ] +def test_node_with_false_full_refresh_does_not_full_refresh_when_flag_is_true( + default_flags: flags.Flags, +) -> None: + flags.set_flags(flags.Flags(full_refresh=True)) + mock_sql_runner = get_mock_sql_runner_with_all_string_columns( + ["a", "c"], ["a", "b"] ) - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, predicted_table, None) - mock_sql_runner.get_node_schema.return_value = target_table - - node = SimpleNode( - unique_id="node1", + node_with_full_refresh_set_to_false = SimpleNode( + unique_id="node3", depends_on=[], - resource_type=ManifestScheduler.SEED, - table_config=NodeConfig(materialized="incremental", on_schema_change="fail"), + resource_type=ManifestScheduler.MODEL, + table_config=NodeConfig(materialized="incremental", full_refresh=False), ).to_node() - node.depends_on.deep_nodes = [] - - results = Results() - - model_runner = IncrementalRunner(mock_sql_runner, results) - - result = model_runner.run(node) - mock_sql_runner.query.assert_called_with(node.compiled_code) - assert result.status == DryRunStatus.FAILURE - assert isinstance(result.exception, SchemaChangeException) - + node_with_full_refresh_set_to_false.depends_on.deep_nodes = [] -def test_incremental_model_that_exists_and_success_when_schema_not_changed() -> None: - mock_sql_runner = MagicMock() - target_table = Table( - fields=[ - TableField(name="a", type=BigQueryFieldType.STRING), - TableField(name="b", type=BigQueryFieldType.STRING), - ] + IncrementalRunner(mock_sql_runner, Results()).run( + node_with_full_refresh_set_to_false ) - predicted_table = Table( - fields=[ - TableField( - name="a", - type=BigQueryFieldType.STRING, - ), - TableField( - name="b", - type=BigQueryFieldType.STRING, - ), - ] + merge_sql = get_merge_sql( + node_with_full_refresh_set_to_false, + ["a"], + node_with_full_refresh_set_to_false.compiled_code, + ) + mock_sql_runner.query.assert_has_calls( + [call(node_with_full_refresh_set_to_false.compiled_code), call(merge_sql)] + ) + assert len(mock_sql_runner.get_node_schema.call_args_list) == 1 + mock_sql_runner.get_node_schema.assert_called_with( + node_with_full_refresh_set_to_false ) - - mock_sql_runner.query.return_value = (DryRunStatus.SUCCESS, predicted_table, None) - mock_sql_runner.get_node_schema.return_value = target_table - - node = SimpleNode( - unique_id="node1", - depends_on=[], - resource_type=ManifestScheduler.SEED, - table_config=NodeConfig(materialized="incremental", on_schema_change="fail"), - ).to_node() - node.depends_on.deep_nodes = [] - - results = Results() - - model_runner = IncrementalRunner(mock_sql_runner, results) - - result = model_runner.run(node) - mock_sql_runner.query.assert_called_with(node.compiled_code) - assert result.status == DryRunStatus.SUCCESS def test_model_with_sql_header_executes_header_first() -> None: diff --git a/integration/projects/test_incremental/models/single_struct_column_append_new_columns.sql b/integration/projects/test_incremental/models/single_struct_column_append_new_columns.sql new file mode 100644 index 0000000..a7da4c0 --- /dev/null +++ b/integration/projects/test_incremental/models/single_struct_column_append_new_columns.sql @@ -0,0 +1,10 @@ +{{ + config( + materialized="incremental", + on_schema_change="append_new_columns" + ) +}} + +SELECT + my_struct +FROM (SELECT STRUCT("foo" as my_string, "bar" as my_string2) as my_struct) diff --git a/integration/projects/test_incremental/test_incremental.py b/integration/projects/test_incremental/test_incremental.py index fa14037..3c8eebf 100644 --- a/integration/projects/test_incremental/test_incremental.py +++ b/integration/projects/test_incremental/test_incremental.py @@ -1,5 +1,3 @@ -import pytest - from integration.conftest import ProjectContext from integration.utils import ( assert_report_node_has_columns, @@ -41,9 +39,6 @@ def test_single_column_append_new_columns_has_both_columns( assert_report_node_has_columns(report_node, {"my_string", "my_string2"}) -@pytest.mark.xfail( - reason="False positive if column type of incremental changes: https://github.com/autotraderuk/dbt-dry-run/issues/26" -) def test_single_column_ignore_raises_error_if_column_type_changes( compiled_project: ProjectContext, ): @@ -56,6 +51,18 @@ def test_single_column_ignore_raises_error_if_column_type_changes( assert_node_failed_with_error(run_result.report, node_id, "BadRequest") +def test_single_struct_column_append_new_columns_fails_to_add_new_field( + compiled_project: ProjectContext, +): + node_id = "model.test_incremental.single_struct_column_append_new_columns" + manifest_node = compiled_project.manifest.nodes[node_id] + columns = ["my_struct STRUCT"] + with compiled_project.create_state(manifest_node, columns): + run_result = compiled_project.dry_run() + assert_report_produced(run_result) + assert_node_failed_with_error(run_result.report, node_id, "BadRequest") + + def test_cli_full_refresh_should_use_the_model_schema( compiled_project_full_refresh: ProjectContext, ):