Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Staging/add sici api 2 #33

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pipelines/sici/dump_api/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
) as rj_iplanrio__sici__dump_api__flow:
dataset_id = Parameter("dataset_id")
table_id = Parameter("table_id")
endpoint = Parameter("endpoint")
endpoint_parameters = Parameter("endpoint_parameters")
billing_project_id = Parameter("billing_project_id", required=False)
bd_project_mode = Parameter("bd_project_mode", required=False, default="prod")
materialize_after_dump = Parameter("materialize_after_dump", default=False, required=False)
Expand All @@ -40,11 +42,12 @@
table_id=table_id,
)

get_credentials = get_sici_api_credentials()
get_credentials = get_sici_api_credentials(endpoint_parameters)
get_credentials.set_upstream(rename_flow_run)

path = get_data_from_api_soap_sici(
wsdl=constants.SICI_SOAP_API_WSDL.value,
endpoint=endpoint,
params=get_credentials,
)

Expand Down
63 changes: 46 additions & 17 deletions pipelines/sici/dump_api/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,52 @@
import pytz
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple

from pipelines.constants import constants

# Schedule with a single clock that fires once a day, starting 2024-07-17 at
# hour 18 with an America/Sao_Paulo tzinfo. The clock carries the agent label
# and default parameters for the ``unidades_administrativas.orgaos`` table.
# NOTE(review): this looks like the pre-change side of the diff — the same name
# is assigned again further down in this hunk, so only the later value would
# remain bound. Confirm before relying on this definition.
sici_dump_api_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2024, 7, 17, 18, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_IPLANRIO_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "unidades_administrativas",
"table_id": "orgaos",
"billing_project_id": "rj-iplanrio",
"materialize_after_dump": True,
},
)
]
)

# Parameter sets for the SICI dump flow. Each dict is used verbatim as the
# ``parameter_defaults`` of one clock built below.
parameter_list = [
    {
        "dataset_id": "unidades_administrativas",
        "table_id": "detalhes",
        "endpoint": "Get_UG_Tipo_UG",
        "endpoint_parameters": {
            "Id_Base": "",
            "Codigo_UG": "",
            "Data_Inicio": "",
            "Data_Fim": "",
            "Tipo_UG": "",
        },
        "billing_project_id": "rj-iplanrio",
        "materialize_after_dump": False,
    },
    {
        "dataset_id": "unidades_administrativas",
        "table_id": "orgaos",
        "endpoint": "Get_Arvore_UA",
        "endpoint_parameters": {
            "Codigo_UA": "",
            "Nivel": "",
            "Tipo_Arvore": "",
        },
        # NOTE(review): the entry above uses "rj-iplanrio" — confirm that
        # "iplanrio" is the intended billing project here.
        "billing_project_id": "iplanrio",
        "materialize_after_dump": True,
    },
    # Append further parameter dicts here; each one gets its own clock.
]


def _sici_clock(position, defaults):
    """Return the 12-hour clock for the parameter set at index ``position``.

    The start date is 2024-09-17 19:00 (America/Sao_Paulo tzinfo) plus two
    minutes per position, so consecutive entries start two minutes apart.
    """
    # NOTE(review): pytz documents that zones passed through ``tzinfo=`` may
    # carry the zone's LMT offset on the plain datetime — confirm the
    # scheduler normalises this value before relying on the exact minute.
    first_run = datetime(2024, 9, 17, 19, 0, tzinfo=pytz.timezone("America/Sao_Paulo"))
    return IntervalClock(
        interval=timedelta(hours=12),
        start_date=first_run + timedelta(minutes=2 * position),
        labels=[constants.RJ_IPLANRIO_AGENT_LABEL.value],
        parameter_defaults=defaults,
    )


sici_dump_api_clocks = [
    _sici_clock(position, defaults) for position, defaults in enumerate(parameter_list)
]

sici_dump_api_schedule = Schedule(clocks=untuple(sici_dump_api_clocks))
46 changes: 35 additions & 11 deletions pipelines/sici/dump_api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
@task
def get_data_from_api_soap_sici(
wsdl: str = "http://sici.rio.rj.gov.br/Servico/WebServiceSICI.asmx?wsdl",
endpoint: str = "Get_Arvore_UA",
params: dict = {
"Codigo_UA": "",
"Nivel": "",
Expand All @@ -24,8 +25,16 @@ def get_data_from_api_soap_sici(
# Create a client
client = Client(wsdl=wsdl)

# Call the service
response = client.service.Get_Arvore_UA(**params)
log(f"Calling the SICI API with the following parameters: {params}")
log(type(params))

if endpoint == "Get_Arvore_UA":
# Call the service
response = client.service.Get_Arvore_UA(**params)
elif endpoint == "Get_UG_Tipo_UG":
response = client.service.Get_UG_Tipo_UG(**params)
else:
raise ValueError(f"Invalid endpoint: {endpoint}")

# Transform to df
df = xml_to_dataframe(response)
Expand All @@ -34,18 +43,28 @@ def get_data_from_api_soap_sici(
log(f"Data sample: {df.head(5)}")

# Save the dataframe to a CSV file
df.to_csv("sici_data.csv", index=False)

# Return the true path of the csv file
return "sici_data.csv"
if endpoint == "Get_Arvore_UA":
df.to_csv("sici_data.csv", index=False)
# Return the true path of the csv file
return "sici_data.csv"
if endpoint == "Get_UG_Tipo_UG":
df.to_csv("sici_data_ug.csv", index=False)
return "sici_data_ug.csv"
raise ValueError(f"Invalid endpoint: {endpoint}")

except Exception as e:
log.error(f"An unexpected error occurred: {e}")
raise


@task
def get_sici_api_credentials():
def get_sici_api_credentials(
endpoint_parameters: dict = {
"Codigo_UA": "",
"Nivel": "",
"Tipo_Arvore": "",
},
):
"""
Get the credentials for the SICI API.
"""
Expand All @@ -71,10 +90,15 @@ def get_sici_api_credentials():
)
raise

return {
"Codigo_UA": "",
"Nivel": "",
"Tipo_Arvore": "",
# Create an all_parameters dict with the consumidor and chave_acesso and the endpoint_parameters
all_parameters = {
"consumidor": consumidor["CONSUMIDOR"],
"chaveAcesso": chave_acesso["CHAVE_ACESSO"],
**endpoint_parameters,
}

log(f"Credentials for the SICI API were successfully retrieved: {all_parameters}")
log(f"Endpoint parameters: {endpoint_parameters}")
log(type(all_parameters))

return all_parameters
Loading