Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions src/impulsoetl/scnes/estabelecimentos_equipes/principal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
"""ETL das equipes dos estabelecimentos de saúde do CNES por município."""

import warnings
import pandas as pd

from datetime import date

warnings.filterwarnings("ignore")
Expand All @@ -15,6 +17,7 @@
from impulsoetl import __VERSION__
from impulsoetl.bd import Sessao
from impulsoetl.loggers import logger
from impulsoetl.sisab.excecoes import SisabExcecao
from impulsoetl.scnes.estabelecimentos_equipes.extracao import extrair_equipes
from impulsoetl.scnes.estabelecimentos_equipes.tratamento import (
tratamento_dados,
Expand All @@ -23,7 +26,7 @@
from impulsoetl.scnes.verificacao_etls_scnes import verificar_dados
from impulsoetl.utilitarios.bd import carregar_dataframe


"""
@flow(
name="Obter dados da Ficha de Equipes de Saúde por Estabelecimento",
description=(
Expand All @@ -34,7 +37,7 @@
retry_delay_seconds=None,
version=__VERSION__,
validate_parameters=False,
)
)"""
def obter_equipes_cnes(
sessao: Session,
tabela_destino: str,
Expand All @@ -53,24 +56,31 @@ def obter_equipes_cnes(
unidade_geografica_id: Código de identificação da unidade geográfica.
"""

lista_cnes = extrair_lista_cnes(codigo_municipio=codigo_municipio)
try:

lista_cnes = extrair_lista_cnes(codigo_municipio=codigo_municipio)

df_extraido = extrair_equipes(
codigo_municipio=codigo_municipio,
lista_cnes=lista_cnes,
periodo_data_inicio=periodo_data_inicio,
)
df_extraido = extrair_equipes(
codigo_municipio=codigo_municipio,
lista_cnes=lista_cnes,
periodo_data_inicio=periodo_data_inicio,
)

if df_extraido.empty:
logger.error("Data da competência do relatório não está disponível")
return 0

df_tratado = tratamento_dados(
df_extraido=df_extraido,
periodo_id=periodo_id,
unidade_geografica_id=unidade_geografica_id,
)
except:
df_tratado = tratamento_dados(
df_extraido=df_extraido,
periodo_id=periodo_id,
unidade_geografica_id=unidade_geografica_id,
)

verificar_dados(df_extraido=df_extraido, df_tratado=df_tratado)
verificar_dados(df_extraido=df_extraido, df_tratado=df_tratado)

carregar_dataframe(
sessao=sessao, df=df_tratado, tabela_destino=tabela_destino
)
carregar_dataframe(
sessao=sessao, df=df_tratado, tabela_destino=tabela_destino
)

return df_tratado
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def obter_informacoes_estabelecimentos_identificados(
df_extraido=df_extraido, df_tratado=df_tratado
)
carregar_dataframe(
sessao=sessao, df_tratado=df_tratado, tabela_destino=tabela_destino
sessao=sessao, df=df_tratado, tabela_destino=tabela_destino
)

return df_tratado
9 changes: 7 additions & 2 deletions src/impulsoetl/sisab/excecoes.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ class SisabErroCompetenciaInexistente:
def __init__(self,exception):
self.exception = exception

def insere_erro_database(self,sessao,traceback_str,operacao_id,periodo_id):
def insere_erro_database(self,sessao,traceback_str,operacao_id,periodo_id,unidade_geografica_id):

if unidade_geografica_id is None:
unidade_geografica_id = '28de805e-5bdc-49c3-863c-2cf87f95e371'
else:
unidade_geografica_id = unidade_geografica_id

tabela_destino = tabelas["configuracoes.capturas_erros_etl"]
requisicao_inserir_historico = insert(tabela_destino).values(operacao_id=operacao_id,periodo_id=periodo_id,unidade_geografica_id='28de805e-5bdc-49c3-863c-2cf87f95e371',erro_mensagem=self.exception,erro_traceback=str(traceback_str))
requisicao_inserir_historico = insert(tabela_destino).values(operacao_id=operacao_id,periodo_id=periodo_id,unidade_geografica_id=unidade_geografica_id,erro_mensagem=self.exception,erro_traceback=str(traceback_str))
conector = sessao.connection()
conector.execute(requisicao_inserir_historico)
sessao.commit()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
import warnings

warnings.filterwarnings("ignore")

import pandas as pd
from datetime import date

from impulsoetl.loggers import logger, habilitar_suporte_loguru
from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import extrair_producao_por_municipio
from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import transformar_producao_por_municipio
import pandas as pd

from impulsoetl.loggers import habilitar_suporte_loguru, logger
from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import (
extrair_producao_por_municipio,
transformar_producao_por_municipio,
)

def extrair_relatorio(
periodo_competencia: date)-> pd.DataFrame():

def extrair_relatorio(periodo_competencia: date) -> pd.DataFrame():

df_consolidado = pd.DataFrame()

try:
df_parcial = extrair_producao_por_municipio(
tipo_producao="Atendimento individual",
competencias=[periodo_competencia],
selecoes_adicionais={
"Conduta":"Selecionar Todos",
"Categoria do Profissional":"Selecionar Todos",
"Conduta": "Selecionar Todos",
"Categoria do Profissional": "Selecionar Todos",
"Tipo de Atendimento": "Selecionar Todos",
},
).pipe(transformar_producao_por_municipio)

).pipe(transformar_producao_por_municipio)

df_consolidado = df_consolidado.append(df_parcial)

except ValueError as e:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
import warnings
warnings.filterwarnings("ignore")

import pandas as pd
warnings.filterwarnings("ignore")
import time
import traceback
from datetime import date

import pandas as pd
from prefect import flow
from sqlalchemy.orm import Session
from prefect import flow

from impulsoetl import __VERSION__
from impulsoetl.bd import Sessao

from impulsoetl.loggers import logger, habilitar_suporte_loguru
from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.extracao import extrair_relatorio
from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.tratamento import tratamento_dados
from impulsoetl.loggers import habilitar_suporte_loguru, logger
from impulsoetl.sisab.excecoes import SisabExcecao
from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.extracao import (
extrair_relatorio,
)
from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.tratamento import (
tratamento_dados,
)
from impulsoetl.utilitarios.bd import carregar_dataframe


@flow(
name="Obter Relatório de Produção por Profissional, Contuta e Tipo de Atendimento (Painel AGP)",
description=(
"Extrai, transforma e carrega os dados de produção da Atenção Primária à Saúde "
+"por problema/condição avaliada, a partir do Sistema de Informação em Saúde da Atenção "
+ "por problema/condição avaliada, a partir do Sistema de Informação em Saúde da Atenção "
+ "Básica do SUS."
),
retries=0,
Expand All @@ -34,21 +40,67 @@ def relatorio_profissional_conduta_atendimento(
tabela_destino: str,
periodo_competencia: date,
periodo_id: str,
unidade_geografica_id: str
)-> None:
unidade_geografica_id: str,
) -> None:

logger.info("Extraindo relatório da competencia {}, ...".format(periodo_competencia))
tempo_inicio_etl = time.time()

df_extraido = extrair_relatorio(
periodo_competencia = periodo_competencia
)

df_tratado = tratamento_dados(
df_extraido=df_extraido,
periodo_id=periodo_id,
unidade_geografica_id=unidade_geografica_id,
operacao_id = "064540b9-78b9-766c-8130-cdc0f1ed5828"

logger.info(
"Extraindo relatório da competencia {}, ...".format(
periodo_competencia
)
)

carregar_dataframe(
sessao=sessao, df=df_tratado, tabela_destino=tabela_destino
try:

df_extraido = extrair_relatorio(
periodo_competencia=periodo_competencia
)

df_tratado = tratamento_dados(
df_extraido=df_extraido,
periodo_id=periodo_id,
unidade_geografica_id=unidade_geografica_id,
)

carregar_dataframe(
sessao=sessao, df=df_tratado, tabela_destino=tabela_destino
)

except (KeyError, pd.errors.ParserError):
traceback_str = traceback.format_exc()
enviar_erro = SisabExcecao("Competência indisponível no SISAB")
enviar_erro.insere_erro_database(
sessao=sessao,
traceback_str=traceback_str,
operacao_id=operacao_id,
periodo_id=periodo_id,
)

logger.error("Data da competência do relatório não está disponível")
return 0

except Exception as mensagem_erro:
traceback_str = traceback.format_exc()
enviar_erro = SisabExcecao(mensagem_erro)
enviar_erro.insere_erro_database(
sessao=sessao,
traceback_str=traceback_str,
operacao_id=operacao_id,
periodo_id=periodo_id,
)

logger.error(mensagem_erro)
return 0

tempo_final_etl = time.time() - tempo_inicio_etl

logger.info(
"Terminou ETL para "
+ "da comepetência`{periodo_competencia}` "
+ "em {tempo_final_etl}.",
periodo_competencia=periodo_competencia,
tempo_final_etl=tempo_final_etl,
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ def obter_lista_registros_inseridos(
acessar a base de dados da ImpulsoGov.
tabela_destino: Tabela que irá acondicionar os dados.
Retorna:
Lista de períodos que já constam na tabela destino
Lista de períodos que já constam na tabela destino
[`sqlalchemy.orm.session.Session`]: https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session
"""

tabela = tabelas[tabela_destino]
registros = sessao.query(tabela.c.periodo_id, tabela.c.unidade_geografica_id).distinct(
registros = sessao.query(
tabela.c.periodo_id, tabela.c.unidade_geografica_id
)
).distinct(tabela.c.periodo_id, tabela.c.unidade_geografica_id)

logger.info("Leitura dos períodos inseridos no banco Impulso OK!")
return registros
Expand Down Expand Up @@ -63,7 +63,10 @@ def carregar_dados(
limpar = (
delete(tabela_relatorio_producao)
.where(tabela_relatorio_producao.c.periodo_id == periodo_id)
.where(tabela_relatorio_producao.c.unidade_geografica_id == unidade_geografica_id)
.where(
tabela_relatorio_producao.c.unidade_geografica_id
== unidade_geografica_id
)
)
logger.debug(limpar)
sessao.execute(limpar)
Expand All @@ -80,4 +83,4 @@ def carregar_dados(
linhas_adicionadas=len(df_tratado),
)

return 0
return 0
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,20 @@

warnings.filterwarnings("ignore")

import pandas as pd

from datetime import date

import pandas as pd
from prefect import task

from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import extrair_producao_por_municipio
from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import transformar_producao_por_municipio
from impulsoetl.loggers import logger
from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import (
extrair_producao_por_municipio,
transformar_producao_por_municipio,
)


from impulsoetl.loggers import logger
"""
@task(
name= "Extrair Dados de Produção da APS por Tipo de Condição Avaliada e Desfecho",
name="Extrair Dados de Produção da APS por Tipo de Condição Avaliada e Desfecho",
description=(
"Extrai os dados do relatório de produção da Atenção Primária à Saúde,"
+ "por problema/condição avaliada e desfecho a partir do portal público"
Expand All @@ -29,34 +30,32 @@
tags=["aps", "sisab", "producao", "extracao"],
retries=2,
retry_delay_seconds=120,
)"""
def extrair_relatorio(
periodo_competencia: date)-> pd.DataFrame():
)
def extrair_relatorio(periodo_competencia: date) -> pd.DataFrame():
"""
Extrai relatório de produção por problema/condição avaliada e conduta a partir da página do SISAB

Argumentos:
periodo_data_inicio: Data da competência
periodo_data_inicio: Data da competência
Retorna:
Objeto [`pandas.DataFrame`] com os dados extraídos.
"""
df_consolidado = pd.DataFrame()

try:
df_parcial = extrair_producao_por_municipio(
tipo_producao="Atendimento Individual",
competencias=[periodo_competencia],
selecoes_adicionais={
"Problema/Condição Avaliada": "Selecionar Todos",
"Conduta":"Selecionar Todos",
"Problema/Condição Avaliada": "Selecionar Todos",
"Conduta": "Selecionar Todos",
},
).pipe(transformar_producao_por_municipio)
).pipe(transformar_producao_por_municipio)

df_consolidado = df_consolidado.append(df_parcial)

except Exception as e:
logger.error(e)
pass

return df_consolidado

Loading