Skip to content

Commit

Permalink
feat: add schedule for materializing chamado per cpf table
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriel-milan committed Dec 10, 2024
1 parent 43cd52c commit 00a5f22
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 0 deletions.
1 change: 1 addition & 0 deletions pipelines/dump_1746/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
##
from pipelines.dump_1746.dump_db.flows import * # noqa
from pipelines.dump_1746.run_dbt_model.flows import * # noqa
Empty file.
29 changes: 29 additions & 0 deletions pipelines/dump_1746/run_dbt_model/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-
"""
Database dumping flows for segovi project.
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun # noqa
from prefect.storage import GCS
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.constants import constants
from pipelines.dump_1746.run_dbt_model.schedules import materialize_flow_schedule
from pipelines.templates.run_dbt_model.flows import templates__run_dbt_model__flow as materialize_flow

materialize_only_1746_tables = deepcopy(materialize_flow)
materialize_only_1746_tables.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry]
materialize_only_1746_tables.name = "SEGOVI: 1746 - Materializar tabelas"
materialize_only_1746_tables.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
materialize_only_1746_tables.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SEGOVI_AGENT_LABEL.value,
],
)
materialize_only_1746_tables.schedule = materialize_flow_schedule
34 changes: 34 additions & 0 deletions pipelines/dump_1746/run_dbt_model/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

from pipelines.constants import constants

materialize_flow_schedule = Schedule(
clocks=[
# Every day at 6am
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2020, 1, 1, 6, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[constants.RJ_SEGOVI_AGENT_LABEL.value],
parameter_defaults={
"dataset_id": "adm_central_atendimento_1746",
"dbt_alias": False,
"dbt_model_parameters": {},
"dbt_model_secret_parameters": [],
"dbt_project_materialization": None,
"downstream": None,
"exclude": None,
"flags": None,
"infisical_credential_dict": None,
"table_id": "chamado_cpf",
"upstream": None,
"mode": "prod",
"materialize_to_datario": False
}
)
]
)

0 comments on commit 00a5f22

Please sign in to comment.