Skip to content

Commit

Permalink
fix: Handle reserved words in db, schema and table names with the `no…
Browse files Browse the repository at this point in the history
…rmalize_name` dialect helper

Co-authored-by: Pat Nadolny <[email protected]>
  • Loading branch information
edgarrmondragon and pnadolny13 committed Aug 13, 2024
1 parent 0365442 commit df5d707
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 10 deletions.
13 changes: 8 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ jobs:
fail-fast: false
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
os: ["ubuntu-latest", "macos-latest", "windows-latest"]
# - "3.11"
# - "3.10"
# - "3.9"
# - "3.8"
os:
- "ubuntu-latest"
# - "macos-latest"
# - "windows-latest"
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
Expand Down
9 changes: 7 additions & 2 deletions target_snowflake/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from snowflake.sqlalchemy import URL
from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect
from sqlalchemy.sql import quoted_name, text
from sqlalchemy.sql import text

from target_snowflake.snowflake_types import NUMBER, TIMESTAMP_NTZ, VARIANT

Expand Down Expand Up @@ -61,6 +61,11 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.schema_cache: dict = {}
super().__init__(*args, **kwargs)

@property
def dialect(self) -> SnowflakeDialect:
"""Return a Snowflake dialect instance."""
return self._engine.dialect

def get_table_columns(
self,
full_table_name: str,
Expand Down Expand Up @@ -388,7 +393,7 @@ def _get_merge_from_stage_statement( # noqa: ANN202
dedup = f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {dedup_cols} ORDER BY SEQ8() DESC) = 1"
return (
text(
f"merge into {quoted_name(full_table_name, quote=True)} d using " # noqa: ISC003
f"merge into {full_table_name} d using " # noqa: ISC003
+ f"(select {json_casting_selects} from '@~/target-snowflake/{sync_id}'" # noqa: S608
+ f"(file_format => {file_format}) {dedup}) s "
+ f"on {join_expr} "
Expand Down
5 changes: 3 additions & 2 deletions target_snowflake/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
}


class SnowflakeSink(SQLSink):
class SnowflakeSink(SQLSink[SnowflakeConnector]):
"""Snowflake target sink class."""

connector_class = SnowflakeConnector
Expand Down Expand Up @@ -64,7 +64,8 @@ def database_name(self) -> str | None:

@property
def table_name(self) -> str:
return super().table_name.upper()
table = super().table_name
return self.connector.dialect.identifier_preparer.quote(table).upper()

def setup(self) -> None:
"""Set up Sink.
Expand Down
63 changes: 62 additions & 1 deletion tests/core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import typing as t
from pathlib import Path

import pytest
Expand All @@ -25,6 +26,9 @@
)
from singer_sdk.testing.templates import TargetFileTestTemplate

if t.TYPE_CHECKING:
from target_snowflake.connector import SnowflakeConnector


class SnowflakeTargetArrayData(TargetArrayData):
def validate(self) -> None:
Expand Down Expand Up @@ -65,7 +69,7 @@ def validate(self) -> None:

class SnowflakeTargetCamelcaseComplexSchema(TargetCamelcaseComplexSchema):
def validate(self) -> None:
connector = self.target.default_sink_class.connector_class(self.target.config)
connector: SnowflakeConnector = self.target.default_sink_class.connector_class(self.target.config)
table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.ForecastingTypeToCategory".upper() # noqa: E501
table_schema = connector.get_table(table)
expected_types = {
Expand Down Expand Up @@ -458,6 +462,61 @@ def setup(self) -> None:
)


class SnowflakeTargetExistingReservedNameTableAlter(TargetFileTestTemplate):
name = "existing_reserved_name_table_alter"
# This sends a schema that will request altering from TIMESTAMP_NTZ to VARCHAR

@property
def singer_filepath(self) -> Path:
current_dir = Path(__file__).resolve().parent
return current_dir / "target_test_streams" / "reserved_words_in_table.singer"

def setup(self) -> None:
connector = self.target.default_sink_class.connector_class(self.target.config)
table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.\"order\"".upper()
connector.connection.execute(
f"""
CREATE OR REPLACE TABLE {table} (
ID VARCHAR(16777216),
COL_STR VARCHAR(16777216),
COL_TS TIMESTAMP_NTZ(9),
COL_INT STRING,
COL_BOOL BOOLEAN,
COL_VARIANT VARIANT,
_SDC_BATCHED_AT TIMESTAMP_NTZ(9),
_SDC_DELETED_AT VARCHAR(16777216),
_SDC_EXTRACTED_AT TIMESTAMP_NTZ(9),
_SDC_RECEIVED_AT TIMESTAMP_NTZ(9),
_SDC_SEQUENCE NUMBER(38,0),
_SDC_TABLE_VERSION NUMBER(38,0),
PRIMARY KEY (ID)
)
""",
)


class SnowflakeTargetReservedWordsInTable(TargetFileTestTemplate):
# Contains reserved words from
# https://docs.snowflake.com/en/sql-reference/reserved-keywords
# Syncs records then alters schema by adding a non-reserved word column.
name = "reserved_words_in_table"

@property
def singer_filepath(self) -> Path:
current_dir = Path(__file__).resolve().parent
return current_dir / "target_test_streams" / "reserved_words_in_table.singer"

def validate(self) -> None:
connector = self.target.default_sink_class.connector_class(self.target.config)
table = f"{self.target.config['database']}.{self.target.config['default_target_schema']}.\"order\"".upper()
result = connector.connection.execute(
f"select * from {table}",
)
assert result.rowcount == 1
row = result.first()
assert len(row) == 13, f"Row has unexpected length {len(row)}"


class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate):
name = "type_edge_cases"

Expand Down Expand Up @@ -540,6 +599,8 @@ def singer_filepath(self) -> Path:
SnowflakeTargetColonsInColName,
SnowflakeTargetExistingTable,
SnowflakeTargetExistingTableAlter,
SnowflakeTargetExistingReservedNameTableAlter,
SnowflakeTargetReservedWordsInTable,
SnowflakeTargetTypeEdgeCasesTest,
SnowflakeTargetColumnOrderMismatch,
],
Expand Down
2 changes: 2 additions & 0 deletions tests/target_test_streams/reserved_words_in_table.singer
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{ "type": "SCHEMA", "stream": "order", "schema": { "properties": { "id": { "type": [ "string", "null" ] }, "col_str": { "type": [ "string", "null" ] }, "col_ts": { "format": "date-time", "type": [ "string", "null" ] }, "col_int": { "type": "integer" }, "col_bool": { "type": [ "boolean", "null" ] }, "col_variant": {"type": "object"} }, "type": "object" }, "key_properties": [ "id" ], "bookmark_properties": [ "col_ts" ] }
{ "type": "RECORD", "stream": "order", "record": { "id": "123", "col_str": "foo", "col_ts": "2023-06-13 11:50:04.072", "col_int": 5, "col_bool": true, "col_variant": {"key": "val"} }, "time_extracted": "2023-06-14T18:08:23.074716+00:00" }

0 comments on commit df5d707

Please sign in to comment.