Rename the template to scd1_upsert
sbuldeev committed Nov 29, 2024
1 parent 9161cba commit f30595e
Showing 14 changed files with 77 additions and 7 deletions.
@@ -35,7 +35,6 @@ def run(job_input: IJobInput):
- copy the data from staging to target table
3. Copying the data:
- truncate target table and insert the data from staging table
- 4. Drop staging table
"""

job_arguments = job_input.get_arguments()
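The staging flow listed in the docstring above can be sketched as a sequence of SQL statements. This is an illustrative approximation only — the real template generates its SQL internally, and the schema, view, and staging-table names below are hypothetical:

```python
def build_scd1_statements(source_schema, source_view, target_schema, target_table):
    # Hypothetical staging-table name; the real template derives its own.
    staging = f"{target_schema}.stg_{target_table}"
    return [
        # 1. Create an empty staging table with the target's structure
        f"CREATE TABLE {staging} AS SELECT * FROM {target_schema}.{target_table} WHERE 1 = 0",
        # 2. Copy the source view into staging
        f"INSERT INTO {staging} SELECT * FROM {source_schema}.{source_view}",
        # 3. Truncate the target (DELETE FROM in Trino) and reload it from staging
        f"DELETE FROM {target_schema}.{target_table}",
        f"INSERT INTO {target_schema}.{target_table} SELECT * FROM {staging}",
    ]
```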
@@ -1,19 +1,19 @@
### Purpose:

- This template can be used to load raw data from a database to target table in a database.
+ This template can be used to load raw data from a database into a target 'Slowly Changing Dimension Type 1' (SCD1) table.
In summary, it upserts the target table with the source data.

### Template Name (template_name):

- "scd_upsert"
- "scd1_upsert"

### Template Parameters (template_args):

- target_schema - database schema, where target data is loaded
- target_table - database table where target data is loaded
- source_schema - database schema, where source raw data is loaded from
- source_view - database view, where source raw data is loaded from
- - id_colum - column that will be used for tracking which row should be updated and which inserted
+ - id_column - column that will be used for tracking which row should be updated and which inserted
- check - (Optional) Callback function responsible for checking the quality of the data. Takes in a table name as a parameter which will be used for data validation
- staging_schema - (Optional) Schema where the checks will be executed. If not provided target_schema will be used as default

@@ -44,6 +44,6 @@ def run(job_input):
'target_table': 'dim_sddc',
'id_column': 'dim_sddc_id'
}
job_input.execute_template("scd_upsert", template_args, database="trino")
job_input.execute_template("scd1_upsert", template_args, database="trino")
# . . .
```
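The optional `check` parameter documented above is not exercised in the README's example. The sketch below shows one plausible way to pass a data-quality callback — the callback body and all schema/table names are hypothetical, and the callback's exact contract is defined by the template, not by this sketch:

```python
def run(job_input):
    # Hypothetical data-quality callback: the template passes it the name of
    # the table to validate and expects a truthy result when the data is OK.
    def check_non_empty(table_name):
        rows = job_input.execute_query(f"SELECT COUNT(*) FROM {table_name}")
        return rows[0][0] > 0

    template_args = {
        'source_schema': 'history',
        'source_view': 'vw_dim_sddc',
        'target_schema': 'history',
        'target_table': 'dim_sddc',
        'id_column': 'dim_sddc_id',
        'check': check_non_empty,  # optional validation hook
    }
    job_input.execute_template("scd1_upsert", template_args, database="trino")
```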
@@ -76,8 +76,8 @@ def initialize_job(self, context: JobContext):
)

context.templates.add_template(
"scd_upsert",
pathlib.Path(get_job_path("load/dimension/scd_upsert")),
"scd1_upsert",
pathlib.Path(get_job_path("load/dimension/scd1_upsert")),
connection_name,
)

40 changes: 40 additions & 0 deletions test-new-template/20_python_step.py
@@ -0,0 +1,40 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
"""
Function named `run` is required in order for a python script to be recognized as a Data Job Python step and executed.
VDK provides to every python step an object - job_input - that has methods for:
* executing queries to OLAP Database;
* ingesting data into a database;
* processing data into a database.
See IJobInput documentation for more details.
"""
log.info(f"Starting job step {__name__}")

def sample_check(random_table_name):
return True

# Write your python code inside here ... for example:
job_input.execute_template(
template_name='scd1_upsert',
template_args={
'source_schema': 'starshot_internal_dw_stg',
'source_view': 'sbuldeev_vw_template_test',
'target_schema': 'starshot_internal_dw_stg',
'target_table': 'sbuldeev_dw_template_test',
'id_column': 'org_id',
'check': sample_check
},
database="trino",
)

job_input.execute_query("SELECT 1")
28 changes: 28 additions & 0 deletions test-new-template/config.ini
@@ -0,0 +1,28 @@
; Supported format: https://docs.python.org/3/library/configparser.html#supported-ini-file-structure

; This is the only file required to deploy a Data Job.
; Read more to understand what each option means:

; Information about the owner of the Data Job
[owner]

; Team is a way to group Data Jobs that belonged to the same team.
team = my-team

; Configuration related to running data jobs
[job]
; For format see https://en.wikipedia.org/wiki/Cron
; The cron expression is evaluated in UTC time.
; If it is time for a new job run and the previous job run hasn’t finished yet,
; the cron job waits until the previous execution has finished.
schedule_cron = 11 23 5 8 1

[vdk]
db_default_type=trino
trino_user=sb004367
trino_password=<redacted>
trino_host=trino.broadcom.net
trino_port=443
trino_schema=sc_hms
trino_use_ssl=True
trino_catalog=sc_hms
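A config.ini in this format can be read with Python's standard `configparser`, which is what the supported-format comment at the top of the file refers to. The snippet below is a minimal sketch using placeholder values only (real credentials such as `trino_password` should come from a secret store, not the repository):

```python
import configparser

# Minimal sketch: parsing a VDK-style config.ini. All values are placeholders.
sample = """
[owner]
team = my-team

[job]
schedule_cron = 11 23 5 8 1

[vdk]
db_default_type = trino
trino_host = trino.example.net
trino_port = 443
trino_use_ssl = True
"""

config = configparser.ConfigParser()
config.read_string(sample)

team = config.get("owner", "team")
port = config.getint("vdk", "trino_port")           # typed read: int
use_ssl = config.getboolean("vdk", "trino_use_ssl")  # typed read: bool
```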
3 changes: 3 additions & 0 deletions test-new-template/requirements.txt
@@ -0,0 +1,3 @@
# Python jobs can specify extra library dependencies in requirements.txt file.
# See https://pip.readthedocs.io/en/stable/user_guide/#requirements-files
# The file is optional and can be deleted if no extra library dependencies are necessary.
