Skip to content

Commit

Permalink
Merge pull request #18 from prefeitura-rio/staging/ergon_hci
Browse files Browse the repository at this point in the history
Staging/ergon hci
  • Loading branch information
d116626 authored Oct 4, 2024
2 parents 59192a8 + 3527ff9 commit f883c7f
Show file tree
Hide file tree
Showing 7 changed files with 294 additions and 2 deletions.
1 change: 1 addition & 0 deletions pipelines/ergon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
##
from pipelines.ergon.dump_db_ergon.flows import * # noqa
from pipelines.ergon.dump_db_ergon_pericia_medica.flows import * # noqa
from pipelines.ergon.materializa_funcionarios_saude.flows import * # noqa
44 changes: 44 additions & 0 deletions pipelines/ergon/materializa_funcionarios_saude/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
"""
Materialize Active SMS Employees from Ergon.
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.run_dbt_model.flows import (
templates__run_dbt_model__flow,
)
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import (
handler_initialize_sentry,
handler_inject_bd_credentials,
)

from pipelines.constants import constants
from pipelines.ergon.materializa_funcionarios_saude.schedules import (
smfp_funcionarios_saude_daily_update_schedule,
)

run_dbt_smfp_funcionarios_saude = deepcopy(templates__run_dbt_model__flow)
run_dbt_smfp_funcionarios_saude.name = "SMFP: Ergon Funcionarios Ativos SMS - Materializar tabelas"
run_dbt_smfp_funcionarios_saude.state_handlers = [
handler_inject_bd_credentials,
handler_initialize_sentry,
]
run_dbt_smfp_funcionarios_saude.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
run_dbt_smfp_funcionarios_saude.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value, labels=[constants.RJ_SMFP_AGENT_LABEL.value]
)

smfp_funcionarios_saude_default_parameters = {
"dataset_id": "recursos_humanos_ergon_saude",
"table_id": "funcionarios_ativos",
}
run_dbt_smfp_funcionarios_saude = set_default_parameters(
run_dbt_smfp_funcionarios_saude,
default_parameters=smfp_funcionarios_saude_default_parameters,
)

run_dbt_smfp_funcionarios_saude.schedule = smfp_funcionarios_saude_daily_update_schedule
30 changes: 30 additions & 0 deletions pipelines/ergon/materializa_funcionarios_saude/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
"""
Schedules for the database dump pipeline
"""

from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock
from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple

from pipelines.constants import constants

smfp_funcionarios_saude_clocks = [
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2021, 11, 23, 1, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_SMFP_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "recursos_humanos_ergon_saude",
"table_id": "funcionarios_ativos",
},
)
]
smfp_funcionarios_saude_daily_update_schedule = Schedule(
clocks=untuple(smfp_funcionarios_saude_clocks)
)
2 changes: 1 addition & 1 deletion pipelines/templates/run_dbt_model/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
MATERIALIZA MODELOS DO DBT......
MATERIALIZA MODELOS DO DBT..........
"""

from copy import deepcopy
Expand Down
5 changes: 4 additions & 1 deletion queries/dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,7 @@ models:
+schema: recursos_humanos_ergon_pericia_medica
porte_empresa:
+materialized: table
+schema: porte_empresa
+schema: porte_empresa
recursos_humanos_ergon_saude:
+materialized: table
+schema: recursos_humanos_ergon_saude
136 changes: 136 additions & 0 deletions queries/models/recursos_humanos_ergon_saude/funcionarios_ativos.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@

{{
config(
materialized="table",
partition_by={
"field": "cpf_particao",
"data_type": "int64",
"range": {"start": 0, "end": 100000000000, "interval": 34722222},
},
)
}}


WITH funcionarios AS (
SELECT
f.id_vinculo AS id_funcionario,
LPAD(f.id_cpf, 11, '0') AS cpf, -- Adiciona zero à esquerda caso o CPF tenha menos de 11 dígitos
f.nome
FROM `rj-smfp.recursos_humanos_ergon.funcionario` f
-- WHERE LPAD(f.id_cpf, 11, '0') = ''
),

provimento AS (
SELECT
p.id_funcionario,
p.id_vinculo,
p.data_inicio AS provimento_inicio,
p.data_fim AS provimento_fim,
p.id_setor,
p.id_cargo,
p.empresa_vinculo AS id_empresa
FROM `rj-smfp.recursos_humanos_ergon.provimento` p
-- QUALIFY ROW_NUMBER() OVER (PARTITION BY id_funcionario ORDER BY id_vinculo DESC, data_inicio DESC) = 1
),

setor AS (
SELECT
id_setor,
data_inicio AS setor_inicio,
data_fim AS setor_fim,
id_setor_pai,
nome AS setor_nome,
sigla AS setor_sigla,
id_secretaria -- SAUDE 1800 (consultar SICI)
FROM `rj-smfp.recursos_humanos_ergon.setor`
-- WHERE data_fim IS NULL
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_setor ORDER BY data_inicio DESC) = 1
),

cargo AS (
SELECT
id_cargo,
nome AS cargo_nome,
categoria AS cargo_categoria,
subcategoria AS cargo_subcategoria,
FROM `rj-smfp.recursos_humanos_ergon.cargo`
-- WHERE data_fim IS NULL
),

vacancia_vinculo AS (
SELECT
id_funcionario,
id_vinculo,
data_vacancia
FROM `rj-smfp.recursos_humanos_ergon.vinculo`
QUALIFY ROW_NUMBER() OVER (PARTITION BY id_funcionario ORDER BY id_vinculo DESC) = 1
),

empresa AS (
SELECT
id_empresa,
nome_empresa AS empresa_nome,
sigla AS empresa_sigla,
CNPJ AS empresa_cnpj
FROM `rj-smfp.recursos_humanos_ergon.empresas`
),


secretaria AS (
SELECT
id_unidade_administrativa AS id_secretaria,
sigla_unidade_administrativa AS secretaria_sigla,
nome_unidade_administrativa AS secretaria_nome
FROM `rj-iplanrio.unidades_administrativas.orgaos`
),

funcionarios_saude AS (
SELECT
f.cpf,
f.nome,
CASE
WHEN (p.provimento_fim IS NULL) AND (vv.data_vacancia IS NULL) THEN TRUE
ELSE FALSE
END AS status_ativo,
p.provimento_inicio,
p.provimento_fim,
vv.data_vacancia,
s.id_secretaria,
sec.secretaria_sigla,
sec.secretaria_nome,
p.id_empresa,
s.setor_nome,
s.setor_sigla,
s.setor_inicio,
s.setor_fim,
c.cargo_nome,
c.cargo_categoria,
c.cargo_subcategoria,
emp.empresa_nome,
emp.empresa_sigla,
emp.empresa_cnpj,
SAFE_CAST(f.cpf AS INT64) AS cpf_particao
FROM funcionarios f
LEFT JOIN provimento p
ON f.id_funcionario = p.id_funcionario
LEFT JOIN setor s
ON p.id_setor = s.id_setor
LEFT JOIN cargo c
ON p.id_cargo = c.id_cargo
LEFT JOIN vacancia_vinculo vv
ON f.id_funcionario = vv.id_funcionario
AND p.id_vinculo = vv.id_vinculo
LEFT JOIN empresa emp
ON p.id_empresa = emp.id_empresa
LEFT JOIN secretaria sec
ON s.id_secretaria = sec.id_secretaria

WHERE s.id_secretaria IN ('1800', '1700')
OR p.id_empresa IN ('32','80','81','82','83','84','85','86','87','88','89','97')
)

SELECT * FROM funcionarios_saude
WHERE status_ativo



Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
WITH all_cpf AS (
SELECT
DISTINCT
cpf
FROM (
SELECT
cpf,
'sms' AS origin
FROM `rj-sms.saude_dados_mestres.profissional_saude`
WHERE cpf IS NOT NULL
UNION ALL
SELECT
cpf,
'ergon' AS origin
FROM `rj-smfp.recursos_humanos_ergon_saude.funcionarios_ativos`
WHERE cpf IS NOT NULL
)
),

funcionarios AS (
SELECT
cpf.cpf,
ps.cpf AS cpf_sms,
ps.nome AS nome_sms,
fa.cpf AS cpf_ergon,
fa.nome AS nome_ergon,
fa.id_secretaria,
fa.secretaria_nome,
fa.secretaria_sigla,
fa.id_empresa,
fa.empresa_nome,
fa.empresa_sigla
FROM all_cpf cpf
LEFT JOIN `rj-sms.saude_dados_mestres.profissional_saude` ps
ON cpf.cpf = ps.cpf
LEFT JOIN `rj-smfp.recursos_humanos_ergon_saude.funcionarios_ativos` fa
ON cpf.cpf = fa.cpf
),

funcionarios_check AS (
SELECT
cpf,
cpf_sms,
cpf_ergon,
nome_sms,
nome_ergon,
id_secretaria,
secretaria_nome,
secretaria_sigla,
id_empresa,
empresa_nome,
empresa_sigla,
CASE
WHEN cpf_sms IS NOT NULL AND cpf_ergon IS NOT NULL THEN 'both'
WHEN cpf_sms IS NOT NULL THEN 'sms'
WHEN cpf_ergon IS NOT NULL THEN 'ergon'
ELSE 'none'
END AS check
FROM funcionarios
)


-- SELECT *
-- FROM funcionarios_check

SELECT
check,
id_secretaria,
secretaria_nome,
secretaria_sigla,
id_empresa,
empresa_nome,
empresa_sigla,
COUNT(*) AS count
FROM funcionarios_check
WHERE check = 'both'
GROUP BY 1,2,3,4,5,6, 7
ORDER BY 7

0 comments on commit f883c7f

Please sign in to comment.