Skip to content

Commit

Permalink
addind some adm_licenca_urbanismo queries
Browse files Browse the repository at this point in the history
  • Loading branch information
patriciacatandi committed Apr 19, 2024
1 parent 932b598 commit 53c6dd3
Show file tree
Hide file tree
Showing 12 changed files with 244 additions and 38 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/scripts/register_flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,7 @@ def build_and_register( # pylint: disable=too-many-branches
while attempts < max_retries:
attempts += 1
try:
(
flow_id,
flow_version,
is_new,
) = register_serialized_flow(
(flow_id, flow_version, is_new,) = register_serialized_flow(
client=client,
serialized_flow=serialized_flow,
project_id=project_id,
Expand Down
2 changes: 2 additions & 0 deletions pipelines/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from pipelines.silic.dump_db_sislic.flows import * # noqa
2 changes: 1 addition & 1 deletion pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class constants(Enum):
######################################
# Agent labels
######################################
# EXAMPLE_AGENT_LABEL = "example_agent"
RJ_SMDUE_AGENT_LABEL = "smdue"

######################################
# Other constants
Expand Down
2 changes: 0 additions & 2 deletions pipelines/exemplo/__init__.py

This file was deleted.

20 changes: 0 additions & 20 deletions pipelines/exemplo/nome_do_objetivo/flows.py

This file was deleted.

8 changes: 0 additions & 8 deletions pipelines/exemplo/nome_do_objetivo/tasks.py

This file was deleted.

5 changes: 3 additions & 2 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
"""
Imports all flows for every project so we can register all of them.
Imports all flows for every project so we can register all of them
"""
from pipelines.exemplo import * # noqa
from pipelines.sislic import * # noqa
from pipelines.templates import * # noqa
46 changes: 46 additions & 0 deletions pipelines/sislic/dump_db_sislic/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""
Database dumping flows for segovi project (SISLIC)
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS

from prefeitura_rio.pipelines_templates.dump_db.flows import flow as dump_sql_flow
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.constants import constants
from pipelines.sislic.dump_db_sislic.schedules import (
update_schedule,
)

dump_sislic_flow = deepcopy(dump_sql_flow)
dump_sislic_flow.name = "SMDUE: SISLIC - Ingerir tabelas de banco SQL"
dump_sislic_flow.state_handlers = [handler_inject_bd_credentials, handler_initialize_sentry]
dump_sislic_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_sislic_flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMDUE_AGENT_LABEL.value,
],
)

sislic_default_parameters = {
"db_database": "SMU_PRD",
"db_host": "10.2.221.101",
"db_port": "1433",
"db_type": "sql_server",
"infisical_secret_path": "/db_sislic",
"dataset_id": "adm_licenca_urbanismo",
}
dump_sislic_flow = set_default_parameters(
dump_sislic_flow, default_parameters=sislic_default_parameters
)

dump_sislic_flow.schedule = update_schedule
153 changes: 153 additions & 0 deletions pipelines/sislic/dump_db_sislic/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# -*- coding: utf-8 -*-
"""
Schedules for the database dump pipeline
"""

from datetime import timedelta, datetime

from prefect.schedules import Schedule
import pytz

from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules

from pipelines.constants import constants

sislic_queries = {
"documento": {
"materialize_after_dump": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
codDocumento,
numDocumento,
codOrgao,
codTipoDocumento,
MatricCadastrador,
dtAbertura,
dtCadastroSistema,
codAssunto,
codDocumentoOrigem,
descStatus,
ID_CLASSIFICACAO_PROCESSO,
ID_ORIGEM_CLASSIFICACAO_PROCESSO,
ID_TIPO_PROCESSO,
ANO_REQUERIMENTO,
NUMERO_REQUERIMENTO
FROM SMU_PRD.dbo.tbPTI_Documentos
""",
},
"tipo_documento": {
"materialize_after_dump": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
codTipoDocumento,
descTipoDocumento
FROM SMU_PRD.dbo.tbPTI_TiposDocumentos
""",
},
"assunto_processo": {
"materialize_after_dump": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
codAssuntoN1,
codAssuntoN2,
codAssuntoN3,
codAssunto,
Status,
codAssunto_ReqOnLine,
Exige_Endereco,
Exige_ProcessoConstrucao,
Exige_CPF_CNPJ_Requerente,
AssuntoProcessoRio
FROM SMU_PRD.dbo.tbPTI_Assunto
""",
},
# "cronograma_financeiro": {
# "materialize_after_dump": True,
# "materialization_mode": "prod",
# "dump_mode": "overwrite",
# "execute_query": """
# Select
# DISTINCT
# CD_OBRA,
# ETAPA,
# DT_INICIO_ETAPA,
# DT_FIM_ETAPA,
# PC_PERCENTUAL,
# VL_ESTIMADO
# from dbo.fuSEGOVI_Cronograma_Financeiro()
# """,
# },
# "localizacao": {
# "materialize_after_dump": True,
# "materialization_mode": "prod",
# "dump_mode": "overwrite",
# "execute_query": """
# Select
# CD_OBRA,
# ENDERECO,
# NM_BAIRRO,
# NM_RA,
# NM_AP
# from dbo.fuSEGOVI_Localizacoes_obra()
# """,
# },
# "cronograma_alteracao": {
# "materialize_after_dump": True,
# "materialization_mode": "prod",
# "dump_mode": "overwrite",
# "execute_query": """
# Select
# DISTINCT
# CD_OBRA,
# NR_PROCESSO,
# TP_ALTERACAO,
# DT_PUBL_DO,
# CD_ETAPA,
# NR_PRAZO,
# DT_VALIDADE,
# DS_OBSERVACAO
# from dbo.fuSEGOVI_Alteração_de_Cronograma()
# """,
# },
# "programa_fonte": {
# "materialize_after_dump": True,
# "materialization_mode": "prod",
# "dump_mode": "overwrite",
# "execute_query": """
# Select
# DISTINCT
# CD_OBRA,
# CD_PRG_TRAB,
# PROGRAMA_TRABALHO,
# CD_FONTE_RECURSO,
# FONTE_RECURSO,
# CD_NATUREZA_DSP,
# NATUREZA_DESPESA
# from dbo.fuSEGOVI_Programa_Fonte()
# """,
# },
}

sislic_clocks = generate_dump_db_schedules(
interval=timedelta(days=1),
start_date=datetime(2022, 12, 19, 1, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_SMDUE_AGENT_LABEL.value,
],
db_database="SMU_PRD",
db_host="10.2.221.101",
db_port="1433",
db_type="sql_server",
dataset_id="adm_licenca_urbanismo",
infisical_secret_path="/db_sislic",
table_parameters=sislic_queries,
)

update_schedule = Schedule(clocks=untuple(sislic_clocks))
2 changes: 2 additions & 0 deletions pipelines/templates/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from pipelines.templates.run_dbt_model.flows import * # noqa: F401, F403
36 changes: 36 additions & 0 deletions pipelines/templates/run_dbt_model/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
"""
MATERIALIZA MODELOS DO DBT.
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.run_dbt_model.flows import (
templates__run_dbt_model__flow,
)
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.constants import constants

templates__run_dbt_model_smi__flow = deepcopy(templates__run_dbt_model__flow)
templates__run_dbt_model_smi__flow.state_handlers = [handler_inject_bd_credentials]

templates__run_dbt_model_smi__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
templates__run_dbt_model_smi__flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMI_AGENT_LABEL.value,
],
)

templates_run_dbt_model_smi_default_parameters = {
"dataset_id": "dataset_id",
"table_id": "table_id",
}
templates__run_dbt_model_smi__flow = set_default_parameters(
templates__run_dbt_model_smi__flow,
default_parameters=templates_run_dbt_model_smi_default_parameters,
)

0 comments on commit 53c6dd3

Please sign in to comment.