Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dump Serpro #249

Open
wants to merge 42 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
589d191
dump serpro
akaBotelho Sep 26, 2024
05860c4
Merge branch 'main' into staging/radar-serpro
mergify[bot] Sep 26, 2024
d89830d
log
akaBotelho Sep 26, 2024
f1d633d
print files '/app'
akaBotelho Sep 27, 2024
b1a3e98
comenta wait_sleeping
akaBotelho Sep 27, 2024
96ebcf0
Merge branch 'main' into staging/radar-serpro
mergify[bot] Sep 27, 2024
25c64a4
wait_sleeping
akaBotelho Sep 27, 2024
8aa170d
Merge branch 'staging/radar-serpro' of https://github.com/prefeitura-…
akaBotelho Sep 27, 2024
ed43dfe
retry get_db_object
akaBotelho Sep 27, 2024
2310af6
retry_delay
akaBotelho Sep 27, 2024
a8545f7
log secret
akaBotelho Sep 27, 2024
725bc2c
param secret
akaBotelho Sep 27, 2024
0500cc4
checkpoint get_db_object
akaBotelho Sep 27, 2024
cf93ee6
makedirs
akaBotelho Sep 27, 2024
3b54cb5
fix path
akaBotelho Sep 27, 2024
34eabef
handler_inject_bd_credentials
akaBotelho Sep 27, 2024
6a734a9
teste
akaBotelho Oct 2, 2024
25ba132
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 3, 2024
d5e31cc
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 4, 2024
8a0955b
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 4, 2024
5e3f756
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 6, 2024
c48d72e
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 7, 2024
0837091
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 8, 2024
eaceac3
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 9, 2024
ae7c598
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 9, 2024
3bd23be
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 9, 2024
24f6f53
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 9, 2024
d68255b
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 10, 2024
15709fe
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 16, 2024
65c3108
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 16, 2024
a32552f
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 16, 2024
eb141c5
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 17, 2024
c769986
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 21, 2024
9e6c0cd
altera query para teste
akaBotelho Oct 29, 2024
b4081e7
Merge branch 'staging/radar-serpro' of https://github.com/prefeitura-…
akaBotelho Oct 29, 2024
eb53f11
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 29, 2024
251da00
altera flow
akaBotelho Oct 29, 2024
af56f98
Merge branch 'staging/radar-serpro' of https://github.com/prefeitura-…
akaBotelho Oct 29, 2024
b132d87
Merge branch 'main' into staging/radar-serpro
mergify[bot] Oct 29, 2024
ea1d9b6
Merge branch 'main' into staging/radar-serpro
mergify[bot] Dec 19, 2024
eb16b49
Merge branch 'main' into staging/radar-serpro
mergify[bot] Dec 20, 2024
30aceb3
Merge branch 'main' into staging/radar-serpro
mergify[bot] Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions pipelines/serpro/flows.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,39 @@
# -*- coding: utf-8 -*-
from prefect import Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS

# from prefect.utilities.edges import unmapped
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.constants import constants as smtr_constants
from pipelines.serpro.tasks import wait_sleeping

# from pipelines.migration.tasks import upload_raw_data_to_gcs
from pipelines.serpro.tasks import dump_serpro, get_db_object, wait_sleeping
from pipelines.serpro.utils import handler_setup_serpro

with Flow("SMTR - Teste Conexão Serpro") as flow:
batch_size = Parameter("batch_size", default=100000)
# setup_serpro()

jdbc = get_db_object()
csv_files = dump_serpro(jdbc, batch_size)

wait_sleeping()

# upload_raw_data_to_gcs.map(
# dataset_id=unmapped("radar_serpro"),
# table_id=unmapped("tb_infracao_view"),
# raw_filepath=csv_files,
# partitions=unmapped(None),
# error=unmapped(None),
# bucket_name=unmapped("rj-smtr-dev"),
# )

flow.storage = GCS(smtr_constants.GCS_FLOWS_BUCKET.value)
flow.run_config = KubernetesRun(
image=smtr_constants.DOCKER_IMAGE_FEDORA.value,
labels=[smtr_constants.RJ_SMTR_AGENT_LABEL.value],
)
flow.state_handlers = [handler_setup_serpro]
flow.state_handlers = [handler_setup_serpro, handler_inject_bd_credentials]
67 changes: 66 additions & 1 deletion pipelines/serpro/tasks.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,81 @@
# -*- coding: utf-8 -*-
import csv
import os
from datetime import timedelta
from time import sleep
from typing import List

from prefect import task

from pipelines.utils.jdbc import JDBC
from pipelines.utils.secret import get_secret
from pipelines.utils.utils import log


@task
def wait_sleeping(interval_seconds: int = 54000, wait=None):
sleep(interval_seconds)


@task
@task(checkpoint=False, max_retries=3, retry_delay=timedelta(seconds=20))
def get_db_object(secret_path="radar_serpro", environment: str = "dev"):
jar_path = get_secret(secret_path=secret_path, environment=environment)["jars"]

if not os.path.exists(jar_path):
raise Exception(f"Arquivo JAR '{jar_path}' não encontrado.")

return JDBC(db_params_secret_path=secret_path, environment=environment)


@task
def dump_serpro(jdbc: JDBC, batch_size: int) -> List[str]:

index = 0
data_folder = os.getenv("DATA_FOLDER", "data")
file_path = f"{os.getcwd()}/{data_folder}/raw/radar_serpro/tb_infracao_view/"
csv_files = []

os.makedirs(os.path.dirname(file_path), exist_ok=True)

query = """ SELECT
*
FROM
dbpro_radar_view_SMTR_VBL.tb_infracao_view
WHERE
SUBSTRING(auinf_dt_infracao, 1, 10) = '2024-10-28'
"""

jdbc.execute_query(query)

columns = jdbc.get_columns()

while True:
rows = jdbc.fetch_batch(batch_size)

if not rows:
break

output_file = file_path + f"dados_infracao_{index}.csv"

with open(output_file, "w", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(columns)
writer.writerows(rows)

log(output_file)
csv_files.append(output_file)
index += 1

jdbc.close_connection()

return csv_files


@task
def list_files():

try:
files = os.listdir("/app")
log(f"Files: {files}")
except Exception as e:
log(f"Erro: {e}")
9 changes: 9 additions & 0 deletions pipelines/utils/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from pipelines.utils.secret import get_secret

# from prefeitura_rio.pipelines_utils.logging import log


class JDBC:
def __init__(self, db_params_secret_path: str, environment: str = "staging") -> None:
Expand Down Expand Up @@ -68,3 +70,10 @@ def fetch_all(self) -> List[List]:
Fetches all rows from the JDBC database.
"""
return [list(item) for item in self._cursor.fetchall()]

def close_connection(self):
"""
Closes the JDBC connection.
"""
if self._connection:
self._connection.close()
Loading