diff --git a/src/impulsoetl/scnes/estabelecimentos_equipes/principal.py b/src/impulsoetl/scnes/estabelecimentos_equipes/principal.py index 0099ac7..1698149 100644 --- a/src/impulsoetl/scnes/estabelecimentos_equipes/principal.py +++ b/src/impulsoetl/scnes/estabelecimentos_equipes/principal.py @@ -6,6 +6,8 @@ """ETL das equipes dos estabelecimentos de saúde do CNES por município.""" import warnings +import pandas as pd + from datetime import date warnings.filterwarnings("ignore") @@ -15,6 +17,7 @@ from impulsoetl import __VERSION__ from impulsoetl.bd import Sessao from impulsoetl.loggers import logger +from impulsoetl.sisab.excecoes import SisabExcecao from impulsoetl.scnes.estabelecimentos_equipes.extracao import extrair_equipes from impulsoetl.scnes.estabelecimentos_equipes.tratamento import ( tratamento_dados, @@ -23,7 +26,7 @@ from impulsoetl.scnes.verificacao_etls_scnes import verificar_dados from impulsoetl.utilitarios.bd import carregar_dataframe - +""" @flow( name="Obter dados da Ficha de Equipes de Saúde por Estabelecimento", description=( @@ -34,7 +37,7 @@ retry_delay_seconds=None, version=__VERSION__, validate_parameters=False, -) +)""" def obter_equipes_cnes( sessao: Session, tabela_destino: str, @@ -53,24 +56,31 @@ def obter_equipes_cnes( unidade_geografica_id: Código de identificação da unidade geográfica. """ - lista_cnes = extrair_lista_cnes(codigo_municipio=codigo_municipio) + try: + + lista_cnes = extrair_lista_cnes(codigo_municipio=codigo_municipio) - df_extraido = extrair_equipes( - codigo_municipio=codigo_municipio, - lista_cnes=lista_cnes, - periodo_data_inicio=periodo_data_inicio, - ) + df_extraido = extrair_equipes( + codigo_municipio=codigo_municipio, + lista_cnes=lista_cnes, + periodo_data_inicio=periodo_data_inicio, + ) + + if df_extraido.empty: + logger.error("Data da competência do relatório não está disponível") + return 0 - df_tratado = tratamento_dados( - df_extraido=df_extraido, - periodo_id=periodo_id, - unidade_geografica_id=unidade_geografica_id, - ) + except: + df_tratado = tratamento_dados( + df_extraido=df_extraido, + periodo_id=periodo_id, + unidade_geografica_id=unidade_geografica_id, + ) - verificar_dados(df_extraido=df_extraido, df_tratado=df_tratado) + verificar_dados(df_extraido=df_extraido, df_tratado=df_tratado) - carregar_dataframe( - sessao=sessao, df=df_tratado, tabela_destino=tabela_destino - ) + carregar_dataframe( + sessao=sessao, df=df_tratado, tabela_destino=tabela_destino + ) return df_tratado diff --git a/src/impulsoetl/scnes/estabelecimentos_identificados/principal.py b/src/impulsoetl/scnes/estabelecimentos_identificados/principal.py index 5072dae..ffd9da1 100644 --- a/src/impulsoetl/scnes/estabelecimentos_identificados/principal.py +++ b/src/impulsoetl/scnes/estabelecimentos_identificados/principal.py @@ -75,7 +75,7 @@ def obter_informacoes_estabelecimentos_identificados( df_extraido=df_extraido, df_tratado=df_tratado ) carregar_dataframe( - sessao=sessao, df_tratado=df_tratado, tabela_destino=tabela_destino + sessao=sessao, df=df_tratado, tabela_destino=tabela_destino ) return df_tratado diff --git a/src/impulsoetl/sisab/excecoes.py b/src/impulsoetl/sisab/excecoes.py index 4a2441f..bea9d5d 100644 --- a/src/impulsoetl/sisab/excecoes.py +++ b/src/impulsoetl/sisab/excecoes.py @@ -30,10 +30,15 @@ class SisabErroCompetenciaInexistente: def __init__(self,exception): self.exception = exception - def insere_erro_database(self,sessao,traceback_str,operacao_id,periodo_id): + def insere_erro_database(self,sessao,traceback_str,operacao_id,periodo_id,unidade_geografica_id): + + if unidade_geografica_id is None: + unidade_geografica_id = '28de805e-5bdc-49c3-863c-2cf87f95e371' + else: + unidade_geografica_id = unidade_geografica_id tabela_destino = tabelas["configuracoes.capturas_erros_etl"] - requisicao_inserir_historico = insert(tabela_destino).values(operacao_id=operacao_id,periodo_id=periodo_id,unidade_geografica_id='28de805e-5bdc-49c3-863c-2cf87f95e371',erro_mensagem=self.exception,erro_traceback=str(traceback_str)) + requisicao_inserir_historico = insert(tabela_destino).values(operacao_id=operacao_id,periodo_id=periodo_id,unidade_geografica_id=unidade_geografica_id,erro_mensagem=self.exception,erro_traceback=str(traceback_str)) conector = sessao.connection() conector.execute(requisicao_inserir_historico) sessao.commit() diff --git a/src/impulsoetl/sisab/relatorio_producao_profissional_conduta_tipo_atendimento/extracao.py b/src/impulsoetl/sisab/relatorio_producao_profissional_conduta_tipo_atendimento/extracao.py index 6fac43c..3138098 100644 --- a/src/impulsoetl/sisab/relatorio_producao_profissional_conduta_tipo_atendimento/extracao.py +++ b/src/impulsoetl/sisab/relatorio_producao_profissional_conduta_tipo_atendimento/extracao.py @@ -1,31 +1,33 @@ import warnings + warnings.filterwarnings("ignore") -import pandas as pd from datetime import date -from impulsoetl.loggers import logger, habilitar_suporte_loguru -from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import extrair_producao_por_municipio -from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import transformar_producao_por_municipio +import pandas as pd +from impulsoetl.loggers import habilitar_suporte_loguru, logger +from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import ( + extrair_producao_por_municipio, + transformar_producao_por_municipio, +) -def extrair_relatorio( - periodo_competencia: date)-> pd.DataFrame(): + +def extrair_relatorio(periodo_competencia: date) -> pd.DataFrame(): df_consolidado = pd.DataFrame() - + try: df_parcial = extrair_producao_por_municipio( tipo_producao="Atendimento individual", competencias=[periodo_competencia], selecoes_adicionais={ - "Conduta":"Selecionar Todos", - "Categoria do Profissional":"Selecionar Todos", + "Conduta": "Selecionar Todos", + "Categoria do Profissional": "Selecionar Todos", "Tipo de Atendimento": "Selecionar Todos", }, + ).pipe(transformar_producao_por_municipio) - ).pipe(transformar_producao_por_municipio) - df_consolidado = df_consolidado.append(df_parcial) except ValueError as e: diff --git a/src/impulsoetl/sisab/relatorio_producao_profissional_conduta_tipo_atendimento/principal.py b/src/impulsoetl/sisab/relatorio_producao_profissional_conduta_tipo_atendimento/principal.py index 1c02f23..7b777f7 100644 --- a/src/impulsoetl/sisab/relatorio_producao_profissional_conduta_tipo_atendimento/principal.py +++ b/src/impulsoetl/sisab/relatorio_producao_profissional_conduta_tipo_atendimento/principal.py @@ -1,18 +1,24 @@ import warnings -warnings.filterwarnings("ignore") -import pandas as pd +warnings.filterwarnings("ignore") +import time +import traceback from datetime import date +import pandas as pd +from prefect import flow from sqlalchemy.orm import Session -from prefect import flow from impulsoetl import __VERSION__ from impulsoetl.bd import Sessao - -from impulsoetl.loggers import logger, habilitar_suporte_loguru -from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.extracao import extrair_relatorio -from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.tratamento import tratamento_dados +from impulsoetl.loggers import habilitar_suporte_loguru, logger +from impulsoetl.sisab.excecoes import SisabExcecao +from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.extracao import ( + extrair_relatorio, +) +from impulsoetl.sisab.relatorio_producao_profissional_conduta_tipo_atendimento.tratamento import ( + tratamento_dados, +) from impulsoetl.utilitarios.bd import carregar_dataframe @@ -20,7 +26,7 @@ name="Obter Relatório de Produção por Profissional, Contuta e Tipo de Atendimento (Painel AGP)", description=( "Extrai, transforma e carrega os dados de produção da Atenção Primária à Saúde " - +"por problema/condição avaliada, a partir do Sistema de Informação em Saúde da Atenção " + + "por problema/condição avaliada, a partir do Sistema de Informação em Saúde da Atenção " + "Básica do SUS." ), retries=0, @@ -34,21 +40,67 @@ def relatorio_profissional_conduta_atendimento( tabela_destino: str, periodo_competencia: date, periodo_id: str, - unidade_geografica_id: str -)-> None: + unidade_geografica_id: str, +) -> None: - logger.info("Extraindo relatório da competencia {}, ...".format(periodo_competencia)) + tempo_inicio_etl = time.time() - df_extraido = extrair_relatorio( - periodo_competencia = periodo_competencia - ) - - df_tratado = tratamento_dados( - df_extraido=df_extraido, - periodo_id=periodo_id, - unidade_geografica_id=unidade_geografica_id, + operacao_id = "064540b9-78b9-766c-8130-cdc0f1ed5828" + + logger.info( + "Extraindo relatório da competencia {}, ...".format( + periodo_competencia + ) ) - carregar_dataframe( - sessao=sessao, df=df_tratado, tabela_destino=tabela_destino + try: + + df_extraido = extrair_relatorio( + periodo_competencia=periodo_competencia + ) + + df_tratado = tratamento_dados( + df_extraido=df_extraido, + periodo_id=periodo_id, + unidade_geografica_id=unidade_geografica_id, + ) + + carregar_dataframe( + sessao=sessao, df=df_tratado, tabela_destino=tabela_destino + ) + + except (KeyError, pd.errors.ParserError): + traceback_str = traceback.format_exc() + enviar_erro = SisabExcecao("Competência indisponível no SISAB") + enviar_erro.insere_erro_database( + sessao=sessao, + traceback_str=traceback_str, + operacao_id=operacao_id, + periodo_id=periodo_id, + ) + + logger.error("Data da competência do relatório não está disponível") + return 0 + + except Exception as mensagem_erro: + traceback_str = traceback.format_exc() + enviar_erro = SisabExcecao(mensagem_erro) + enviar_erro.insere_erro_database( + sessao=sessao, + traceback_str=traceback_str, + operacao_id=operacao_id, + periodo_id=periodo_id, + ) + + logger.error(mensagem_erro) + return 0 + + tempo_final_etl = time.time() - tempo_inicio_etl + + logger.info( + "Terminou ETL para " + + "da comepetência`{periodo_competencia}` " + + "em {tempo_final_etl}.", + periodo_competencia=periodo_competencia, + tempo_final_etl=tempo_final_etl, ) diff --git a/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/carregamento.py b/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/carregamento.py index 2f3d7df..5d4b77e 100644 --- a/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/carregamento.py +++ b/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/carregamento.py @@ -18,14 +18,14 @@ def obter_lista_registros_inseridos( acessar a base de dados da ImpulsoGov. tabela_destino: Tabela que irá acondicionar os dados. Retorna: - Lista de períodos que já constam na tabela destino + Lista de períodos que já constam na tabela destino [`sqlalchemy.orm.session.Session`]: https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session """ tabela = tabelas[tabela_destino] - registros = sessao.query(tabela.c.periodo_id, tabela.c.unidade_geografica_id).distinct( + registros = sessao.query( tabela.c.periodo_id, tabela.c.unidade_geografica_id - ) + ).distinct(tabela.c.periodo_id, tabela.c.unidade_geografica_id) logger.info("Leitura dos períodos inseridos no banco Impulso OK!") return registros @@ -63,7 +63,10 @@ def carregar_dados( limpar = ( delete(tabela_relatorio_producao) .where(tabela_relatorio_producao.c.periodo_id == periodo_id) - .where(tabela_relatorio_producao.c.unidade_geografica_id == unidade_geografica_id) + .where( + tabela_relatorio_producao.c.unidade_geografica_id + == unidade_geografica_id + ) ) logger.debug(limpar) sessao.execute(limpar) @@ -80,4 +83,4 @@ def carregar_dados( linhas_adicionadas=len(df_tratado), ) - return 0 \ No newline at end of file + return 0 diff --git a/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/extracao.py b/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/extracao.py index bb677df..5338952 100644 --- a/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/extracao.py +++ b/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/extracao.py @@ -8,19 +8,20 @@ warnings.filterwarnings("ignore") -import pandas as pd - from datetime import date + +import pandas as pd from prefect import task -from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import extrair_producao_por_municipio -from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import transformar_producao_por_municipio +from impulsoetl.loggers import logger +from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import ( + extrair_producao_por_municipio, + transformar_producao_por_municipio, +) -from impulsoetl.loggers import logger -""" @task( - name= "Extrair Dados de Produção da APS por Tipo de Condição Avaliada e Desfecho", + name="Extrair Dados de Produção da APS por Tipo de Condição Avaliada e Desfecho", description=( "Extrai os dados do relatório de produção da Atenção Primária à Saúde," + "por problema/condição avaliada e desfecho a partir do portal público" @@ -29,29 +30,28 @@ tags=["aps", "sisab", "producao", "extracao"], retries=2, retry_delay_seconds=120, -)""" -def extrair_relatorio( - periodo_competencia: date)-> pd.DataFrame(): +) +def extrair_relatorio(periodo_competencia: date) -> pd.DataFrame(): """ Extrai relatório de produção por problema/condição avaliada e conduta a partir da página do SISAB Argumentos: - periodo_data_inicio: Data da competência + periodo_data_inicio: Data da competência Retorna: Objeto [`pandas.DataFrame`] com os dados extraídos. """ df_consolidado = pd.DataFrame() - + try: df_parcial = extrair_producao_por_municipio( tipo_producao="Atendimento Individual", competencias=[periodo_competencia], selecoes_adicionais={ - "Problema/Condição Avaliada": "Selecionar Todos", - "Conduta":"Selecionar Todos", + "Problema/Condição Avaliada": "Selecionar Todos", + "Conduta": "Selecionar Todos", }, - ).pipe(transformar_producao_por_municipio) - + ).pipe(transformar_producao_por_municipio) + df_consolidado = df_consolidado.append(df_parcial) except Exception as e: @@ -59,4 +59,3 @@ def extrair_relatorio( pass return df_consolidado - diff --git a/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/principal.py b/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/principal.py index 319d73e..f8f9a0c 100644 --- a/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/principal.py +++ b/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/principal.py @@ -7,23 +7,33 @@ import warnings warnings.filterwarnings("ignore") +import time +import traceback from datetime import date +import pandas as pd from prefect import flow from sqlalchemy.orm import Session from impulsoetl import __VERSION__ - -from impulsoetl.sisab.relatorio_producao_resolutividade_por_condicao.extracao import extrair_relatorio -from impulsoetl.sisab.relatorio_producao_resolutividade_por_condicao.tratamento import tratamento_dados -from impulsoetl.sisab.relatorio_producao_resolutividade_por_condicao.carregamento import carregar_dados +from impulsoetl.loggers import habilitar_suporte_loguru, logger +from impulsoetl.sisab.excecoes import SisabExcecao +from impulsoetl.sisab.relatorio_producao_resolutividade_por_condicao.carregamento import ( + carregar_dados, +) +from impulsoetl.sisab.relatorio_producao_resolutividade_por_condicao.extracao import ( + extrair_relatorio, +) +from impulsoetl.sisab.relatorio_producao_resolutividade_por_condicao.tratamento import ( + tratamento_dados, +) @flow( name="Obter Relatório de Resolutividade da APS por Condição Avaliada", description=( "Extrai, transforma e carrega os dados de produção da Atenção Primária à Saúde " - +"por problema/condição avaliada, a partir do Sistema de Informação em Saúde da Atenção " + + "por problema/condição avaliada, a partir do Sistema de Informação em Saúde da Atenção " + "Básica do SUS." ), retries=0, @@ -42,23 +52,65 @@ def obter_relatorio_resolutividade_por_condicao( teste: bool, ) -> None: - df_extraido = extrair_relatorio( - periodo_competencia=periodo_competencia, - ) + tempo_inicio_etl = time.time() - df_tratado = tratamento_dados( - df_extraido=df_extraido, - periodo_id=periodo_id, - municipio_id_sus = unidade_geografica_id_sus, - unidade_geografica_id=unidade_geografica_id - ) + operacao_id = "0644acff-4642-75e1-b559-6193f928cb16" + + try: + + df_extraido = extrair_relatorio( + periodo_competencia=periodo_competencia, + ) - carregar_dados( - sessao=sessao, - df_tratado=df_tratado, - tabela_destino=tabela_destino, - periodo_id = periodo_id, - unidade_geografica_id = unidade_geografica_id + df_tratado = tratamento_dados( + df_extraido=df_extraido, + periodo_id=periodo_id, + municipio_id_sus=unidade_geografica_id_sus, + unidade_geografica_id=unidade_geografica_id, + ) + + carregar_dados( + sessao=sessao, + df_tratado=df_tratado, + tabela_destino=tabela_destino, + periodo_id=periodo_id, + unidade_geografica_id=unidade_geografica_id, + ) + + except (KeyError, pd.errors.ParserError): + traceback_str = traceback.format_exc() + enviar_erro = SisabExcecao("Competência indisponível no SISAB") + enviar_erro.insere_erro_database( + sessao=sessao, + traceback_str=traceback_str, + operacao_id=operacao_id, + periodo_id=periodo_id, + ) + + logger.error("Data da competência do relatório não está disponível") + return 0 + + except Exception as mensagem_erro: + traceback_str = traceback.format_exc() + enviar_erro = SisabExcecao(mensagem_erro) + enviar_erro.insere_erro_database( + sessao=sessao, + traceback_str=traceback_str, + operacao_id=operacao_id, + periodo_id=periodo_id, + ) + + logger.error(mensagem_erro) + return 0 + + tempo_final_etl = time.time() - tempo_inicio_etl + + logger.info( + "Terminou ETL para " + + "da comepetência`{periodo_competencia}` " + + "em {tempo_final_etl}.", + periodo_competencia=periodo_competencia, + tempo_final_etl=tempo_final_etl, ) - return df_tratado \ No newline at end of file + return df_tratado diff --git a/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/tratamento.py b/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/tratamento.py index 076251e..aab24ad 100644 --- a/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/tratamento.py +++ b/src/impulsoetl/sisab/relatorio_producao_resolutividade_por_condicao/tratamento.py @@ -8,41 +8,41 @@ warnings.filterwarnings("ignore") -import pandas as pd -import numpy as np - from datetime import date from typing import Final + +import numpy as np +import pandas as pd from frozendict import frozendict from prefect import task -from impulsoetl.loggers import logger, habilitar_suporte_loguru - +from impulsoetl.loggers import habilitar_suporte_loguru, logger COLUNAS_RENOMEAR: Final[dict[str, str]] = { - "Conduta":"conduta", - "Problema/Condição Avaliada":"problema_condicao_avaliada", - "quantidade_aprovada":"quantidade_registrada" + "Conduta": "conduta", + "Problema/Condição Avaliada": "problema_condicao_avaliada", + "quantidade_aprovada": "quantidade_registrada", } COLUNAS_EXCLUIR = [ - 'uf_sigla', - 'municipio_id_sus', - 'municipio_nome', - 'periodo_data_inicio', + "uf_sigla", + "municipio_id_sus", + "municipio_nome", + "periodo_data_inicio", ] COLUNAS_TIPOS: Final[frozendict] = frozendict( { - "conduta":"str", - "problema_condicao_avaliada":"str", - "quantidade_registrada":"int", - "periodo_id":"str", - "unidade_geografica_id":"str", - "tipo_producao":"str" + "conduta": "str", + "problema_condicao_avaliada": "str", + "quantidade_registrada": "int", + "periodo_id": "str", + "unidade_geografica_id": "str", + "tipo_producao": "str", } ) + def renomear_colunas(df_extraido: pd.DataFrame) -> pd.DataFrame: df_extraido.rename(columns=COLUNAS_RENOMEAR, inplace=True) return df_extraido @@ -52,15 +52,16 @@ def excluir_colunas(df_extraido: pd.DataFrame) -> pd.DataFrame: df_extraido.drop(columns=COLUNAS_EXCLUIR, inplace=True) return df_extraido + def tratar_tipos(df_extraido: pd.DataFrame) -> pd.DataFrame: df_extraido = df_extraido.astype(COLUNAS_TIPOS, errors="ignore").where( df_extraido.notna(), None ) return df_extraido -""" + @task( - name= "Trata os Dados de Produção da APS extraídos por problema/condição avaliada e conduta", + name="Trata os Dados de Produção da APS extraídos por problema/condição avaliada e conduta", description=( "Realiza o tratamento dos dados do relatório de produção da Atenção Primária à Saúde," + "obtidos partir do portal público do Sistema de Informação em Saúde para a Atenção Básica do SUS." @@ -68,23 +69,28 @@ def tratar_tipos(df_extraido: pd.DataFrame) -> pd.DataFrame: tags=["aps", "sisab", "producao", "tratamento"], retries=2, retry_delay_seconds=120, -)""" +) def tratamento_dados( - df_extraido: pd.DataFrame, municipio_id_sus:str, periodo_id: str, unidade_geografica_id: str + df_extraido: pd.DataFrame, + municipio_id_sus: str, + periodo_id: str, + unidade_geografica_id: str, ) -> pd.DataFrame: - + habilitar_suporte_loguru() logger.info("Iniciando o tratamento dos dados...") - df_extraido = df_extraido.loc[df_extraido['municipio_id_sus']==municipio_id_sus].reset_index(drop=True) + df_extraido = df_extraido.loc[ + df_extraido["municipio_id_sus"] == municipio_id_sus + ].reset_index(drop=True) df_extraido = excluir_colunas(df_extraido) df_extraido = renomear_colunas(df_extraido) - df_extraido['periodo_id'] = periodo_id - df_extraido['unidade_geografica_id'] = unidade_geografica_id - df_extraido['tipo_producao']='Atendimento Individual' - tratar_tipos(df_extraido) - print(df_extraido) + df_extraido["periodo_id"] = periodo_id + df_extraido["unidade_geografica_id"] = unidade_geografica_id + df_extraido["tipo_producao"] = "Atendimento Individual" + tratar_tipos(df_extraido) + print(df_extraido) logger.info("Dados tratados com sucesso ...") - return df_extraido \ No newline at end of file + return df_extraido diff --git a/src/impulsoetl/sisab/relatorio_saude_producao/extracao.py b/src/impulsoetl/sisab/relatorio_saude_producao/extracao.py index 6d8a644..9527e0a 100644 --- a/src/impulsoetl/sisab/relatorio_saude_producao/extracao.py +++ b/src/impulsoetl/sisab/relatorio_saude_producao/extracao.py @@ -9,41 +9,48 @@ from prefect import task from impulsoetl.loggers import logger, habilitar_suporte_loguru -from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import extrair_producao_por_municipio -from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import transformar_producao_por_municipio +from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import ( + extrair_producao_por_municipio, +) +from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import ( + transformar_producao_por_municipio, +) CATEGORIA_PROFISSIONAL_REDUZIDA = [ - 'Cirurgião dentista', - 'Enfermeiro', - 'Fisioterapeuta', - 'Médico', - 'Psicólogo', - 'Técnico e auxiliar de enfermagem', - 'Técnico e auxiliar de saúde bucal', - ] - -def obter_relatorio_reduzido( - periodo_competencia: date)-> pd.DataFrame(): + "Cirurgião dentista", + "Enfermeiro", + "Fisioterapeuta", + "Médico", + "Psicólogo", + "Técnico e auxiliar de enfermagem", + "Técnico e auxiliar de saúde bucal", +] + + +def obter_relatorio_reduzido(periodo_competencia: date) -> pd.DataFrame(): df_consolidado = pd.DataFrame() - + + logger.info("Iniciando extraçção do relatório...") + try: df_parcial = extrair_producao_por_municipio( tipo_producao="Atendimento individual", competencias=[periodo_competencia], selecoes_adicionais={ - "Problema/Condição Avaliada": "Selecionar Todos", - "Conduta":"Selecionar Todos", - "Categoria do Profissional":CATEGORIA_PROFISSIONAL_REDUZIDA, + "Problema/Condição Avaliada": "Selecionar Todos", + "Conduta": "Selecionar Todos", + "Categoria do Profissional": CATEGORIA_PROFISSIONAL_REDUZIDA, }, + ).pipe(transformar_producao_por_municipio) - ).pipe(transformar_producao_por_municipio) - print(df_parcial) df_consolidado = df_consolidado.append(df_parcial) + logger.info("Extração concluída") + except Exception as e: logger.error(e) pass @@ -60,14 +67,11 @@ def obter_relatorio_reduzido( retries=2, retry_delay_seconds=120, ) -def extrair_relatorio( - periodo_competencia: date)-> pd.DataFrame(): + +def extrair_relatorio(periodo_competencia: date) -> pd.DataFrame(): habilitar_suporte_loguru() - logger.info("Iniciando extraçção do relatório...") df_extraido = obter_relatorio_reduzido(periodo_competencia) - - logger.info("Extração concluída") return df_extraido diff --git a/src/impulsoetl/sisab/relatorio_saude_producao/extracao_outros.py b/src/impulsoetl/sisab/relatorio_saude_producao/extracao_outros.py index 6d1960a..8699f09 100644 --- a/src/impulsoetl/sisab/relatorio_saude_producao/extracao_outros.py +++ b/src/impulsoetl/sisab/relatorio_saude_producao/extracao_outros.py @@ -3,39 +3,41 @@ warnings.filterwarnings("ignore") from datetime import date -import pandas as pd import numpy as np - +import pandas as pd from prefect import task -from impulsoetl.loggers import logger, habilitar_suporte_loguru -from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import extrair_producao_por_municipio -from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import transformar_producao_por_municipio - +from impulsoetl.loggers import habilitar_suporte_loguru, logger +from impulsoetl.sisab.utilitarios_sisab_relatorio_producao import ( + extrair_producao_por_municipio, + transformar_producao_por_municipio, +) CATEGORIA_PROFISSIONAL_OUTROS = [ - 'Agente de combate a endemias', - 'Agente de saúde', - 'Assistente Social', - 'Educador social', - 'Farmacêutico', - 'Fonoaudiólogo', - 'Médico veterinário', - 'Nutricionista', - 'Outros prof. de nível médio', - 'Outros prof. de nível superior', - 'Profissional de educação física', - 'Sanitarista', - 'Terapeuta ocupacional', - 'Naturólogo', - 'Musicoterapeuta', - 'Arteterapeuta', - 'Terapeuto Holístico', - 'Recepcionista' + "Agente de combate a endemias", + "Agente de saúde", + "Assistente Social", + "Educador social", + "Farmacêutico", + "Fonoaudiólogo", + "Médico veterinário", + "Nutricionista", + "Outros prof. de nível médio", + "Outros prof. de nível superior", + "Profissional de educação física", + "Sanitarista", + "Terapeuta ocupacional", + "Naturólogo", + "Musicoterapeuta", + "Arteterapeuta", + "Terapeuto Holístico", + "Recepcionista", ] -def obter_relatorio_outros( - periodo_competencia: date)-> pd.DataFrame(): + +def obter_relatorio_outros(periodo_competencia: date) -> pd.DataFrame(): + + logger.info("Iniciando extraçção do relatório...") df_consolidado = pd.DataFrame() @@ -44,13 +46,12 @@ def obter_relatorio_outros( tipo_producao="Atendimento individual", competencias=[periodo_competencia], selecoes_adicionais={ - "Problema/Condição Avaliada": "Selecionar Todos", - "Conduta":"Selecionar Todos", - "Categoria do Profissional":CATEGORIA_PROFISSIONAL_OUTROS, + "Problema/Condição Avaliada": "Selecionar Todos", + "Conduta": "Selecionar Todos", + "Categoria do Profissional": CATEGORIA_PROFISSIONAL_OUTROS, }, + ).pipe(transformar_producao_por_municipio) - ).pipe(transformar_producao_por_municipio) - print(df_parcial) df_consolidado = df_consolidado.append(df_parcial) @@ -58,10 +59,13 @@ def obter_relatorio_outros( except Exception as e: logger.error(e) pass - + + logger.info("Extração concluída") + return df_consolidado +""" @task( name="Extrair Relatório de Produção de Saúde - Profissionais Outros ", description=( @@ -70,15 +74,13 @@ def obter_relatorio_outros( tags=["sisab", "produção", "extracao"], retries=2, retry_delay_seconds=120, -) -def extrair_relatorio_outros( - periodo_competencia: date)-> pd.DataFrame(): +)""" + + +def extrair_relatorio_outros(periodo_competencia: date) -> pd.DataFrame(): habilitar_suporte_loguru() - logger.info("Iniciando extraçção do relatório...") df_extraido = obter_relatorio_outros(periodo_competencia) - - logger.info("Extração concluída") return df_extraido diff --git a/src/impulsoetl/sisab/relatorio_saude_producao/principal.py b/src/impulsoetl/sisab/relatorio_saude_producao/principal.py index 354045a..134d3f6 100644 --- a/src/impulsoetl/sisab/relatorio_saude_producao/principal.py +++ b/src/impulsoetl/sisab/relatorio_saude_producao/principal.py @@ -1,8 +1,9 @@ import warnings - warnings.filterwarnings("ignore") import pandas as pd +import time +import traceback from datetime import date from sqlalchemy.orm import Session @@ -10,6 +11,7 @@ from impulsoetl import __VERSION__ from impulsoetl.bd import Sessao +from impulsoetl.sisab.excecoes import SisabExcecao from impulsoetl.sisab.relatorio_saude_producao.extracao import extrair_relatorio from impulsoetl.sisab.relatorio_saude_producao.tratamento import tratamento_dados from impulsoetl.sisab.relatorio_saude_producao.verificacao import verificar_informacoes_relatorio_producao @@ -35,7 +37,6 @@ def obter_relatorio_producao_por_profissionais_reduzidos( periodo_id: str, unidade_geografica_id: str, ) -> None: - """ Extrai, transforma e carrega os dados do Relatório de Produção do SISAB Argumentos: @@ -45,24 +46,53 @@ def obter_relatorio_producao_por_profissionais_reduzidos( periodo_id: Código de identificação do período. unidade_geografica_id: Código de identificação da unidade geográfica. """ + tempo_inicio_etl = time.time() + + operacao_id = '063e2878-3247-78a7-83dd-1d291156cdf6' logger.info("Extraindo relatório da competencia {}, ...".format(periodo_competencia)) - df_extraido = extrair_relatorio( - periodo_competencia = periodo_competencia - ) + try: + + df_extraido = extrair_relatorio( + periodo_competencia = periodo_competencia + ) + + df_tratado = tratamento_dados( + df_extraido=df_extraido, + periodo_id=periodo_id, + unidade_geografica_id=unidade_geografica_id, + ) + + verificar_informacoes_relatorio_producao(df_tratado) + + carregar_dataframe( + sessao=sessao, df=df_tratado, tabela_destino=tabela_destino + ) + except (KeyError,pd.errors.ParserError): + traceback_str = traceback.format_exc() + enviar_erro = SisabExcecao("Competência indisponível no SISAB") + enviar_erro.insere_erro_database(sessao=sessao,traceback_str=traceback_str,operacao_id=operacao_id,periodo_id=periodo_id) + + logger.error("Data da competência do relatório não está disponível") + return 0 - df_tratado = tratamento_dados( - df_extraido=df_extraido, - periodo_id=periodo_id, - unidade_geografica_id=unidade_geografica_id, - ) + except Exception as mensagem_erro: + traceback_str = traceback.format_exc() + enviar_erro = SisabExcecao(mensagem_erro) + enviar_erro.insere_erro_database(sessao=sessao,traceback_str=traceback_str,operacao_id=operacao_id,periodo_id=periodo_id) + + logger.error(mensagem_erro) + return 0 - verificar_informacoes_relatorio_producao(df_tratado) + tempo_final_etl = time.time() - tempo_inicio_etl - carregar_dataframe( - sessao=sessao, df=df_tratado, tabela_destino=tabela_destino + logger.info( + "Terminou ETL para " + + "da comepetência`{periodo_competencia}` " + + "em {tempo_final_etl}.", + periodo_competencia=periodo_competencia, + tempo_final_etl=tempo_final_etl, ) - print('Terminou carga') diff --git a/src/impulsoetl/sisab/relatorio_saude_producao/principal_outros.py b/src/impulsoetl/sisab/relatorio_saude_producao/principal_outros.py index 5e6eea2..d1e44b2 100644 --- a/src/impulsoetl/sisab/relatorio_saude_producao/principal_outros.py +++ b/src/impulsoetl/sisab/relatorio_saude_producao/principal_outros.py @@ -2,20 +2,28 @@ warnings.filterwarnings("ignore") -import pandas as pd - +import time +import traceback from datetime import date + +import pandas as pd +from prefect import flow from sqlalchemy.orm import Session -from prefect import flow from impulsoetl import __VERSION__ from impulsoetl.bd import Sessao -from impulsoetl.sisab.relatorio_saude_producao.extracao_outros import extrair_relatorio_outros -from impulsoetl.sisab.relatorio_saude_producao.tratamento import tratamento_dados -from impulsoetl.sisab.relatorio_saude_producao.verificacao import verificar_informacoes_relatorio_producao - -from impulsoetl.utilitarios.bd import carregar_dataframe from impulsoetl.loggers import logger +from impulsoetl.sisab.excecoes import SisabExcecao +from impulsoetl.sisab.relatorio_saude_producao.extracao import ( + extrair_relatorio, +) +from impulsoetl.sisab.relatorio_saude_producao.tratamento import ( + tratamento_dados, +) +from impulsoetl.sisab.relatorio_saude_producao.verificacao import ( + verificar_informacoes_relatorio_producao, +) +from impulsoetl.utilitarios.bd import carregar_dataframe @flow( @@ -35,7 +43,6 @@ def obter_relatorio_producao_por_profissionais_outros( periodo_id: str, unidade_geografica_id: str, ) -> None: - """ Extrai, transforma e carrega os dados do Relatório de Produção do SISAB Argumentos: @@ -45,23 +52,66 @@ def obter_relatorio_producao_por_profissionais_outros( periodo_id: Código de identificação do período. unidade_geografica_id: Código de identificação da unidade geográfica. """ + tempo_inicio_etl = time.time() - logger.info("Extraindo relatório da competencia {}, ...".format(periodo_competencia)) + operacao_id = "06423293-7fac-7493-b209-e5aa489879fb" - df_extraido = extrair_relatorio_outros( - periodo_competencia = periodo_competencia - ) - - df_tratado = tratamento_dados( - df_extraido=df_extraido, - periodo_id=periodo_id, - unidade_geografica_id=unidade_geografica_id, + logger.info( + "Extraindo relatório da competencia {}, ...".format( + periodo_competencia + ) ) - verificar_informacoes_relatorio_producao(df_tratado) + try: - carregar_dataframe( - sessao=sessao, df=df_tratado, tabela_destino=tabela_destino - ) - print('Terminou carga') + df_extraido = extrair_relatorio( + periodo_competencia=periodo_competencia + ) + + df_tratado = tratamento_dados( + df_extraido=df_extraido, + periodo_id=periodo_id, + unidade_geografica_id=unidade_geografica_id, + ) + + verificar_informacoes_relatorio_producao(df_tratado) + + carregar_dataframe( + sessao=sessao, df=df_tratado, tabela_destino=tabela_destino + ) + + except (KeyError, pd.errors.ParserError): + traceback_str = traceback.format_exc() + enviar_erro = SisabExcecao("Competência indisponível no SISAB") + enviar_erro.insere_erro_database( + sessao=sessao, + traceback_str=traceback_str, + operacao_id=operacao_id, + periodo_id=periodo_id, + ) + logger.error("Data da competência do relatório não está disponível") + return 0 + + except Exception as mensagem_erro: + traceback_str = traceback.format_exc() + enviar_erro = SisabExcecao(mensagem_erro) + enviar_erro.insere_erro_database( + sessao=sessao, + traceback_str=traceback_str, + operacao_id=operacao_id, + periodo_id=periodo_id, + ) + + logger.error(mensagem_erro) + return 0 + + tempo_final_etl = time.time() - tempo_inicio_etl + + logger.info( + "Terminou ETL para " + + "da comepetência`{periodo_competencia}` " + + "em {tempo_final_etl}.", + periodo_competencia=periodo_competencia, + tempo_final_etl=tempo_final_etl, + )