Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: name_conforming_strategy #213

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Postgres target sink class, which handles writing streams."""

import copy
import uuid
from typing import Any, Dict, Iterable, List, Optional, Union

Expand All @@ -23,6 +24,29 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.temp_table_name = self.generate_temp_table_name()

def conform_schema(self, schema: dict) -> dict:
"""Return schema dictionary with property names conformed.

Override from self.conform_name(key) to self.conform_name(key, "column")

Args:
schema: JSON schema dictionary.

Returns:
A schema dictionary with the property names conformed.
"""
conformed_schema = copy.copy(schema)
conformed_property_names = {
key: self.conform_name(key, "column")
for key in conformed_schema["properties"]
}
self._check_conformed_names_not_duplicated(conformed_property_names)
conformed_schema["properties"] = {
conformed_property_names[key]: value
for key, value in conformed_schema["properties"].items()
}
return conformed_schema

@property
def append_only(self) -> bool:
"""Return True if the target is append only."""
Expand All @@ -48,7 +72,7 @@ def setup(self) -> None:
with self.connector._connect() as connection:
self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
schema=self.conform_schema(self.schema),
primary_keys=self.key_properties,
connection=connection,
as_temp_table=False,
Expand All @@ -63,12 +87,20 @@ def process_batch(self, context: dict) -> None:
Args:
context: Stream partition or context dictionary.
"""
records: list = []

for record in context["records"]:
new_record: dict = {}
for k, v in record.items():
new_record.update({self.conform_name(k, "column"): v})
records.append(new_record)

# Use one connection so we do this all in a single transaction
with self.connector._connect() as connection:
# Check structure of table
table: sqlalchemy.Table = self.connector.prepare_table(
full_table_name=self.full_table_name,
schema=self.schema,
schema=self.conform_schema(self.schema),
primary_keys=self.key_properties,
as_temp_table=False,
connection=connection,
Expand All @@ -83,16 +115,16 @@ def process_batch(self, context: dict) -> None:
# Insert into temp table
self.bulk_insert_records(
table=temp_table,
schema=self.schema,
schema=self.conform_schema(self.schema),
primary_keys=self.key_properties,
records=context["records"],
records=records,
connection=connection,
)
# Merge data from Temp table to main table
self.upsert(
from_table=temp_table,
to_table=table,
schema=self.schema,
schema=self.conform_schema(self.schema),
join_keys=self.key_properties,
connection=connection,
)
Expand Down Expand Up @@ -218,7 +250,7 @@ def upsert(
# Update
where_condition = join_condition
update_columns = {}
for column_name in self.schema["properties"].keys():
for column_name in self.conform_schema(self.schema)["properties"].keys():
from_table_column: sqlalchemy.Column = from_table.columns[column_name]
to_table_column: sqlalchemy.Column = to_table.columns[column_name]
update_columns[to_table_column] = from_table_column
Expand Down Expand Up @@ -263,7 +295,10 @@ def generate_insert_statement(

def conform_name(self, name: str, object_type: Optional[str] = None) -> str:
"""Conforming names of tables, schemas, column names."""
return name
if object_type in self.config["name_conforming_strategy"]:
return super().conform_name(name, object_type)
else:
return name

@property
def schema_name(self) -> Optional[str]:
Expand Down
10 changes: 10 additions & 0 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,16 @@ def __init__(
required=False,
description="SSH Tunnel Configuration, this is a json object",
),
th.Property(
"name_conforming_strategy",
th.ArrayType(th.StringType),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be a string right?

default=[],
description=(
"If left as an empty array (the default), will not perform any name "
"conforming. Add `table` to the array to conform table names to snake "
"case. Add `column` to the array to conform column names to snake case."
),
),
).to_dict()
default_sink_class = PostgresSink

Expand Down
1 change: 1 addition & 0 deletions target_postgres/tests/test_standard_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def postgres_config():
"add_record_metadata": True,
"hard_delete": False,
"default_target_schema": "melty",
"name_conforming_strategy": ["table", "column"],
}


Expand Down