Skip to content

Commit

Permalink
refactor: handle batch update error and success in different functions
Browse files Browse the repository at this point in the history
  • Loading branch information
njuguna-n committed Oct 7, 2024
1 parent 8bd222b commit 0f3405f
Showing 1 changed file with 15 additions and 10 deletions.
25 changes: 15 additions & 10 deletions dbt/dbt-run.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,17 @@ def get_max_timestamp():
FROM {os.getenv('POSTGRES_SCHEMA')}.document_metadata
""")
return cur.fetchone()[0]

def handle_batch_update_error(last_processed_timestamp, dataemon_interval):
print(f"Error running dbt for batch {last_processed_timestamp}")
update_batch_status(last_processed_timestamp, "error")
time.sleep(dataemon_interval)

def handle_batch_update_success(last_processed_timestamp, dataemon_interval, max_timestamp):
update_batch_status(last_processed_timestamp, "success")

if max_timestamp == last_processed_timestamp:
time.sleep(dataemon_interval)

def run_dbt_in_batches():
last_processed_timestamp = get_last_processed_timestamp()
Expand All @@ -199,19 +210,13 @@ def run_dbt_in_batches():
"--vars", f'{{start_timestamp: "{last_processed_timestamp}", batch_size: {batch_size}}}',
"--exclude", "config.materialized:view"
])

if result.returncode != 0:
print("Error running dbt")
update_batch_status(last_processed_timestamp, "error")
time.sleep(dataemon_interval)
continue
handle_batch_update_error(last_processed_timestamp, dataemon_interval)
continue

update_batch_status(last_processed_timestamp, "success")
max_timestamp = get_max_timestamp()

if max_timestamp == last_processed_timestamp:
time.sleep(dataemon_interval)
continue
handle_batch_update_success(last_processed_timestamp, dataemon_interval, max_timestamp)

last_processed_timestamp = max_timestamp

Expand Down

0 comments on commit 0f3405f

Please sign in to comment.