diff --git a/pipelines/ergon/__init__.py b/pipelines/ergon/__init__.py index 4596fd1..4f6de39 100644 --- a/pipelines/ergon/__init__.py +++ b/pipelines/ergon/__init__.py @@ -2,3 +2,4 @@ ## from pipelines.ergon.dump_db_ergon.flows import * # noqa from pipelines.ergon.dump_db_ergon_pericia_medica.flows import * # noqa +from pipelines.ergon.materializa_funcionarios_saude.flows import * # noqa diff --git a/pipelines/ergon/materializa_funcionarios_saude/flows.py b/pipelines/ergon/materializa_funcionarios_saude/flows.py new file mode 100644 index 0000000..534db72 --- /dev/null +++ b/pipelines/ergon/materializa_funcionarios_saude/flows.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +""" +Materialize Active SMS Employees from Ergon. +""" + +from copy import deepcopy + +from prefect.run_configs import KubernetesRun +from prefect.storage import GCS +from prefeitura_rio.pipelines_templates.run_dbt_model.flows import ( + templates__run_dbt_model__flow, +) +from prefeitura_rio.pipelines_utils.prefect import set_default_parameters +from prefeitura_rio.pipelines_utils.state_handlers import ( + handler_initialize_sentry, + handler_inject_bd_credentials, +) + +from pipelines.constants import constants +from pipelines.ergon.materializa_funcionarios_saude.schedules import ( + smfp_funcionarios_saude_daily_update_schedule, +) + +run_dbt_smfp_funcionarios_saude = deepcopy(templates__run_dbt_model__flow) +run_dbt_smfp_funcionarios_saude.name = "SMFP: Ergon Funcionarios Ativos SMS - Materializar tabelas" +run_dbt_smfp_funcionarios_saude.state_handlers = [ + handler_inject_bd_credentials, + handler_initialize_sentry, +] +run_dbt_smfp_funcionarios_saude.storage = GCS(constants.GCS_FLOWS_BUCKET.value) +run_dbt_smfp_funcionarios_saude.run_config = KubernetesRun( + image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMFP_AGENT_LABEL.value] +) + +smfp_funcionarios_saude_default_parameters = { + "dataset_id": "recursos_humanos_ergon_saude", + "table_id": "funcionarios_ativos", +} +run_dbt_smfp_funcionarios_saude = set_default_parameters( + run_dbt_smfp_funcionarios_saude, + default_parameters=smfp_funcionarios_saude_default_parameters, +) + +run_dbt_smfp_funcionarios_saude.schedule = smfp_funcionarios_saude_daily_update_schedule diff --git a/pipelines/ergon/materializa_funcionarios_saude/schedules.py b/pipelines/ergon/materializa_funcionarios_saude/schedules.py new file mode 100644 index 0000000..be8a7fb --- /dev/null +++ b/pipelines/ergon/materializa_funcionarios_saude/schedules.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +""" +Schedules for the database dump pipeline +""" + +from datetime import datetime, timedelta + +import pytz +from prefect.schedules import Schedule +from prefect.schedules.clocks import IntervalClock +from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple + +from pipelines.constants import constants + +smfp_funcionarios_saude_clocks = [ + IntervalClock( + interval=timedelta(days=1), + start_date=datetime(2021, 11, 23, 1, 0, tzinfo=pytz.timezone("America/Sao_Paulo")), + labels=[ + constants.RJ_SMFP_AGENT_LABEL.value, + ], + parameter_defaults={ + "dataset_id": "recursos_humanos_ergon_saude", + "table_id": "funcionarios_ativos", + }, + ) +] +smfp_funcionarios_saude_daily_update_schedule = Schedule( + clocks=untuple(smfp_funcionarios_saude_clocks) +) diff --git a/pipelines/templates/run_dbt_model/flows.py b/pipelines/templates/run_dbt_model/flows.py index 80093ab..756d8df 100644 --- a/pipelines/templates/run_dbt_model/flows.py +++ b/pipelines/templates/run_dbt_model/flows.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- """ -MATERIALIZA MODELOS DO DBT...... +MATERIALIZA MODELOS DO DBT.......... """ from copy import deepcopy diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml index 4117865..5078dc1 100644 --- a/queries/dbt_project.yml +++ b/queries/dbt_project.yml @@ -53,4 +53,7 @@ models: +schema: recursos_humanos_ergon_pericia_medica porte_empresa: +materialized: table - +schema: porte_empresa \ No newline at end of file + +schema: porte_empresa + recursos_humanos_ergon_saude: + +materialized: table + +schema: recursos_humanos_ergon_saude \ No newline at end of file diff --git a/queries/models/recursos_humanos_ergon_saude/funcionarios_ativos.sql b/queries/models/recursos_humanos_ergon_saude/funcionarios_ativos.sql new file mode 100644 index 0000000..852473d --- /dev/null +++ b/queries/models/recursos_humanos_ergon_saude/funcionarios_ativos.sql @@ -0,0 +1,136 @@ + +{{ + config( + materialized="table", + partition_by={ + "field": "cpf_particao", + "data_type": "int64", + "range": {"start": 0, "end": 100000000000, "interval": 34722222}, + }, + ) +}} + + +WITH funcionarios AS ( + SELECT + f.id_vinculo AS id_funcionario, + LPAD(f.id_cpf, 11, '0') AS cpf, -- Adiciona zero à esquerda caso o CPF tenha menos de 11 dígitos + f.nome + FROM `rj-smfp.recursos_humanos_ergon.funcionario` f + -- WHERE LPAD(f.id_cpf, 11, '0') = '' +), + +provimento AS ( + SELECT + p.id_funcionario, + p.id_vinculo, + p.data_inicio AS provimento_inicio, + p.data_fim AS provimento_fim, + p.id_setor, + p.id_cargo, + p.empresa_vinculo AS id_empresa + FROM `rj-smfp.recursos_humanos_ergon.provimento` p + -- QUALIFY ROW_NUMBER() OVER (PARTITION BY id_funcionario ORDER BY id_vinculo DESC, data_inicio DESC) = 1 +), + +setor AS ( + SELECT + id_setor, + data_inicio AS setor_inicio, + data_fim AS setor_fim, + id_setor_pai, + nome AS setor_nome, + sigla AS setor_sigla, + id_secretaria -- SAUDE 1800 (consultar SICI) + FROM `rj-smfp.recursos_humanos_ergon.setor` + -- WHERE data_fim IS NULL + QUALIFY ROW_NUMBER() OVER (PARTITION BY id_setor ORDER BY data_inicio DESC) = 1 +), + +cargo AS ( + SELECT + id_cargo, + nome AS cargo_nome, + categoria AS cargo_categoria, + subcategoria AS cargo_subcategoria, + FROM `rj-smfp.recursos_humanos_ergon.cargo` + -- WHERE data_fim IS NULL +), + +vacancia_vinculo AS ( + SELECT + id_funcionario, + id_vinculo, + data_vacancia + FROM `rj-smfp.recursos_humanos_ergon.vinculo` + QUALIFY ROW_NUMBER() OVER (PARTITION BY id_funcionario ORDER BY id_vinculo DESC) = 1 +), + +empresa AS ( + SELECT + id_empresa, + nome_empresa AS empresa_nome, + sigla AS empresa_sigla, + CNPJ AS empresa_cnpj + FROM `rj-smfp.recursos_humanos_ergon.empresas` +), + + +secretaria AS ( + SELECT + id_unidade_administrativa AS id_secretaria, + sigla_unidade_administrativa AS secretaria_sigla, + nome_unidade_administrativa AS secretaria_nome + FROM `rj-iplanrio.unidades_administrativas.orgaos` +), + +funcionarios_saude AS ( + SELECT + f.cpf, + f.nome, + CASE + WHEN (p.provimento_fim IS NULL) AND (vv.data_vacancia IS NULL) THEN TRUE + ELSE FALSE + END AS status_ativo, + p.provimento_inicio, + p.provimento_fim, + vv.data_vacancia, + s.id_secretaria, + sec.secretaria_sigla, + sec.secretaria_nome, + p.id_empresa, + s.setor_nome, + s.setor_sigla, + s.setor_inicio, + s.setor_fim, + c.cargo_nome, + c.cargo_categoria, + c.cargo_subcategoria, + emp.empresa_nome, + emp.empresa_sigla, + emp.empresa_cnpj, + SAFE_CAST(f.cpf AS INT64) AS cpf_particao + FROM funcionarios f + LEFT JOIN provimento p + ON f.id_funcionario = p.id_funcionario + LEFT JOIN setor s + ON p.id_setor = s.id_setor + LEFT JOIN cargo c + ON p.id_cargo = c.id_cargo + LEFT JOIN vacancia_vinculo vv + ON f.id_funcionario = vv.id_funcionario + AND p.id_vinculo = vv.id_vinculo + LEFT JOIN empresa emp + ON p.id_empresa = emp.id_empresa + LEFT JOIN secretaria sec + ON s.id_secretaria = sec.id_secretaria + + WHERE s.id_secretaria IN ('1800', '1700') + OR p.id_empresa IN ('32','80','81','82','83','84','85','86','87','88','89','97') +) + +SELECT * FROM funcionarios_saude +WHERE status_ativo + + + diff --git a/queries/models/recursos_humanos_ergon_saude/funcionarios_ergon_sms.sql b/queries/models/recursos_humanos_ergon_saude/funcionarios_ergon_sms.sql new file mode 100644 index 0000000..013ee03 --- /dev/null +++ b/queries/models/recursos_humanos_ergon_saude/funcionarios_ergon_sms.sql @@ -0,0 +1,78 @@ +WITH all_cpf AS ( + SELECT + DISTINCT + cpf + FROM ( + SELECT + cpf, + 'sms' AS origin + FROM `rj-sms.saude_dados_mestres.profissional_saude` + WHERE cpf IS NOT NULL + UNION ALL + SELECT + cpf, + 'ergon' AS origin + FROM `rj-smfp.recursos_humanos_ergon_saude.funcionarios_ativos` + WHERE cpf IS NOT NULL + ) +), + +funcionarios AS ( + SELECT + cpf.cpf, + ps.cpf AS cpf_sms, + ps.nome AS nome_sms, + fa.cpf AS cpf_ergon, + fa.nome AS nome_ergon, + fa.id_secretaria, + fa.secretaria_nome, + fa.secretaria_sigla, + fa.id_empresa, + fa.empresa_nome, + fa.empresa_sigla + FROM all_cpf cpf + LEFT JOIN `rj-sms.saude_dados_mestres.profissional_saude` ps + ON cpf.cpf = ps.cpf + LEFT JOIN `rj-smfp.recursos_humanos_ergon_saude.funcionarios_ativos` fa + ON cpf.cpf = fa.cpf +), + +funcionarios_check AS ( + SELECT + cpf, + cpf_sms, + cpf_ergon, + nome_sms, + nome_ergon, + id_secretaria, + secretaria_nome, + secretaria_sigla, + id_empresa, + empresa_nome, + empresa_sigla, + CASE + WHEN cpf_sms IS NOT NULL AND cpf_ergon IS NOT NULL THEN 'both' + WHEN cpf_sms IS NOT NULL THEN 'sms' + WHEN cpf_ergon IS NOT NULL THEN 'ergon' + ELSE 'none' + END AS check + FROM funcionarios +) + + +-- SELECT * +-- FROM funcionarios_check + +SELECT + check, + id_secretaria, + secretaria_nome, + secretaria_sigla, + id_empresa, + empresa_nome, + empresa_sigla, + COUNT(*) AS count +FROM funcionarios_check +WHERE check = 'both' +GROUP BY 1,2,3,4,5,6, 7 +ORDER BY 7 \ No newline at end of file