Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add alvaras pipeline #19

Merged
merged 13 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,19 @@
# Unzip Oracle Instant Client
FROM ubuntu:18.04 as unzip-step
COPY --from=curl-step /tmp/instantclient.zip /tmp/instantclient.zip
RUN apt-get update && \

Check failure on line 12 in Dockerfile

View workflow job for this annotation

GitHub Actions / Lint

DL3008 warning: Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`
apt-get install --no-install-recommends -y unzip && \
rm -rf /var/lib/apt/lists/* && \
unzip /tmp/instantclient.zip -d /tmp

# Start Python image
FROM python:${PYTHON_VERSION}
SHELL ["/bin/bash", "-o", "pipefail", "-c"]

# Install a few dependencies and setup oracle instant client
WORKDIR /opt/oracle
COPY --from=unzip-step /tmp/instantclient_21_5 /opt/oracle/instantclient_21_5
RUN apt-get update && \

Check failure on line 24 in Dockerfile

View workflow job for this annotation

GitHub Actions / Lint

DL3008 warning: Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`
apt-get install --no-install-recommends -y git curl gnupg2 libaio1 && \
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - && \
echo "deb [arch=amd64,arm64,armhf] https://packages.microsoft.com/debian/12/prod bookworm main" > /etc/apt/sources.list.d/mssql-release.list && \
Expand All @@ -45,4 +46,4 @@
# Install requirements
WORKDIR /app
COPY . .
RUN python3 -m pip install --prefer-binary --no-cache-dir -U .
RUN python3 -m pip install --prefer-binary --no-cache-dir -U .
2 changes: 2 additions & 0 deletions pipelines/atividade_economica/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from pipelines.atividade_economica.dump_db.flows import * # noqa
51 changes: 51 additions & 0 deletions pipelines/atividade_economica/dump_db/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# -*- coding: utf-8 -*-
"""
Database dumping flows for alvaras.
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.dump_db.flows import flow as dump_sql_flow
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.atividade_economica.dump_db.schedules import (
alvaras_infra_daily_update_schedule,
)
from pipelines.constants import constants

rj_iplanrio_alvaras_flow = deepcopy(dump_sql_flow)
rj_iplanrio_alvaras_flow.state_handlers = [
handler_inject_bd_credentials,
handler_initialize_sentry,
]
rj_iplanrio_alvaras_flow.name = "IPLANRIO: Alvaras - Ingerir tabelas de banco SQL"
rj_iplanrio_alvaras_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)

rj_iplanrio_alvaras_flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMFP_AGENT_LABEL.value, # label do agente
],
)

alvaras_default_parameters = {
"db_database": "DW_BI_ALVARAS",
"db_host": "10.70.15.11",
"db_port": "1433",
"db_type": "sql_server",
"dataset_id": "atividade_economica",
"infisical_secret_path": "/db-alvaras",
}

rj_iplanrio_alvaras_flow = set_default_parameters(
rj_iplanrio_alvaras_flow,
default_parameters=alvaras_default_parameters,
)

rj_iplanrio_alvaras_flow.schedule = alvaras_infra_daily_update_schedule
224 changes: 224 additions & 0 deletions pipelines/atividade_economica/dump_db/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# -*- coding: utf-8 -*-
"""
Schedules for the database dump pipeline.
"""

from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules

from pipelines.constants import constants

#####################################
#
# Alvaras Schedules
#
#####################################

_alvaras_infra_query = {
"fact_fatoalvaras": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_Alvara,
Quantidade,
ID_AtvProcesso,
ID_CAE,
ID_CNAE,
ID_DiaDeferimento,
ID_DiaSolicitacao,
ID_DiaTaxaPagamen,
ID_Direcionamento,
ID_TipoContribuint,
ID_TipoSolicitacao
FROM DW_BI_ALVARAS.dbo.FACT_FatoAlvaras;
""",
},
"fact_fatocp": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_AtvProcesso,
Quantidade_cp,
ID_CAE,
ID_CNAE,
ID_Consulta,
ID_DiaInicial,
ID_Direcionamento,
ID_TipoContribuint,
ID_TipoSolicitacao
FROM DW_BI_ALVARAS.dbo.FACT_FatoCP;
""",
},
"tab_alvara": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_Alvara,
DSC_Alvara,
DSC_Endereco,
DSC_Bairro,
DSC_Zoneamento,
DSC_IRLF,
DSC_TipoAnalise,
DSC_TempoRespDia,
DSC_StatusIntermediario,
DSC_StatusCPL,
DSC_TempoRespMinuto,
DSC_TipoAlvara,
DSC_TaxaOriginal,
DSC_TaxaMulta,
DSC_TaxaMora,
DSC_TaxaTotal,
DSC_IsentoTaxa,
DSC_Numero,
DSC_AlvaraLiberado
FROM DW_BI_ALVARAS.dbo.TAB_ALVARA;
""",
},
"tab_atvprocesso": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_AtvProcesso,
DSC_AtvProcesso,
DSC_RespAtividade,
DSC_RefAtividade
FROM DW_BI_ALVARAS.dbo.TAB_AtvProcesso;
""",
},
"tab_cae": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_CAE,
DSC_CAE,
ID_TipoAtividade,
DSC_TipoAtividade
FROM DW_BI_ALVARAS.dbo.TAB_CAE;
""",
},
"tab_cnae": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_CNAE,
DSC_CNAE
FROM DW_BI_ALVARAS.dbo.TAB_CNAE;
""",
},
"tab_consulta": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_Consulta,
DSC_Consulta,
DSC_Endereco_cp,
DSC_Bairro_cp,
DSC_Zoneamento_cp,
DSC_CodeConsulta,
DSC_IRLF_cp,
DSC_StatusCPL_cp,
DSC_TipoAnalise_cp,
DSC_Status_cp
FROM DW_BI_ALVARAS.dbo.TAB_Consulta;
""",
},
"tab_direcionamento": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_Direcionamento,
DSC_Direcionamento
FROM DW_BI_ALVARAS.dbo.TAB_Direcionamento;
""",
},
"tab_tipocontribuinte_tipocontribuint": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_TipoContribuint,
DSC_TipoContribuint
FROM DW_BI_ALVARAS.dbo.TAB_TipoContribuinte_TipoContribuint;
""",
},
"tab_tiposolicitacao": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"materialize_to_datario": False,
"dump_to_gcs": False,
"dump_mode": "overwrite",
"execute_query": """
SELECT
ID_TipoSolicitacao,
DSC_TipoSolicitacao
FROM DW_BI_ALVARAS.dbo.TAB_TipoSolicitacao;
""",
},
}

alvaras_infra_clocks = generate_dump_db_schedules(
interval=timedelta(days=1),
start_date=datetime(2022, 3, 21, 2, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_SMFP_AGENT_LABEL.value,
],
db_database="DW_BI_ALVARAS",
db_host="10.70.15.11",
db_port="1433",
db_type="sql_server",
dataset_id="atividade_economica",
infisical_secret_path="/db-alvaras",
table_parameters=_alvaras_infra_query,
)

alvaras_infra_daily_update_schedule = Schedule(clocks=untuple(alvaras_infra_clocks))
1 change: 1 addition & 0 deletions pipelines/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""
Imports all flows for every project so we can register all of them...
"""
from pipelines.atividade_economica import * # noqa
from pipelines.egpweb_metas import * # noqa
from pipelines.ergon import * # noqa
from pipelines.ergon_comlurb import * # noqa
Expand Down
14 changes: 14 additions & 0 deletions queries/models/atividade_economica/fact_fatoalvaras.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
SELECT
SAFE_CAST(ID_Alvara AS STRING) AS ID_Alvara,
SAFE_CAST(Quantidade AS FLOAT64) AS Quantidade,
SAFE_CAST(ID_AtvProcesso AS STRING) AS ID_AtvProcesso,
SAFE_CAST(ID_CAE AS STRING) AS ID_CAE,
SAFE_CAST(ID_CNAE AS STRING) AS ID_CNAE,
SAFE_CAST(ID_DiaDeferimento AS STRING) AS ID_DiaDeferimento,
SAFE_CAST(ID_DiaSolicitacao AS STRING) AS ID_DiaSolicitacao,
SAFE_CAST(ID_DiaTaxaPagamen AS STRING) AS ID_DiaTaxaPagamen,
SAFE_CAST(ID_Direcionamento AS STRING) AS ID_Direcionamento,
SAFE_CAST(ID_TipoContribuint AS STRING) AS ID_TipoContribuint,
SAFE_CAST(ID_TipoSolicitacao AS STRING) AS ID_TipoSolicitacao

FROM `rj-iplanrio.alvaras_staging.fact_fatoalvaras`
12 changes: 12 additions & 0 deletions queries/models/atividade_economica/fact_fatocp.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
SELECT
SAFE_CAST(ID_AtvProcesso AS STRING) AS ID_AtvProcesso,
SAFE_CAST(Quantidade_cp AS FLOAT64) AS Quantidade_cp,
SAFE_CAST(ID_CAE AS STRING) AS ID_CAE,
SAFE_CAST(ID_CNAE AS STRING) AS ID_CNAE,
SAFE_CAST(ID_Consulta AS STRING) AS ID_Consulta,
SAFE_CAST(ID_DiaInicial AS STRING) AS ID_DiaInicial,
SAFE_CAST(ID_Direcionamento AS STRING) AS ID_Direcionamento,
SAFE_CAST(ID_TipoContribuint AS STRING) AS ID_TipoContribuint,
SAFE_CAST(ID_TipoSolicitacao AS STRING) AS ID_TipoSolicitacao

FROM `rj-iplanrio.alvaras_staging.fact_fatocp`
22 changes: 22 additions & 0 deletions queries/models/atividade_economica/tab_alvara.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
SELECT
SAFE_CAST(ID_Alvara AS STRING) AS ID_Alvara,
SAFE_CAST(DSC_Alvara AS STRING) AS DSC_Alvara,
SAFE_CAST(DSC_Endereco AS STRING) AS DSC_Endereco,
SAFE_CAST(DSC_Bairro AS STRING) AS DSC_Bairro,
SAFE_CAST(DSC_Zoneamento AS STRING) AS DSC_Zoneamento,
SAFE_CAST(DSC_IRLF AS STRING) AS DSC_IRLF,
SAFE_CAST(DSC_TipoAnalise AS STRING) AS DSC_TipoAnalise,
SAFE_CAST(DSC_TempoRespDia AS FLOAT64) AS DSC_TempoRespDia,
SAFE_CAST(DSC_StatusIntermediario AS STRING) AS DSC_StatusIntermediario,
SAFE_CAST(DSC_StatusCPL AS STRING) AS DSC_StatusCPL,
SAFE_CAST(DSC_TempoRespMinuto AS FLOAT64) AS DSC_TempoRespMinuto,
SAFE_CAST(DSC_TipoAlvara AS STRING) AS DSC_TipoAlvara,
SAFE_CAST(DSC_TaxaOriginal AS FLOAT64) AS DSC_TaxaOriginal,
SAFE_CAST(DSC_TaxaMulta AS FLOAT64) AS DSC_TaxaMulta,
SAFE_CAST(DSC_TaxaMora AS FLOAT64) AS DSC_TaxaMora,
SAFE_CAST(DSC_TaxaTotal AS FLOAT64) AS DSC_TaxaTotal,
SAFE_CAST(DSC_IsentoTaxa AS STRING) AS DSC_IsentoTaxa,
SAFE_CAST(DSC_Numero AS FLOAT64) AS DSC_Numero,
SAFE_CAST(DSC_AlvaraLiberado AS STRING) AS DSC_AlvaraLiberado

FROM `rj-iplanrio.alvaras_staging.tab_alvara`
7 changes: 7 additions & 0 deletions queries/models/atividade_economica/tab_atvprocesso.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
SELECT
SAFE_CAST(ID_AtvProcesso AS STRING) AS ID_AtvProcesso,
SAFE_CAST(DSC_AtvProcesso AS STRING) AS DSC_AtvProcesso,
SAFE_CAST(DSC_RespAtividade AS STRING) AS DSC_RespAtividade,
SAFE_CAST(DSC_RefAtividade AS STRING) AS DSC_RefAtividade

FROM `rj-iplanrio.alvaras_staging.tab_atvprocesso`
7 changes: 7 additions & 0 deletions queries/models/atividade_economica/tab_cae.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
SELECT
SAFE_CAST(ID_CAE AS STRING) AS ID_CAE,
SAFE_CAST(DSC_CAE AS STRING) AS DSC_CAE,
SAFE_CAST(ID_TipoAtividade AS STRING) AS ID_TipoAtividade,
SAFE_CAST(DSC_TipoAtividade AS STRING) AS DSC_TipoAtividade

FROM `rj-iplanrio.alvaras_staging.tab_cae`
5 changes: 5 additions & 0 deletions queries/models/atividade_economica/tab_cnae.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
SELECT
SAFE_CAST(ID_CNAE AS STRING) AS ID_CNAE,
SAFE_CAST(DSC_CNAE AS STRING) AS DSC_CNAE

FROM `rj-iplanrio.alvaras_staging.tab_cnae`
13 changes: 13 additions & 0 deletions queries/models/atividade_economica/tab_consulta.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
SELECT
SAFE_CAST(ID_Consulta AS STRING) AS ID_Consulta,
SAFE_CAST(DSC_Consulta AS STRING) AS DSC_Consulta,
SAFE_CAST(DSC_Endereco_cp AS STRING) AS DSC_Endereco_cp,
SAFE_CAST(DSC_Bairro_cp AS STRING) AS DSC_Bairro_cp,
SAFE_CAST(DSC_Zoneamento_cp AS STRING) AS DSC_Zoneamento_cp,
SAFE_CAST(DSC_CodeConsulta AS FLOAT64) AS DSC_CodeConsulta,
SAFE_CAST(DSC_IRLF_cp AS STRING) AS DSC_IRLF_cp,
SAFE_CAST(DSC_StatusCPL_cp AS STRING) AS DSC_StatusCPL_cp,
SAFE_CAST(DSC_TipoAnalise_cp AS STRING) AS DSC_TipoAnalise_cp,
SAFE_CAST(DSC_Status_cp AS STRING) AS DSC_Status_cp

FROM `rj-iplanrio.alvaras_staging.tab_consulta`
Loading
Loading