From 30b10e32f05770a1f5ae9913c3270de3888f922e Mon Sep 17 00:00:00 2001 From: Bruno Almeida Date: Tue, 22 Oct 2024 11:27:38 -0300 Subject: [PATCH 1/9] add: new table with ua details --- pipelines/sici/dump_api/schedules.py | 26 ++++++++++++++++++++------ pipelines/sici/dump_api/tasks.py | 23 +++++++++++++++++------ 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/pipelines/sici/dump_api/schedules.py b/pipelines/sici/dump_api/schedules.py index 1d1c80a..170f61a 100644 --- a/pipelines/sici/dump_api/schedules.py +++ b/pipelines/sici/dump_api/schedules.py @@ -8,6 +8,24 @@ from pipelines.constants import constants + + +parameter_list = [ + { + "dataset_id": "unidades_administrativas", + "table_id": "detalhes", + "billing_project_id": "rj-iplanrio", + "materialize_after_dump": False, + }, + { + "dataset_id": "unidades_administrativas", + "table_id": "orgaos", + "billing_project_id": "iplanrio", + "materialize_after_dump": True, + }, + # Add more parameter dicts as needed +] + sici_dump_api_schedule = Schedule( clocks=[ IntervalClock( @@ -16,12 +34,8 @@ labels=[ constants.RJ_IPLANRIO_AGENT_LABEL.value, ], - parameter_defaults={ - "dataset_id": "unidades_administrativas", - "table_id": "orgaos", - "billing_project_id": "rj-iplanrio", - "materialize_after_dump": True, - }, + parameter_defaults=params, ) + for params in parameter_list ] ) diff --git a/pipelines/sici/dump_api/tasks.py b/pipelines/sici/dump_api/tasks.py index 12fedeb..aadc96c 100644 --- a/pipelines/sici/dump_api/tasks.py +++ b/pipelines/sici/dump_api/tasks.py @@ -9,6 +9,7 @@ @task def get_data_from_api_soap_sici( wsdl: str = "http://sici.rio.rj.gov.br/Servico/WebServiceSICI.asmx?wsdl", + endpoint: str = "Get_Arvore_UA", params: dict = { "Codigo_UA": "", "Nivel": "", @@ -24,8 +25,13 @@ def get_data_from_api_soap_sici( # Create a client client = Client(wsdl=wsdl) - # Call the service - response = client.service.Get_Arvore_UA(**params) + if endpoint == "Get_Arvore_UA": + # Call the service + response = client.service.Get_Arvore_UA(**params) + elif endpoint == "Get_UG_Tipo_UG": + response = client.service.Get_UG_Tipo_UG(**params) + else: + raise ValueError(f"Invalid endpoint: {endpoint}") # Transform to df df = xml_to_dataframe(response) @@ -34,10 +40,15 @@ def get_data_from_api_soap_sici( log(f"Data sample: {df.head(5)}") # Safe the dataframe to a CSV file - df.to_csv("sici_data.csv", index=False) - - # Return the true path of the csv file - return "sici_data.csv" + if endpoint == "Get_Arvore_UA": + df.to_csv("sici_data.csv", index=False) + # Return the true path of the csv file + return "sici_data.csv" + elif endpoint == "Get_UG_Tipo_UG": + df.to_csv("sici_data_ug.csv", index=False) + return "sici_data_ug.csv" + else: + raise ValueError(f"Invalid endpoint: {endpoint}") except Exception as e: log.error(f"An unexpected error occurred: {e}") From 6af185f21b23c1fa171add185d088e9d4d62ad20 Mon Sep 17 00:00:00 2001 From: Bruno Almeida Date: Tue, 22 Oct 2024 11:52:20 -0300 Subject: [PATCH 2/9] feat: endpoint and endpoint_parameters to flex the existing tasks --- pipelines/sici/dump_api/flows.py | 5 ++++- pipelines/sici/dump_api/schedules.py | 14 ++++++++++++++ pipelines/sici/dump_api/tasks.py | 23 +++++++++++++++-------- 3 files changed, 33 insertions(+), 9 deletions(-) diff --git a/pipelines/sici/dump_api/flows.py b/pipelines/sici/dump_api/flows.py index 533d274..571b48d 100644 --- a/pipelines/sici/dump_api/flows.py +++ b/pipelines/sici/dump_api/flows.py @@ -30,6 +30,8 @@ ) as rj_iplanrio__sici__dump_api__flow: dataset_id = Parameter("dataset_id") table_id = Parameter("table_id") + endpoint = Parameter("endpoint") + endpoint_parameters = Parameter("endpoint_parameters") billing_project_id = Parameter("billing_project_id", required=False) bd_project_mode = Parameter("bd_project_mode", required=False, default="prod") materialize_after_dump = Parameter("materialize_after_dump", default=False, required=False) @@ -40,11 +42,12 @@ table_id=table_id, ) - get_credentials = get_sici_api_credentials() + get_credentials = get_sici_api_credentials(endpoint_parameters) get_credentials.set_upstream(rename_flow_run) path = get_data_from_api_soap_sici( wsdl=constants.SICI_SOAP_API_WSDL.value, + endpoint=endpoint, params=get_credentials, ) diff --git a/pipelines/sici/dump_api/schedules.py b/pipelines/sici/dump_api/schedules.py index 170f61a..a1f8cdd 100644 --- a/pipelines/sici/dump_api/schedules.py +++ b/pipelines/sici/dump_api/schedules.py @@ -14,12 +14,26 @@ { "dataset_id": "unidades_administrativas", "table_id": "detalhes", + "endpoint": "Get_Arvore_UA", + "endpoint_parameters": { + "Id_Base": "", + "Codigo_UG": "", + "Data_Inicio": "", + "Data_Fim": "", + "Tipo_UG": "", + }, "billing_project_id": "rj-iplanrio", "materialize_after_dump": False, }, { "dataset_id": "unidades_administrativas", "table_id": "orgaos", + "endpoint": "Get_UG_Tipo_UG", + "endpoint_parameters": { + "Codigo_UA": "", + "Nivel": "", + "Tipo_Arvore": "", + }, "billing_project_id": "iplanrio", "materialize_after_dump": True, }, diff --git a/pipelines/sici/dump_api/tasks.py b/pipelines/sici/dump_api/tasks.py index aadc96c..d70dd80 100644 --- a/pipelines/sici/dump_api/tasks.py +++ b/pipelines/sici/dump_api/tasks.py @@ -56,7 +56,13 @@ def get_data_from_api_soap_sici( @task -def get_sici_api_credentials(): +def get_sici_api_credentials( + endpoint_parameters: dict = { + "Codigo_UA": "", + "Nivel": "", + "Tipo_Arvore": "", + }, +): """ Get the credentials for the SICI API. """ @@ -81,11 +87,12 @@ def get_sici_api_credentials(): f"An error occurred while fetching the SICI API credentials for chave_acesso: {e}", ) raise - - return { - "Codigo_UA": "", - "Nivel": "", - "Tipo_Arvore": "", - "consumidor": consumidor["CONSUMIDOR"], - "chaveAcesso": chave_acesso["CHAVE_ACESSO"], + + # Create an all_parameters dict with the consumidor and chave_acesso and the endpoint_parameters + all_parameters = { + "consumidor": consumidor, + "chaveAcesso": chave_acesso, + **endpoint_parameters, } + + return all_parameters From 00234ac29b7b450809e2b9c9ef9db9e8f803bd4b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 22 Oct 2024 14:53:00 +0000 Subject: [PATCH 3/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/sici/dump_api/schedules.py | 1 - pipelines/sici/dump_api/tasks.py | 7 +++---- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pipelines/sici/dump_api/schedules.py b/pipelines/sici/dump_api/schedules.py index a1f8cdd..a4e5d04 100644 --- a/pipelines/sici/dump_api/schedules.py +++ b/pipelines/sici/dump_api/schedules.py @@ -9,7 +9,6 @@ from pipelines.constants import constants - parameter_list = [ { "dataset_id": "unidades_administrativas", diff --git a/pipelines/sici/dump_api/tasks.py b/pipelines/sici/dump_api/tasks.py index d70dd80..3429921 100644 --- a/pipelines/sici/dump_api/tasks.py +++ b/pipelines/sici/dump_api/tasks.py @@ -44,11 +44,10 @@ def get_data_from_api_soap_sici( df.to_csv("sici_data.csv", index=False) # Return the true path of the csv file return "sici_data.csv" - elif endpoint == "Get_UG_Tipo_UG": + if endpoint == "Get_UG_Tipo_UG": df.to_csv("sici_data_ug.csv", index=False) return "sici_data_ug.csv" - else: - raise ValueError(f"Invalid endpoint: {endpoint}") + raise ValueError(f"Invalid endpoint: {endpoint}") except Exception as e: log.error(f"An unexpected error occurred: {e}") @@ -87,7 +86,7 @@ def get_sici_api_credentials( f"An error occurred while fetching the SICI API credentials for chave_acesso: {e}", ) raise - + # Create an all_parameters dict with the consumidor and chave_acesso and the endpoint_parameters all_parameters = { "consumidor": consumidor, From 5d76fdca223f3566f7bdef4a209c90d5c8673d4c Mon Sep 17 00:00:00 2001 From: Bruno Almeida Date: Tue, 22 Oct 2024 12:21:13 -0300 Subject: [PATCH 4/9] fix: typo --- pipelines/sici/dump_api/schedules.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/sici/dump_api/schedules.py b/pipelines/sici/dump_api/schedules.py index a1f8cdd..94244d4 100644 --- a/pipelines/sici/dump_api/schedules.py +++ b/pipelines/sici/dump_api/schedules.py @@ -14,7 +14,7 @@ { "dataset_id": "unidades_administrativas", "table_id": "detalhes", - "endpoint": "Get_Arvore_UA", + "endpoint": "Get_UG_Tipo_UG", "endpoint_parameters": { "Id_Base": "", "Codigo_UG": "", @@ -28,7 +28,7 @@ { "dataset_id": "unidades_administrativas", "table_id": "orgaos", - "endpoint": "Get_UG_Tipo_UG", + "endpoint": "Get_Arvore_UA", "endpoint_parameters": { "Codigo_UA": "", "Nivel": "", From 849578134ae66eb71b4c5fcc5019ccb4079cdaaa Mon Sep 17 00:00:00 2001 From: Bruno Almeida Date: Tue, 22 Oct 2024 12:29:31 -0300 Subject: [PATCH 5/9] fix: schedule --- pipelines/sici/dump_api/schedules.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pipelines/sici/dump_api/schedules.py b/pipelines/sici/dump_api/schedules.py index 7fa499c..e26ad0d 100644 --- a/pipelines/sici/dump_api/schedules.py +++ b/pipelines/sici/dump_api/schedules.py @@ -5,6 +5,7 @@ import pytz from prefect.schedules import Schedule from prefect.schedules.clocks import IntervalClock +from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple from pipelines.constants import constants @@ -39,16 +40,17 @@ # Add more parameter dicts as needed ] -sici_dump_api_schedule = Schedule( - clocks=[ +sici_dump_api_clocks = [ IntervalClock( - interval=timedelta(days=1), - start_date=datetime(2024, 7, 17, 18, tzinfo=pytz.timezone("America/Sao_Paulo")), + interval=timedelta(hours=12), + start_date=datetime(2024, 9, 17, 19, 0, tzinfo=pytz.timezone("America/Sao_Paulo")) + + timedelta(minutes= 2 * count), labels=[ constants.RJ_IPLANRIO_AGENT_LABEL.value, ], parameter_defaults=params, ) - for params in parameter_list - ] -) + for count, (_, params) in enumerate(parameter_list.items()) +] + +sici_dump_api_schedule = Schedule(clocks=untuple(sici_dump_api_clocks)) \ No newline at end of file From bb214fa824eb40575f6f2f881d496e38ae8d4b3c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 22 Oct 2024 15:29:42 +0000 Subject: [PATCH 6/9] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pipelines/sici/dump_api/schedules.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pipelines/sici/dump_api/schedules.py b/pipelines/sici/dump_api/schedules.py index e26ad0d..5a98a71 100644 --- a/pipelines/sici/dump_api/schedules.py +++ b/pipelines/sici/dump_api/schedules.py @@ -41,16 +41,16 @@ ] sici_dump_api_clocks = [ - IntervalClock( - interval=timedelta(hours=12), - start_date=datetime(2024, 9, 17, 19, 0, tzinfo=pytz.timezone("America/Sao_Paulo")) - + timedelta(minutes= 2 * count), - labels=[ - constants.RJ_IPLANRIO_AGENT_LABEL.value, - ], - parameter_defaults=params, - ) - for count, (_, params) in enumerate(parameter_list.items()) + IntervalClock( + interval=timedelta(hours=12), + start_date=datetime(2024, 9, 17, 19, 0, tzinfo=pytz.timezone("America/Sao_Paulo")) + + timedelta(minutes=2 * count), + labels=[ + constants.RJ_IPLANRIO_AGENT_LABEL.value, + ], + parameter_defaults=params, + ) + for count, (_, params) in enumerate(parameter_list.items()) ] -sici_dump_api_schedule = Schedule(clocks=untuple(sici_dump_api_clocks)) \ No newline at end of file +sici_dump_api_schedule = Schedule(clocks=untuple(sici_dump_api_clocks)) From 711cfcde55ae60197aeaf2ec8a10c7f5895e360d Mon Sep 17 00:00:00 2001 From: Bruno Almeida Date: Tue, 22 Oct 2024 14:33:04 -0300 Subject: [PATCH 7/9] fix:typo --- pipelines/sici/dump_api/schedules.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/sici/dump_api/schedules.py b/pipelines/sici/dump_api/schedules.py index 5a98a71..c3f5317 100644 --- a/pipelines/sici/dump_api/schedules.py +++ b/pipelines/sici/dump_api/schedules.py @@ -50,7 +50,7 @@ ], parameter_defaults=params, ) - for count, (_, params) in enumerate(parameter_list.items()) + for count, params in enumerate(parameter_list) ] sici_dump_api_schedule = Schedule(clocks=untuple(sici_dump_api_clocks)) From 99025dd160a4f161a25b007c3c868611481780de Mon Sep 17 00:00:00 2001 From: Bruno Almeida Date: Tue, 22 Oct 2024 14:55:13 -0300 Subject: [PATCH 8/9] add: log to find error --- pipelines/sici/dump_api/tasks.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pipelines/sici/dump_api/tasks.py b/pipelines/sici/dump_api/tasks.py index 3429921..996b9e5 100644 --- a/pipelines/sici/dump_api/tasks.py +++ b/pipelines/sici/dump_api/tasks.py @@ -25,6 +25,9 @@ def get_data_from_api_soap_sici( # Create a client client = Client(wsdl=wsdl) + log(f"Calling the SICI API with the following parameters: {params}") + log(type(params)) + if endpoint == "Get_Arvore_UA": # Call the service response = client.service.Get_Arvore_UA(**params) @@ -94,4 +97,8 @@ def get_sici_api_credentials( **endpoint_parameters, } + log(f"Credentials for the SICI API were successfully retrieved: {all_parameters}") + log(f"Endpoint parameters: {endpoint_parameters}") + log(type(all_parameters)) + return all_parameters From c4fad75fda791821130f86089f61a84e318b5b72 Mon Sep 17 00:00:00 2001 From: Bruno Almeida Date: Tue, 22 Oct 2024 15:06:34 -0300 Subject: [PATCH 9/9] fix: variable type --- pipelines/sici/dump_api/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/sici/dump_api/tasks.py b/pipelines/sici/dump_api/tasks.py index 996b9e5..90b2bc6 100644 --- a/pipelines/sici/dump_api/tasks.py +++ b/pipelines/sici/dump_api/tasks.py @@ -92,8 +92,8 @@ def get_sici_api_credentials( # Create an all_parameters dict with the consumidor and chave_acesso and the endpoint_parameters all_parameters = { - "consumidor": consumidor, - "chaveAcesso": chave_acesso, + "consumidor": consumidor["CONSUMIDOR"], + "chaveAcesso": chave_acesso["CHAVE_ACESSO"], **endpoint_parameters, }